diff --git a/back_ground_module/update_all_NGV_data_daily.py b/back_ground_module/update_all_NGV_data_daily.py index 10136cc..081cf83 100644 --- a/back_ground_module/update_all_NGV_data_daily.py +++ b/back_ground_module/update_all_NGV_data_daily.py @@ -154,7 +154,7 @@ class UpdateAllNGVDataDaily: jdy_ngv_data, staff_id_map = self._load_base_data() # 步骤2: 获取并处理NGV源数据 - ngv_data_today, ngv_data_yesterday = self._load_ngv_source_data() + ngv_data_today, ngv_data_yesterday = self._load_ngv_source_data(task_start_time) # 步骤3: 处理已删除的门店 self._handle_deleted_stores(jdy_ngv_data, ngv_data_today) @@ -239,7 +239,7 @@ class UpdateAllNGVDataDaily: return jdy_ngv_data, staff_id_map - def _load_ngv_source_data(self): + def _load_ngv_source_data(self, task_start_time): """ 步骤2: 获取并处理NGV源数据 返回: (昨天的数据, 前天的数据) @@ -263,6 +263,10 @@ class UpdateAllNGVDataDaily: ngv_data_1 = common_module.get_ngv_details(days_back=1) ngv_data_2 = common_module.get_ngv_details(days_back=2) + # 存储每天获取到的数据 + ngv_data_1.to_csv(f"{task_start_time}_ngv_data_today.csv", index=False) + ngv_data_2.to_csv(f"{task_start_time}_ngv_data_yesterday.csv", index=False) + # 只保留 org_type 为 "一般" 的记录 ngv_data_1 = ngv_data_1[ngv_data_1['org_type'] == '一般'] ngv_data_2 = ngv_data_2[ngv_data_2['org_type'] == '一般'] @@ -795,7 +799,7 @@ class UpdateAllNGVDataDaily: 'api_key': Config.SaaS_Tasks_APP_ID, 'entry_id': Config.NGV_TASKS_ENTRY_ID, 'data': item['data_dict'], - 'is_start_trigger':'true', + 'is_start_trigger': 'true', } api_instance.data_batch_create(data=create_data, max_retries=20) @@ -1085,7 +1089,7 @@ class UpdateAllNGVDataDaily: try: # 生成时间戳 timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - + # 提取数据到DataFrame create_records = [] for item in create_data_list: @@ -1101,13 +1105,13 @@ class UpdateAllNGVDataDaily: 'active_status_fmt': row_data.get('active_status_fmt', ''), } create_records.append(record) - + create_df = pd.DataFrame(create_records) - + # 使用相对路径保存(支持跨平台) file_path = os.path.join(output_dir, f'新增门店_{timestamp}.csv') create_df.to_csv(file_path, index=False, encoding='utf-8-sig') - + logger.info(f" ✓ 新增数据已保存: {file_path} ({len(create_df)} 条)") except Exception as e: error_task_logger.error(f"保存新增数据失败: {e}", exc_info=True) @@ -1125,11 +1129,11 @@ class UpdateAllNGVDataDaily: try: # 生成时间戳 timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - + # 统计每个org_code的更新记录数(去重) org_code_counts = {} org_code_info = {} - + for item in update_data_list: org_code = item['org_code'] if org_code not in org_code_counts: @@ -1145,7 +1149,7 @@ class UpdateAllNGVDataDaily: 'active_status_fmt': row_data.get('active_status_fmt', ''), } org_code_counts[org_code] += 1 - + # 构建统计DataFrame update_stats = [] for org_code, count in org_code_counts.items(): @@ -1163,19 +1167,19 @@ class UpdateAllNGVDataDaily: 'note': '同一org_code有多个记录' if count > 1 else '' } update_stats.append(stat) - + update_df = pd.DataFrame(update_stats) update_df = update_df.sort_values('update_count', ascending=False) - + # 使用相对路径保存(支持跨平台) file_path = os.path.join(output_dir, f'更新统计_{timestamp}.csv') update_df.to_csv(file_path, index=False, encoding='utf-8-sig') - + # 统计汇总 total_org_codes = len(org_code_counts) total_records = len(update_data_list) duplicate_org_codes = sum(1 for count in org_code_counts.values() if count > 1) - + logger.info(f" ✓ 更新统计已保存: {file_path}") logger.info(f" - 更新的org_code数: {total_org_codes}") logger.info(f" - 更新的记录总数: {total_records}") @@ -1188,4 +1192,3 @@ class UpdateAllNGVDataDaily: if __name__ == '__main__': updater = UpdateAllNGVDataDaily() updater.main() -