diff --git a/.idea/csv-editor.xml b/.idea/csv-editor.xml index c9450ab..20ce6ba 100644 --- a/.idea/csv-editor.xml +++ b/.idea/csv-editor.xml @@ -17,27 +17,6 @@ - - - - - - - - - - - - - - - - - - diff --git a/back_ground_module/__init__.py b/back_ground_module/__init__.py index 60d7907..7e80102 100644 --- a/back_ground_module/__init__.py +++ b/back_ground_module/__init__.py @@ -27,3 +27,4 @@ from back_ground_module.new_dealer_service_order_to_bi import NewDealerServiceOr from back_ground_module.non_standar_performance_to_BI import NonStandardPerformanceToBI from back_ground_module.partner_settlement_to_BI import PartnerSettlementToBI from back_ground_module.GD_match_phone_number import GDMatchPhoneNumber +from back_ground_module.province_city_person_relation_to_bi import ProvinceCityPersonRelationToBI diff --git a/back_ground_module/province_city_person_relation_to_bi.py b/back_ground_module/province_city_person_relation_to_bi.py new file mode 100644 index 0000000..512c95a --- /dev/null +++ b/back_ground_module/province_city_person_relation_to_bi.py @@ -0,0 +1,198 @@ +import pandas as pd +import datetime +from config import Config +from api import API +import pymysql # 使用 pymysql 替代 mysql.connector +from back_ground_module import CommonModule +import os +import mysql.connector +import pandas as pd +import json +import numpy as np +import mysql.connector +from mysql.connector import Error +from log_config import configure_task_logger, configure_error_task_logger +import math + +logger = configure_task_logger() +error_task_logger = configure_error_task_logger() +output_dir = "output" # 设置输出目录 +os.makedirs(output_dir, exist_ok=True) +common_module = CommonModule() +api_instance = API() + + +class ProvinceCityPersonRelationToBI: + def __init__(self): + self.pvc_data = None + self.field_mapping = { + "省": "_widget_1734677164861", + "市": "_widget_1734677164862", + "区": "_widget_1734677164863", + "运营顾问": "_widget_1734677164864", + "区域经理": "_widget_1734677164865", + "运营专家": "_widget_1734677164866", + "战区": "_widget_1734677164867", + "新签回访客服": "_widget_1734677164868", + "续约回访客服": "_widget_1734677164869", + "异常待办客服": "_widget_1734677164870", + "日常回访客服": "_widget_1734677164871", + } + + def load_all_data(self): + payload = {"api_key": "675b900991ad2491c69389ca", + "entry_id": "676512ac3e54dc3159460c0a", + } + pvc_data = api_instance.entry_data_list(payload) + self.pvc_data = pvc_data.get("data") # api请求格式,将数据封装在data字典里 + + def data_process(self): + df = pd.DataFrame(self.pvc_data) + # 反转映射字典 + reverse_mapping = {v: k for k, v in self.field_mapping.items()} + # 1.列明替换 + df.columns = [reverse_mapping.get(col, col) for col in df.columns] + + # 2.成员字段取值 + user_columns = ["运营顾问", "区域经理", "运营专家", "新签回访客服", "续约回访客服", + "异常待办客服", "日常回访客服"] + + for col in user_columns: + df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "") + + return df + + def clear_table_data(self): + """ + 清空指定 MySQL 表的数据。 + 参数已写死在函数内部,直接调用即可。 + """ + # 数据库连接信息 + HS_DB_Config = { + 'host': "f6-public.rwlb.rds.aliyuncs.com", + 'user': "rw_operation_data_relay", + 'password': "m+q5Z4%IVuF9bf", + 'database': "f6operation_data_relay" + } + table_name = "province_city_person_relation_to_bi" # 要清空的表名 + + connection = None + try: + # 建立数据库连接 + connection = mysql.connector.connect( + host=HS_DB_Config["host"], + user=HS_DB_Config["user"], + password=HS_DB_Config["password"], + database=HS_DB_Config["database"] + ) + if connection.is_connected(): + cursor = connection.cursor() + + # 使用TRUNCATE清空表数据 + cursor.execute(f"TRUNCATE TABLE {table_name}") + connection.commit() + + logger.info(f"成功清空表 {table_name} 中的所有数据") + + except Error as e: + error_task_logger.error(f"清空表时发生错误: {e}") + if connection and connection.is_connected(): + connection.rollback() + finally: + if connection and connection.is_connected(): + cursor.close() + connection.close() + logger.info("数据库连接已关闭") + + def write_to_bi(self, df): + HS_DB_Config = Config.HS_DB_Config + table_name = "province_city_person_relation_to_bi" + chunk_size = 1000 # 每批插入 1000 行 + + # 清理 DataFrame 中的 NaN/None 等值 + df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None) + + connection = mysql.connector.connect( + host=HS_DB_Config["host"], + user=HS_DB_Config["user"], + password=HS_DB_Config["password"], + database=HS_DB_Config["database"] + ) + cursor = connection.cursor() + + try: + # 获取数据库表的列名 + cursor.execute(f"SHOW COLUMNS FROM `{table_name}`") + db_columns = [col[0] for col in cursor.fetchall()] + + # 保留与数据库匹配的列 + filtered_df = df[df.columns.intersection(db_columns)] + if filtered_df.empty: + print("DataFrame 中没有与数据库表结构匹配的列。") + return + + # 处理 dict/list 类型字段:转为 JSON 字符串 + filtered_df = filtered_df.copy() + for col in filtered_df.columns: + if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any(): + filtered_df[col] = filtered_df[col].apply( + lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x + ) + + # 构建 INSERT 语句(只构建一次) + columns = [f"`{col}`" for col in filtered_df.columns] + placeholders = ', '.join(['%s'] * len(columns)) + insert_sql = f"INSERT INTO `{table_name}` ({', '.join(columns)}) VALUES ({placeholders})" + + total_rows = len(filtered_df) + num_chunks = math.ceil(total_rows / chunk_size) + + for i in range(num_chunks): + start_idx = i * chunk_size + end_idx = min(start_idx + chunk_size, total_rows) + chunk_df = filtered_df.iloc[start_idx:end_idx] + + # 转为元组列表 + data_to_insert = [ + tuple(row) for row in chunk_df.values + ] + + # 批量执行(executemany 更高效) + cursor.executemany(insert_sql, data_to_insert) + + connection.commit() + logger.info(f"成功写入 {total_rows} 条记录到 {table_name} 表中(分 {num_chunks} 批)。") + + except Exception as e: + error_task_logger.error(f"写入数据库时发生错误: {e}", exc_info=True) + connection.rollback() + finally: + cursor.close() + connection.close() + + def main(self): + task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + logger.info("任务开始") + # step1: 获取数据 + self.load_all_data() + logger.info("加载数据完成") + # step2:数据处理 + df = self.data_process() + # df.to_csv(os.path.join(output_dir, "new_dealer_service_order_to_bi.csv")) + logger.info("数据处理完成") + # step3:数据库删除 + self.clear_table_data() + logger.info("目标数据库已清空") + # step4:数据写入BI + self.write_to_bi(df) + logger.info("数据已写入数据库中") + common_module.send_task_status(task_start_time, "省市区人员关系表转BI") + except Exception as e: + error_task_logger.error(f"省市区人员关系表转BI发生错误{e}") + common_module.send_task_error(task_start_time, "省市区人员关系表转BI", str(e)) + + +if __name__ == '__main__': + province_city_person_relation_to_bi = ProvinceCityPersonRelationToBI() + province_city_person_relation_to_bi.main() diff --git a/logs/error_task.log b/logs/error_task.log index 1026234..7de3079 100644 --- a/logs/error_task.log +++ b/logs/error_task.log @@ -2495,3 +2495,23 @@ 2025-11-21 10:16:19,115 - utils.py - error_task_logger - ERROR - 任务 经销商新签服务单转BI 超过执行窗口5分钟以上,标记为过期。 2025-11-21 10:16:19,117 - utils.py - error_task_logger - ERROR - 任务 非标业绩提报转BI 超过执行窗口5分钟以上,标记为过期。 2025-11-21 10:16:19,118 - utils.py - error_task_logger - ERROR - 任务 合伙人结算登记同步到BI 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,956 - log_config.py - error_task_logger - ERROR - 任务 NGV新增数据 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,957 - log_config.py - error_task_logger - ERROR - 任务 NGV更新数据 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,957 - log_config.py - error_task_logger - ERROR - 任务 新签客户回访 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,958 - log_config.py - error_task_logger - ERROR - 任务 续约客户回访 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,959 - log_config.py - error_task_logger - ERROR - 任务 接车宝日常派发 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,960 - log_config.py - error_task_logger - ERROR - 任务 私域小程序数据支撑 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,960 - log_config.py - error_task_logger - ERROR - 任务 小六提成数据支撑 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,961 - log_config.py - error_task_logger - ERROR - 任务 异业合作数据支撑 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,962 - log_config.py - error_task_logger - ERROR - 任务 短信数据支撑 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,962 - log_config.py - error_task_logger - ERROR - 任务 海外邮件推送 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,963 - log_config.py - error_task_logger - ERROR - 任务 异常服务待办派发 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,964 - log_config.py - error_task_logger - ERROR - 任务 简道云海外项目CRM客户档案迁移BI 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,965 - log_config.py - error_task_logger - ERROR - 任务 安装服务历史派发 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,966 - log_config.py - error_task_logger - ERROR - 任务 分母报备调整 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,966 - log_config.py - error_task_logger - ERROR - 任务 分子报备调整 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,967 - log_config.py - error_task_logger - ERROR - 任务 履约表数据支撑 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,967 - log_config.py - error_task_logger - ERROR - 任务 字段监控 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,968 - log_config.py - error_task_logger - ERROR - 任务 经销商新签服务单转BI 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,969 - log_config.py - error_task_logger - ERROR - 任务 高德匹配手机号 超过执行窗口5分钟以上,标记为过期。 +2025-12-25 16:00:55,969 - log_config.py - error_task_logger - ERROR - 任务 省市区人员关系表转BI 超过执行窗口5分钟以上,标记为过期。 diff --git a/logs/task.log b/logs/task.log index 7715d12..f18b73b 100644 --- a/logs/task.log +++ b/logs/task.log @@ -93,3 +93,51 @@ 2025-11-21 10:16:19,119 - utils.py - task_logger - INFO - 任务 合伙人结算登记同步到BI 状态已更新为 过期。 2025-11-21 10:16:19,119 - utils.py - task_logger - INFO - 启动任务加载完成。 2025-11-21 10:16:19,120 - main.py - task_logger - INFO - 程序已启动... +2025-12-25 16:00:55,654 - utils.py - task_logger - INFO - 任务队列已从磁盘加载。 +2025-12-25 16:00:55,930 - api.py - task_logger - INFO - 获取了34条数据 +2025-12-25 16:00:55,945 - sample_cloud_modules.py - task_logger - INFO - 任务已从云端获取并保存到 tasks.csv 文件。 +2025-12-25 16:00:55,946 - main.py - task_logger - INFO - 任务列表已保存到 csv 文件中。 +2025-12-25 16:00:55,946 - utils.py - task_logger - INFO - 启动时加载并执行任务... +2025-12-25 16:00:55,956 - utils.py - task_logger - INFO - 任务已从磁盘加载到全局任务字典。 +2025-12-25 16:00:55,956 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,957 - utils.py - task_logger - INFO - 任务 NGV新增数据 状态已更新为 过期。 +2025-12-25 16:00:55,957 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,957 - utils.py - task_logger - INFO - 任务 NGV更新数据 状态已更新为 过期。 +2025-12-25 16:00:55,958 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,958 - utils.py - task_logger - INFO - 任务 新签客户回访 状态已更新为 过期。 +2025-12-25 16:00:55,959 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,959 - utils.py - task_logger - INFO - 任务 续约客户回访 状态已更新为 过期。 +2025-12-25 16:00:55,960 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,960 - utils.py - task_logger - INFO - 任务 接车宝日常派发 状态已更新为 过期。 +2025-12-25 16:00:55,960 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,960 - utils.py - task_logger - INFO - 任务 私域小程序数据支撑 状态已更新为 过期。 +2025-12-25 16:00:55,961 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,961 - utils.py - task_logger - INFO - 任务 小六提成数据支撑 状态已更新为 过期。 +2025-12-25 16:00:55,962 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,962 - utils.py - task_logger - INFO - 任务 异业合作数据支撑 状态已更新为 过期。 +2025-12-25 16:00:55,962 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,962 - utils.py - task_logger - INFO - 任务 短信数据支撑 状态已更新为 过期。 +2025-12-25 16:00:55,963 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,963 - utils.py - task_logger - INFO - 任务 海外邮件推送 状态已更新为 过期。 +2025-12-25 16:00:55,964 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,964 - utils.py - task_logger - INFO - 任务 异常服务待办派发 状态已更新为 过期。 +2025-12-25 16:00:55,965 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,965 - utils.py - task_logger - INFO - 任务 简道云海外项目CRM客户档案迁移BI 状态已更新为 过期。 +2025-12-25 16:00:55,966 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,966 - utils.py - task_logger - INFO - 任务 安装服务历史派发 状态已更新为 过期。 +2025-12-25 16:00:55,966 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,966 - utils.py - task_logger - INFO - 任务 分母报备调整 状态已更新为 过期。 +2025-12-25 16:00:55,967 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,967 - utils.py - task_logger - INFO - 任务 分子报备调整 状态已更新为 过期。 +2025-12-25 16:00:55,967 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,967 - utils.py - task_logger - INFO - 任务 履约表数据支撑 状态已更新为 过期。 +2025-12-25 16:00:55,968 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,968 - utils.py - task_logger - INFO - 任务 字段监控 状态已更新为 过期。 +2025-12-25 16:00:55,968 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,969 - utils.py - task_logger - INFO - 任务 经销商新签服务单转BI 状态已更新为 过期。 +2025-12-25 16:00:55,969 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,969 - utils.py - task_logger - INFO - 任务 高德匹配手机号 状态已更新为 过期。 +2025-12-25 16:00:55,970 - utils.py - task_logger - INFO - 所有任务状态已保存到磁盘。 +2025-12-25 16:00:55,970 - utils.py - task_logger - INFO - 任务 省市区人员关系表转BI 状态已更新为 过期。 +2025-12-25 16:00:55,970 - utils.py - task_logger - INFO - 启动任务加载完成。 +2025-12-25 16:00:55,970 - main.py - task_logger - INFO - 程序已启动... diff --git a/module.py b/module.py index 7210b8f..67a441f 100644 --- a/module.py +++ b/module.py @@ -352,6 +352,18 @@ class Module: print("data_Exception_Task", e) return False + @staticmethod + def province_city_person_relation_to_bi(): + print("GD_match_phone_number") + try: + province_city_person_relation_to_bi = back_ground_module.province_city_person_relation_to_bi() + thread = threading.Thread(target=province_city_person_relation_to_bi.main) + thread.start() + return "data_Exception_Task" + except Exception as e: + print("data_Exception_Task", e) + return False + @staticmethod def text3(): print("text3") diff --git a/task_executor.py b/task_executor.py index 91b0416..d273e1e 100644 --- a/task_executor.py +++ b/task_executor.py @@ -42,6 +42,7 @@ def execute_task(task_id) -> bool: "合伙人结算登记同步到BI": Module.partner_settlement_to_BI, "非标业绩提报转BI": Module.non_standar_performance_to_BI, "高德匹配手机号": Module.GD_match_phone_number, + "省市区人员关系表转BI": Module.province_city_person_relation_to_bi, # 添加更多任务函数映射... } diff --git a/tasks.csv b/tasks.csv index 0deddd8..5c0b23f 100644 --- a/tasks.csv +++ b/tasks.csv @@ -1,11 +1,11 @@ unique_id,exec_time,is_switch_on,status NGV新增数据,09:00,True,过期 -NGV更新数据,12:30,True,待执行 +NGV更新数据,12:30,True,过期 新签客户回访,09:05,True,过期 续约客户回访,09:08,True,过期 大客户回访,08:55,False,已禁用 简道云拉取数据,08:00,False,已禁用 -接车宝日常派发,09:00,True,过期 +接车宝日常派发,09:10,True,过期 接车宝异常派发,09:00,False,已禁用 私域小程序数据支撑,04:40,True,过期 小六提成数据支撑,04:40,True,过期 @@ -25,6 +25,7 @@ NGV更新数据,12:30,True,待执行 履约表数据支撑,09:10,True,过期 字段监控,06:25,True,过期 经销商新签服务单转BI,08:05,True,过期 -非标业绩提报转BI,08:25,True,过期 -合伙人结算登记同步到BI,08:24,True,过期 -高德匹配手机号,05:00,False,已禁用 +非标业绩提报转BI,17:01,True,待执行 +合伙人结算登记同步到BI,17:02,True,待执行 +高德匹配手机号,05:00,True,过期 +省市区人员关系表转BI,08:00,True,过期 diff --git a/test/省市区人员关系表同步到BI.py b/test/省市区人员关系表同步到BI.py new file mode 100644 index 0000000..512c95a --- /dev/null +++ b/test/省市区人员关系表同步到BI.py @@ -0,0 +1,198 @@ +import pandas as pd +import datetime +from config import Config +from api import API +import pymysql # 使用 pymysql 替代 mysql.connector +from back_ground_module import CommonModule +import os +import mysql.connector +import pandas as pd +import json +import numpy as np +import mysql.connector +from mysql.connector import Error +from log_config import configure_task_logger, configure_error_task_logger +import math + +logger = configure_task_logger() +error_task_logger = configure_error_task_logger() +output_dir = "output" # 设置输出目录 +os.makedirs(output_dir, exist_ok=True) +common_module = CommonModule() +api_instance = API() + + +class ProvinceCityPersonRelationToBI: + def __init__(self): + self.pvc_data = None + self.field_mapping = { + "省": "_widget_1734677164861", + "市": "_widget_1734677164862", + "区": "_widget_1734677164863", + "运营顾问": "_widget_1734677164864", + "区域经理": "_widget_1734677164865", + "运营专家": "_widget_1734677164866", + "战区": "_widget_1734677164867", + "新签回访客服": "_widget_1734677164868", + "续约回访客服": "_widget_1734677164869", + "异常待办客服": "_widget_1734677164870", + "日常回访客服": "_widget_1734677164871", + } + + def load_all_data(self): + payload = {"api_key": "675b900991ad2491c69389ca", + "entry_id": "676512ac3e54dc3159460c0a", + } + pvc_data = api_instance.entry_data_list(payload) + self.pvc_data = pvc_data.get("data") # api请求格式,将数据封装在data字典里 + + def data_process(self): + df = pd.DataFrame(self.pvc_data) + # 反转映射字典 + reverse_mapping = {v: k for k, v in self.field_mapping.items()} + # 1.列明替换 + df.columns = [reverse_mapping.get(col, col) for col in df.columns] + + # 2.成员字段取值 + user_columns = ["运营顾问", "区域经理", "运营专家", "新签回访客服", "续约回访客服", + "异常待办客服", "日常回访客服"] + + for col in user_columns: + df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "") + + return df + + def clear_table_data(self): + """ + 清空指定 MySQL 表的数据。 + 参数已写死在函数内部,直接调用即可。 + """ + # 数据库连接信息 + HS_DB_Config = { + 'host': "f6-public.rwlb.rds.aliyuncs.com", + 'user': "rw_operation_data_relay", + 'password': "m+q5Z4%IVuF9bf", + 'database': "f6operation_data_relay" + } + table_name = "province_city_person_relation_to_bi" # 要清空的表名 + + connection = None + try: + # 建立数据库连接 + connection = mysql.connector.connect( + host=HS_DB_Config["host"], + user=HS_DB_Config["user"], + password=HS_DB_Config["password"], + database=HS_DB_Config["database"] + ) + if connection.is_connected(): + cursor = connection.cursor() + + # 使用TRUNCATE清空表数据 + cursor.execute(f"TRUNCATE TABLE {table_name}") + connection.commit() + + logger.info(f"成功清空表 {table_name} 中的所有数据") + + except Error as e: + error_task_logger.error(f"清空表时发生错误: {e}") + if connection and connection.is_connected(): + connection.rollback() + finally: + if connection and connection.is_connected(): + cursor.close() + connection.close() + logger.info("数据库连接已关闭") + + def write_to_bi(self, df): + HS_DB_Config = Config.HS_DB_Config + table_name = "province_city_person_relation_to_bi" + chunk_size = 1000 # 每批插入 1000 行 + + # 清理 DataFrame 中的 NaN/None 等值 + df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None) + + connection = mysql.connector.connect( + host=HS_DB_Config["host"], + user=HS_DB_Config["user"], + password=HS_DB_Config["password"], + database=HS_DB_Config["database"] + ) + cursor = connection.cursor() + + try: + # 获取数据库表的列名 + cursor.execute(f"SHOW COLUMNS FROM `{table_name}`") + db_columns = [col[0] for col in cursor.fetchall()] + + # 保留与数据库匹配的列 + filtered_df = df[df.columns.intersection(db_columns)] + if filtered_df.empty: + print("DataFrame 中没有与数据库表结构匹配的列。") + return + + # 处理 dict/list 类型字段:转为 JSON 字符串 + filtered_df = filtered_df.copy() + for col in filtered_df.columns: + if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any(): + filtered_df[col] = filtered_df[col].apply( + lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x + ) + + # 构建 INSERT 语句(只构建一次) + columns = [f"`{col}`" for col in filtered_df.columns] + placeholders = ', '.join(['%s'] * len(columns)) + insert_sql = f"INSERT INTO `{table_name}` ({', '.join(columns)}) VALUES ({placeholders})" + + total_rows = len(filtered_df) + num_chunks = math.ceil(total_rows / chunk_size) + + for i in range(num_chunks): + start_idx = i * chunk_size + end_idx = min(start_idx + chunk_size, total_rows) + chunk_df = filtered_df.iloc[start_idx:end_idx] + + # 转为元组列表 + data_to_insert = [ + tuple(row) for row in chunk_df.values + ] + + # 批量执行(executemany 更高效) + cursor.executemany(insert_sql, data_to_insert) + + connection.commit() + logger.info(f"成功写入 {total_rows} 条记录到 {table_name} 表中(分 {num_chunks} 批)。") + + except Exception as e: + error_task_logger.error(f"写入数据库时发生错误: {e}", exc_info=True) + connection.rollback() + finally: + cursor.close() + connection.close() + + def main(self): + task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + logger.info("任务开始") + # step1: 获取数据 + self.load_all_data() + logger.info("加载数据完成") + # step2:数据处理 + df = self.data_process() + # df.to_csv(os.path.join(output_dir, "new_dealer_service_order_to_bi.csv")) + logger.info("数据处理完成") + # step3:数据库删除 + self.clear_table_data() + logger.info("目标数据库已清空") + # step4:数据写入BI + self.write_to_bi(df) + logger.info("数据已写入数据库中") + common_module.send_task_status(task_start_time, "省市区人员关系表转BI") + except Exception as e: + error_task_logger.error(f"省市区人员关系表转BI发生错误{e}") + common_module.send_task_error(task_start_time, "省市区人员关系表转BI", str(e)) + + +if __name__ == '__main__': + province_city_person_relation_to_bi = ProvinceCityPersonRelationToBI() + province_city_person_relation_to_bi.main() diff --git a/tools/BI.ipynb b/tools/BI.ipynb index dd36879..0dbd8cc 100644 --- a/tools/BI.ipynb +++ b/tools/BI.ipynb @@ -12,8 +12,8 @@ "metadata": { "collapsed": true, "ExecuteTime": { - "end_time": "2025-08-20T09:06:39.520648Z", - "start_time": "2025-08-20T09:06:39.167174Z" + "end_time": "2025-12-25T07:53:32.248169100Z", + "start_time": "2025-12-25T07:53:32.063693500Z" } }, "source": [ @@ -26,29 +26,17 @@ " \"\"\"创建数据表\"\"\"\n", " create_table_query = f\"\"\"\n", " CREATE TABLE IF NOT EXISTS {table_name} (\n", - " `选择合伙人` VARCHAR(255) COMMENT '选择合伙人',\n", - " `合伙人姓名` VARCHAR(255) COMMENT '合伙人姓名',\n", - " `手机号` VARCHAR(255) COMMENT '手机号',\n", - " `合伙人身份` VARCHAR(255) COMMENT '合伙人身份',\n", - " `合伙人所在省市` VARCHAR(255) COMMENT '合伙人所在省市',\n", - " `合伙人登记人` VARCHAR(255) COMMENT '合伙人登记人',\n", - " `战区经理` VARCHAR(255) COMMENT '战区经理',\n", - " `提交人` VARCHAR(255) COMMENT '提交人',\n", - " `合伙人分类` VARCHAR(255) COMMENT '合伙人分类',\n", + " `省` VARCHAR(255) COMMENT '省',\n", + " `市` VARCHAR(255) COMMENT '市',\n", + " `区` VARCHAR(255) COMMENT '区',\n", + " `运营顾问` VARCHAR(255) COMMENT '运营顾问',\n", + " `区域经理` VARCHAR(255) COMMENT '区域经理',\n", + " `运营专家` VARCHAR(255) COMMENT '运营专家',\n", " `战区` VARCHAR(255) COMMENT '战区',\n", - " `订单编号` VARCHAR(255) COMMENT '订单登记表.订单编号',\n", - " `销售阶段` VARCHAR(255) COMMENT '订单登记表.销售阶段',\n", - " `版本` VARCHAR(255) COMMENT '订单登记表.版本',\n", - " `年限` VARCHAR(255) COMMENT '订单登记表.年限',\n", - " `成交金额` VARCHAR(255) COMMENT '订单登记表.成交金额',\n", - " `佣金` VARCHAR(255) COMMENT '订单登记表.佣金',\n", - " `理论佣金` VARCHAR(255) COMMENT '订单登记表.理论佣金',\n", - " `佣金比例` VARCHAR(255) COMMENT '订单登记表.佣金比例',\n", - " `合计佣金` VARCHAR(255) COMMENT '合计佣金',\n", - " `理论合计佣金` VARCHAR(255) COMMENT '理论合计佣金',\n", - " `特殊情况备注` VARCHAR(255) COMMENT '特殊情况备注',\n", - " `合伙人介绍证明` VARCHAR(255) COMMENT '合伙人介绍证明(微信聊天截图等)',\n", - " `合伙人类型` VARCHAR(255) COMMENT '合伙人类型'\n", + " `新签回访客服` VARCHAR(255) COMMENT '新签回访客服',\n", + " `续约回访客服` VARCHAR(255) COMMENT '续约回访客服',\n", + " `异常待办客服` VARCHAR(255) COMMENT '异常待办客服',\n", + " `日常回访客服` VARCHAR(255) COMMENT '日常回访客服'\n", " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ;\n", " \"\"\"\n", " cursor.execute(create_table_query)\n", @@ -64,7 +52,7 @@ "} # 衡时数据库链接配置-mysql\n", "\n", "# 表名\n", - "table_name = \"partner_settlement_to_BI\" # 请替换为实际的表名\n", + "table_name = \"province_city_person_relation_to_bi\" # 请替换为实际的表名\n", "\n", "# 连接数据库\n", "connection = mysql.connector.connect(\n", @@ -88,11 +76,11 @@ "name": "stdout", "output_type": "stream", "text": [ - "成功创建表 partner_settlement_to_BI\n" + "成功创建表 province_city_person_relation_to_bi\n" ] } ], - "execution_count": 2 + "execution_count": 26 }, { "metadata": {},