diff --git a/back_ground_module/logs/error_task.log b/back_ground_module/logs/error_task.log index 42d27d1..e69e1fd 100644 --- a/back_ground_module/logs/error_task.log +++ b/back_ground_module/logs/error_task.log @@ -14,3 +14,5 @@ 2025-09-15 14:14:19,894 - JCB_efficient_car_pickup.py - error_task_logger - ERROR - 接车宝日常派发执行出错:'NoneType' object has no attribute 'iterrows' 2025-09-15 14:17:26,819 - JCB_efficient_car_pickup.py - error_task_logger - ERROR - 接车宝日常派发执行出错:获取接车宝数据失败,返回None 2025-09-26 11:03:13,587 - update_all_NGV_data_daily.py - error_task_logger - ERROR - NGV更新数据执行时发生异常: cannot reindex on an axis with duplicate labels +2025-10-27 11:09:20,885 - update_NGV_data.py - error_task_logger - ERROR - NGV过滤后数据保存异常: [Errno 22] Invalid argument: 'output\\2025-10-27 11:08:51NGV.csv' +2025-10-27 14:50:32,084 - non_standar_performance_to_BI.py - error_task_logger - ERROR - 非标业绩提报转BI发生错误'报备业绩归属人' diff --git a/back_ground_module/non_standar_performance_to_BI.py b/back_ground_module/non_standar_performance_to_BI.py index 17622b2..686b91b 100644 --- a/back_ground_module/non_standar_performance_to_BI.py +++ b/back_ground_module/non_standar_performance_to_BI.py @@ -41,7 +41,6 @@ class NonStandardPerformanceToBI: "开户/处理日期": "_widget_1753770875894", "小六业绩金额": "_widget_1753770875898", "区域业绩金额": "_widget_1753770875937", - "报备业绩归属人": "_widget_1753770875901", "报备业绩归属区域经理": "_widget_1753770875903", "报备业绩归属大区": "_widget_1753866196486", "原业绩归属人": "_widget_1753856032683", @@ -56,10 +55,15 @@ class NonStandardPerformanceToBI: "新签提成比例-首年": "_widget_1753778922503", "新签提成比例-非首年": "_widget_1753778922548", "新签阶段及提成比例": "_widget_1753778656359", - "业绩动作":"_widget_1756708722933","提成动作":"_widget_1756708722932", + "业绩动作":"_widget_1756708722933", + "提成动作":"_widget_1756708722932", "新签阶段及提成比例.选择提成阶段": "_widget_1753778656359._widget_1753778656361", "新签阶段及提成比例.新签阶段": "_widget_1753778656359._widget_1753948745962", "新签阶段及提成比例.提成比例": "_widget_1753778656359._widget_1753778656362", + "业绩类型":"_widget_1753770875966", + "报备业绩归属小六":"_widget_1753770875901", + "原业绩归属大区":"_widget_1755159216098", + "业绩分类":"_widget_1758706882564", "提交人": "creator", "提交时间": "createTime", "更新时间": "updateTime" @@ -119,7 +123,7 @@ class NonStandardPerformanceToBI: df.columns = [reverse_mapping.get(col, col) for col in df.columns] # 2.成员字段取值 - user_columns = ["报备业绩归属人", "报备业绩归属区域经理", "原业绩归属人", "原业绩归属区域经理", "运营专家"] + user_columns = ["报备业绩归属小六", "报备业绩归属区域经理", "原业绩归属人", "原业绩归属区域经理", "运营专家"] for col in user_columns: df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "") diff --git a/back_ground_module/update_NGV_data.py b/back_ground_module/update_NGV_data.py index 607eeff..cb621e6 100644 --- a/back_ground_module/update_NGV_data.py +++ b/back_ground_module/update_NGV_data.py @@ -105,10 +105,12 @@ class UpdateNGVData: # all_data = [self.row_to_dict(row, self.field_mapping) for index, row in data_NGV_j1.iterrows()] # 前两天的全部数据 # all_data = [self.row_to_dict(row, self.field_mapping) for index, row in data_NGV_j.iterrows()] # 前一天的全部数据 all_data = [self.row_to_dict(row, self.field_mapping) for index, row in filtered_df.iterrows()] # 增量数据 + try: - filtered_df.to_csv(output_dir + "\\" + f"{task_start_time}NGV.csv") + filtered_df.to_csv(os.path.join(output_dir, f"{task_start_time}NGV.csv")) except Exception as e: error_task_logger.error(f"NGV过滤后数据保存异常: {e}") + pass # data = {'api_key': Config.SaaS_Tasks_APP_ID, 'entry_id': Config.NGV_TASKS_ENTRY_ID, "data_list": all_data} diff --git a/back_ground_module/update_all_NGV_data_daily.py b/back_ground_module/update_all_NGV_data_daily.py index 869c893..efca0d6 100644 --- a/back_ground_module/update_all_NGV_data_daily.py +++ b/back_ground_module/update_all_NGV_data_daily.py @@ -112,7 +112,7 @@ CONCURRENT_WORKERS = 8 USE_BATCH_CREATE = True # True=批量创建(快),False=逐条创建(慢) # 批量创建大小(每批次创建的记录数) -BATCH_CREATE_SIZE = 100 +BATCH_CREATE_SIZE = 90 # ==================================================== @@ -607,7 +607,8 @@ class UpdateAllNGVDataDaily: update_data_list.append({ 'org_code': org_code, 'data_id': str(data_id), - 'data_dict': data_dict.copy() + 'data_dict': data_dict.copy(), + 'row_data': row # 保存原始数据用于输出 }) else: # 创建操作:必须包含门店编码字段 @@ -617,7 +618,8 @@ class UpdateAllNGVDataDaily: create_data_list.append({ 'org_code': idx, - 'data_dict': data_dict + 'data_dict': data_dict, + 'row_data': row # 保存原始数据用于输出 }) logger.info(f" - 需要更新: {len(update_data_list)} 条") @@ -630,6 +632,8 @@ class UpdateAllNGVDataDaily: update_count = self._concurrent_update(update_data_list) else: update_count = self._single_update(update_data_list) + # 输出更新统计 + self._save_update_stats(update_data_list) # 执行创建 create_count = 0 @@ -638,6 +642,8 @@ class UpdateAllNGVDataDaily: create_count = self._batch_create(create_data_list) else: create_count = self._single_create(create_data_list) + # 输出新增数据 + self._save_create_data(create_data_list) logger.info(f" ✓ 同步完成: 更新 {update_count} 条, 创建 {create_count} 条") @@ -1052,6 +1058,118 @@ class UpdateAllNGVDataDaily: logger.error(f" ✗ 加载缓存失败 ({cache_key}): {e}") return pd.DataFrame() + def _save_create_data(self, create_data_list): + """ + 保存新增数据到文件 + + 参数: + create_data_list: 新增数据列表 + """ + if len(create_data_list) == 0: + return + + try: + # 生成时间戳 + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + + # 提取数据到DataFrame + create_records = [] + for item in create_data_list: + row_data = item['row_data'] + record = { + 'org_code': item['org_code'], + 'org_name': row_data.get('org_name', ''), + 'group_name': row_data.get('group_name', ''), + 'org_type': row_data.get('org_type', ''), + 'province_name': row_data.get('province_name', ''), + 'city_name': row_data.get('city_name', ''), + 'saas_version': row_data.get('saas_version', ''), + 'active_status_fmt': row_data.get('active_status_fmt', ''), + } + create_records.append(record) + + create_df = pd.DataFrame(create_records) + + # 使用相对路径保存(支持跨平台) + file_path = os.path.join(output_dir, f'新增门店_{timestamp}.csv') + create_df.to_csv(file_path, index=False, encoding='utf-8-sig') + + logger.info(f" ✓ 新增数据已保存: {file_path} ({len(create_df)} 条)") + except Exception as e: + error_task_logger.error(f"保存新增数据失败: {e}", exc_info=True) + + def _save_update_stats(self, update_data_list): + """ + 保存更新统计信息到文件 + + 参数: + update_data_list: 更新数据列表 + """ + if len(update_data_list) == 0: + return + + try: + # 生成时间戳 + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + + # 统计每个org_code的更新记录数(去重) + org_code_counts = {} + org_code_info = {} + + for item in update_data_list: + org_code = item['org_code'] + if org_code not in org_code_counts: + org_code_counts[org_code] = 0 + row_data = item['row_data'] + org_code_info[org_code] = { + 'org_name': row_data.get('org_name', ''), + 'group_name': row_data.get('group_name', ''), + 'org_type': row_data.get('org_type', ''), + 'province_name': row_data.get('province_name', ''), + 'city_name': row_data.get('city_name', ''), + 'saas_version': row_data.get('saas_version', ''), + 'active_status_fmt': row_data.get('active_status_fmt', ''), + } + org_code_counts[org_code] += 1 + + # 构建统计DataFrame + update_stats = [] + for org_code, count in org_code_counts.items(): + info = org_code_info[org_code] + stat = { + 'org_code': org_code, + 'org_name': info['org_name'], + 'group_name': info['group_name'], + 'org_type': info['org_type'], + 'province_name': info['province_name'], + 'city_name': info['city_name'], + 'saas_version': info['saas_version'], + 'active_status_fmt': info['active_status_fmt'], + 'update_count': count, + 'note': '同一org_code有多个记录' if count > 1 else '' + } + update_stats.append(stat) + + update_df = pd.DataFrame(update_stats) + update_df = update_df.sort_values('update_count', ascending=False) + + # 使用相对路径保存(支持跨平台) + file_path = os.path.join(output_dir, f'更新统计_{timestamp}.csv') + update_df.to_csv(file_path, index=False, encoding='utf-8-sig') + + # 统计汇总 + total_org_codes = len(org_code_counts) + total_records = len(update_data_list) + duplicate_org_codes = sum(1 for count in org_code_counts.values() if count > 1) + + logger.info(f" ✓ 更新统计已保存: {file_path}") + logger.info(f" - 更新的org_code数: {total_org_codes}") + logger.info(f" - 更新的记录总数: {total_records}") + if duplicate_org_codes > 0: + logger.info(f" - 包含多条记录的org_code: {duplicate_org_codes}") + except Exception as e: + error_task_logger.error(f"保存更新统计失败: {e}", exc_info=True) + if __name__ == '__main__': updater = UpdateAllNGVDataDaily() diff --git a/test/BI.ipynb b/test/BI.ipynb index f8b303d..0b60fad 100644 --- a/test/BI.ipynb +++ b/test/BI.ipynb @@ -257,8 +257,8 @@ { "metadata": { "ExecuteTime": { - "end_time": "2025-09-24T05:56:32.216878Z", - "start_time": "2025-09-24T05:56:31.974390Z" + "end_time": "2025-10-27T06:50:20.870562Z", + "start_time": "2025-10-27T06:50:20.631342Z" } }, "cell_type": "code", @@ -267,17 +267,17 @@ "from mysql.connector import Error\n", "\n", "HS_DB_Config = {\n", - " 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n", - " 'user': \"rw_operation_data_relay\",\n", - " 'password': \"m+q5Z4%IVuF9bf\",\n", - " 'database': \"f6operation_data_relay\"\n", - "} # 衡时数据库链接配置-mysql\n", + " 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n", + " 'user': \"rw_operation_data_relay\",\n", + " 'password': \"m+q5Z4%IVuF9bf\",\n", + " 'database': \"f6operation_data_relay\"\n", + " } # 衡时数据库链接配置-mysql\n", "# table_name = \"new_dealer_service_order_to_bi\" # 替换为你的实际表名\n", "\n", - "table_name = \"partner_settlement_to_BI\"\n", - "column_name = \"提交时间\"\n", - "# new_column_type = \"VARCHAR(255)\" # 目标数据类型\n", - "new_column_type = \"DATETIME\" # 目标数据类型\n", + "table_name = \"non_standard_performance_to_BI\"\n", + "column_name = \"业绩分类\"\n", + "new_column_type = \"VARCHAR(255)\" # 目标数据类型\n", + "# new_column_type = \"DATETIME\" # 目标数据类型\n", "\n", "try:\n", " # 连接数据库\n", @@ -336,12 +336,12 @@ "name": "stdout", "output_type": "stream", "text": [ - "✅ 成功添加字段: `提交时间`\n", + "✅ 成功添加字段: `业绩分类`\n", "数据库连接已关闭\n" ] } ], - "execution_count": 4 + "execution_count": 5 }, { "metadata": {},