diff --git a/back_ground_module/__init__.py b/back_ground_module/__init__.py index fe44d91..ea333ec 100644 --- a/back_ground_module/__init__.py +++ b/back_ground_module/__init__.py @@ -24,3 +24,5 @@ from back_ground_module.update_molecule_reporting_adjustment_to_bi import Molecu from back_ground_module.import_performance_data import ImportPerformanceData from back_ground_module.data_monitor import DataMonitor from back_ground_module.new_dealer_service_order_to_bi import NewDealerServiceOrderToBI +from back_ground_module.non_standar_performance_to_BI import NonStandardPerformanceToBI +from back_ground_module.partner_settlement_to_BI import PartnerSettlementToBI diff --git a/back_ground_module/logs/task.log b/back_ground_module/logs/task.log index fe75bd0..11555b7 100644 --- a/back_ground_module/logs/task.log +++ b/back_ground_module/logs/task.log @@ -8044,3 +8044,16 @@ 2025-08-18 12:10:19,041 - revisit_new_services_180.py - task_logger - INFO - 关联数据图片: 2025-08-18 12:10:20,202 - common_module.py - task_logger - INFO - 任务状态发送成功: {'data': {'creator': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'updater': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'deleter': None, 'createTime': '2025-08-18T04:10:14.333Z', 'updateTime': '2025-08-18T04:10:14.333Z', 'deleteTime': None, '_widget_1744873387500': '2025-08-18T00:00:00.000Z', '_widget_1743644977694': '新签客户回访', '_widget_1744873387501': '2025-08-18T03:50:30.000Z', '_widget_1744873387502': '2025-08-18T04:10:20.000Z', '_widget_1744873387504': '1190', '_id': '68a2a7a6b00a82b43219c605', 'appId': '6694d3c4fcb69ca9a111a6c4', 'entryId': '67ede908eb9c22261016466e'}} 2025-08-18 12:10:20,202 - revisit_new_services_180.py - task_logger - INFO - 新签客户回访任务完成 +2025-08-20 17:19:06,286 - api.py - task_logger - INFO - 已获取 8 条数据 +2025-08-20 17:19:07,202 - common_module.py - task_logger - INFO - 任务状态发送成功: {'data': {'creator': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'updater': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'deleter': None, 'createTime': '2025-08-20T09:19:04.961Z', 'updateTime': '2025-08-20T09:19:04.961Z', 'deleteTime': None, '_widget_1744873387500': '2025-08-20T00:00:00.000Z', '_widget_1743644977694': '非标业绩提报转BI', '_widget_1744873387501': '2025-08-20T09:19:06.000Z', '_widget_1744873387502': '2025-08-20T09:19:07.000Z', '_widget_1744873387504': '1', '_id': '68a593087f71408faf8b8991', 'appId': '6694d3c4fcb69ca9a111a6c4', 'entryId': '67ede908eb9c22261016466e'}} +2025-08-20 17:26:04,292 - partner_settlement_to_BI.py - task_logger - INFO - 任务开始 +2025-08-20 17:26:04,407 - partner_settlement_to_BI.py - task_logger - INFO - 加载数据完成 +2025-08-20 17:26:10,718 - non_standar_performance_to_BI.py - task_logger - INFO - 任务开始 +2025-08-20 17:26:10,820 - api.py - task_logger - INFO - 已获取 8 条数据 +2025-08-20 17:26:10,926 - non_standar_performance_to_BI.py - task_logger - INFO - 加载数据完成 +2025-08-20 17:26:10,936 - non_standar_performance_to_BI.py - task_logger - INFO - 数据处理完成 +2025-08-20 17:26:11,150 - non_standar_performance_to_BI.py - task_logger - INFO - 成功清空表 non_standard_performance_to_BI 中的所有数据 +2025-08-20 17:26:11,165 - non_standar_performance_to_BI.py - task_logger - INFO - 数据库连接已关闭 +2025-08-20 17:26:11,165 - non_standar_performance_to_BI.py - task_logger - INFO - 目标数据库已清空 +2025-08-20 17:26:11,430 - non_standar_performance_to_BI.py - task_logger - INFO - 数据已写入数据库中 +2025-08-20 17:26:11,563 - common_module.py - task_logger - INFO - 任务状态发送成功: {'data': {'creator': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'updater': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'deleter': None, 'createTime': '2025-08-20T09:26:09.312Z', 'updateTime': '2025-08-20T09:26:09.312Z', 'deleteTime': None, '_widget_1744873387500': '2025-08-20T00:00:00.000Z', '_widget_1743644977694': '非标业绩提报转BI', '_widget_1744873387501': '2025-08-20T09:26:10.000Z', '_widget_1744873387502': '2025-08-20T09:26:11.000Z', '_widget_1744873387504': '1', '_id': '68a594b16856c7dacd7824a4', 'appId': '6694d3c4fcb69ca9a111a6c4', 'entryId': '67ede908eb9c22261016466e'}} diff --git a/back_ground_module/non_standar_performance_to_BI.py b/back_ground_module/non_standar_performance_to_BI.py new file mode 100644 index 0000000..8943c5d --- /dev/null +++ b/back_ground_module/non_standar_performance_to_BI.py @@ -0,0 +1,286 @@ +# -*- coding: utf-8 -*- +import pandas as pd +import datetime +from config import Config +from api import API +import pymysql # 使用 pymysql 替代 mysql.connector +from back_ground_module import CommonModule +import os +import mysql.connector +import pandas as pd +import json +import numpy as np +import mysql.connector +from mysql.connector import Error +from log_config import configure_task_logger, configure_error_task_logger + +logger = configure_task_logger() +error_task_logger = configure_error_task_logger() +api_instance = API() +common_module = CommonModule() +output_dir = "output" # 设置输出目录 +os.makedirs(output_dir, exist_ok=True) + + +class NonStandardPerformanceToBI: + def __init__(self): + self.dealer_service_data = None + self.field_mapping = { + "报备类型": "_widget_1753770875899", + "协作内容": "_widget_1753770875915", + "订单类型": "_widget_1753770875966", + "情况说明": "_widget_1753770875944", + "订单编号": "_widget_1753770875887", + "实付金额": "_widget_1753770875889", + "门店编码": "_widget_1753770875890", + "门店名称": "_widget_1753770875888", + "版本": "_widget_1753770875891", + "年限": "_widget_1753948745953", + "支付日期": "_widget_1753770875893", + "开户/处理日期": "_widget_1753770875894", + "小六业绩金额": "_widget_1753770875898", + "区域业绩金额": "_widget_1753770875937", + "报备业绩归属人": "_widget_1753770875901", + "报备业绩归属区域经理": "_widget_1753770875903", + "报备业绩归属大区": "_widget_1753866196486", + "原业绩归属人": "_widget_1753856032683", + "原业绩归属区域经理": "_widget_1753866196485", + "小六业绩比例": "_widget_1753770875917", + "区域业绩比例": "_widget_1753770875921", + "运营专家": "_widget_1753770875902", + "提成类型": "_widget_1753778922504", + "SaaS新签提成比例": "_widget_1753770875949", + "服务包提成比例": "_widget_1753778922567", + "提成金额": "_widget_1753770875948", + "新签提成比例-首年": "_widget_1753778922503", + "新签提成比例-非首年": "_widget_1753778922548", + "新签阶段及提成比例": "_widget_1753778656359", + "新签阶段及提成比例.选择提成阶段": "_widget_1753778656359._widget_1753778656361", + "新签阶段及提成比例.新签阶段": "_widget_1753778656359._widget_1753948745962", + "新签阶段及提成比例.提成比例": "_widget_1753778656359._widget_1753778656362", + } + + # 定义需要特殊处理的列表字段及其内部字段映射 + self.list_fields_config = { + "新签阶段及提成比例": { + "_widget_1753778656361": "选择提成阶段", + "_widget_1753948745962": "新签阶段", + "_widget_1753778656362": "提成比例" + }, + # 可以在这里添加其他列表字段的配置 + # "另一个列表字段": { + # "原始字段名1": "映射后字段名1", + # "原始字段名2": "映射后字段名2" + # } + } + + def load_all_data(self): + # 获取非标业绩提报数据 + payload = {"api_key": "66b9678280b37f8a276b1d01", + "entry_id": "68886b7c0382a7249ae0b5d6", + } + dealer_service = api_instance.entry_data_list(payload) + self.dealer_service_data = dealer_service.get("data") # api请求格式,将数据封装在data字典里 + + def process_list_field(self, field_value, field_config): + """通用方法:处理列表类型的字段""" + if not isinstance(field_value, (list, np.ndarray)): + return field_value + + processed_list = [] + for item in field_value: + if not isinstance(item, dict): + processed_list.append(item) + continue + + processed_item = {} + for original_key, mapped_key in field_config.items(): + if original_key in item: + # 处理包含id的字典字段 + if isinstance(item[original_key], dict) and "id" in item[original_key]: + processed_item[mapped_key] = item[original_key]["id"] + else: + processed_item[mapped_key] = item[original_key] + else: + processed_item[mapped_key] = None + processed_list.append(processed_item) + return processed_list + + def data_process(self): + df = pd.DataFrame(self.dealer_service_data) + # 反转映射字典 + reverse_mapping = {v: k for k, v in self.field_mapping.items()} + # 1.列明替换 + df.columns = [reverse_mapping.get(col, col) for col in df.columns] + + # 2.成员字段取值 + user_columns = ["报备业绩归属人", "报备业绩归属区域经理", "原业绩归属人", "原业绩归属区域经理", "运营专家"] + + for col in user_columns: + df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "") + + # 3.日期字段转为北京时间 + time_columns = ["支付日期", "开户/处理日期"] + + df[time_columns] = df[time_columns].apply( + lambda col: pd.to_datetime(col, errors='coerce') + .dt.tz_localize(None) + .dt.strftime('%Y-%m-%d %H:%M:%S') + ) + + # 4.处理所有配置的列表字段 + if "新签阶段及提成比例" in df.columns: + # 先处理订单登记表字段 + df["新签阶段及提成比例"] = df["新签阶段及提成比例"].apply( + lambda x: self.process_list_field(x, self.list_fields_config["新签阶段及提成比例"]) + if x is not None and (isinstance(x, (list, dict, np.ndarray)) or not pd.isna(x)) + else None + ) + + # 拆分行 + df_exploded = df.explode("新签阶段及提成比例") + + # 将订单登记表中的字段提取到主表中 + order_fields = self.list_fields_config["新签阶段及提成比例"].values() + for field in order_fields: + df_exploded[field] = df_exploded["新签阶段及提成比例"].apply( + lambda x: x.get(field) if isinstance(x, dict) else None + ) + + # 删除原始的订单登记表列 + df_exploded = df_exploded.drop(columns=["新签阶段及提成比例"]) + + # 重置索引 + df = df_exploded.reset_index(drop=True) + + return df + + def write_to_bi(self, df): + # 数据库连接信息 + HS_DB_Config = { + 'host': "f6-public.rwlb.rds.aliyuncs.com", + 'user': "rw_operation_data_relay", + 'password': "m+q5Z4%IVuF9bf", + 'database': "f6operation_data_relay" + } + table_name = "non_standard_performance_to_BI" # 替换为你的实际表名 + + # 建立数据库连接 + connection = mysql.connector.connect( + host=HS_DB_Config["host"], + user=HS_DB_Config["user"], + password=HS_DB_Config["password"], + database=HS_DB_Config["database"] + ) + cursor = connection.cursor() + + try: + # 查询表列名 + cursor.execute(f"SHOW COLUMNS FROM {table_name}") + columns_info = cursor.fetchall() + db_columns = [col[0] for col in columns_info] # 提取列名 + df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None) + # 保留 DataFrame 中与数据库列名匹配的列 + filtered_df = df[df.columns.intersection(db_columns)] + + # 如果没有匹配的列,直接返回 + if filtered_df.empty: + print("DataFrame 中没有与数据库表结构匹配的列。") + return + + # 筛选列之后,插入前处理 dict 类型 + filtered_df = filtered_df.copy() + for col in filtered_df.columns: + if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any(): + filtered_df.loc[:, col] = filtered_df[col].apply( + lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x + ) + + # 构建插入语句 + placeholders = ', '.join(['%s'] * len(filtered_df.columns)) + # 使用反引号避免特殊列明 + columns = ', '.join([f"`{col}`" for col in filtered_df.columns]) + insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" + + # 将 DataFrame 写入数据库 + for _, row in filtered_df.iterrows(): + cursor.execute(insert_sql, tuple(row)) + + connection.commit() + logger.info(f"成功写入 {len(filtered_df)} 条记录到 {table_name} 表中。") + + except Exception as e: + error_task_logger.error(f"写入数据库时发生错误: {e}") + connection.rollback() + finally: + cursor.close() + connection.close() + + def clear_table_data(self): + """ + 清空指定 MySQL 表的数据。 + 参数已写死在函数内部,直接调用即可。 + """ + # 数据库连接信息 + HS_DB_Config = { + 'host': "f6-public.rwlb.rds.aliyuncs.com", + 'user': "rw_operation_data_relay", + 'password': "m+q5Z4%IVuF9bf", + 'database': "f6operation_data_relay" + } + table_name = "non_standard_performance_to_BI" # 要清空的表名 + + connection = None + try: + # 建立数据库连接 + connection = mysql.connector.connect( + host=HS_DB_Config["host"], + user=HS_DB_Config["user"], + password=HS_DB_Config["password"], + database=HS_DB_Config["database"] + ) + if connection.is_connected(): + cursor = connection.cursor() + + # 使用TRUNCATE清空表数据 + cursor.execute(f"TRUNCATE TABLE {table_name}") + connection.commit() + + logger.info(f"成功清空表 {table_name} 中的所有数据") + + except Error as e: + error_task_logger.error(f"清空表时发生错误: {e}") + if connection and connection.is_connected(): + connection.rollback() + finally: + if connection and connection.is_connected(): + cursor.close() + connection.close() + logger.info("数据库连接已关闭") + + def main(self): + task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + logger.info("任务开始") + # step1: 获取数据 + self.load_all_data() + logger.info("加载数据完成") + # step2:数据处理 + df = self.data_process() + # df.to_csv(os.path.join(output_dir, "new_dealer_service_order_to_bi.csv")) + logger.info("数据处理完成") + # step3:数据库删除 + self.clear_table_data() + logger.info("目标数据库已清空") + # step4:数据写入BI + self.write_to_bi(df) + logger.info("数据已写入数据库中") + common_module.send_task_status(task_start_time, "非标业绩提报转BI") + except Exception as e: + error_task_logger.error(f"非标业绩提报转BI发生错误{e}") + common_module.send_task_error(task_start_time,"非标业绩提报转BI", str(e)) + + +if __name__ == '__main__': + start = NonStandardPerformanceToBI() + start.main() diff --git a/back_ground_module/partner_settlement_to_BI.py b/back_ground_module/partner_settlement_to_BI.py new file mode 100644 index 0000000..59395e5 --- /dev/null +++ b/back_ground_module/partner_settlement_to_BI.py @@ -0,0 +1,278 @@ +## 获取数据 +# -*- coding: utf-8 -*- +import pandas as pd +import datetime +from config import Config +from api import API +import pymysql # 使用 pymysql 替代 mysql.connector +from back_ground_module import CommonModule +import os +import mysql.connector +import pandas as pd +import json +import numpy as np +import mysql.connector +from mysql.connector import Error +from log_config import configure_task_logger, configure_error_task_logger +import sys + +logger = configure_task_logger() +error_task_logger = configure_error_task_logger() +api_instance = API() +common_module = CommonModule() +output_dir = "output" # 设置输出目录 +os.makedirs(output_dir, exist_ok=True) + + +class PartnerSettlementToBI: + def __init__(self): + self.partner_settlement_data = None + self.field_mapping = { + "选择合伙人": "_widget_1753930627469", + "合伙人姓名": "_widget_1712801992726", + "手机号": "_widget_1712803222895", + "合伙人身份": "_widget_1712803222894", + "合伙人所在省市": "_widget_1712803222896", + "合伙人登记人": "_widget_1712803222900", + "战区经理": "_widget_1712803222901", + "提交人": "_widget_1753941892609", + "合伙人分类": "_widget_1753943042503", + "战区": "_widget_1754530653275", + "订单登记表": "_widget_1712803222905", + "订单登记表.订单编号": "_widget_1712803222905._widget_1712803222907", + "订单登记表.销售阶段": "_widget_1712803222905._widget_1712805391009", + "订单登记表.版本": "_widget_1712803222905._widget_1712803222908", + "订单登记表.年限": "_widget_1712803222905._widget_1712815331264", + "订单登记表.成交金额": "_widget_1712803222905._widget_1712805391002", + "订单登记表.佣金": "_widget_1712803222905._widget_1753952737266", + "订单登记表.理论佣金": "_widget_1712803222905._widget_1753952737267", + "订单登记表.佣金比例": "_widget_1712803222905._widget_1712807001396", + "合计佣金": "_widget_1753948415171", + "理论合计佣金": "_widget_1753952737280", + "特殊情况备注": "_widget_1712805391035", + "合伙人介绍证明(微信聊天截图等)": "_widget_1712815331256", + "合伙人类型": "_widget_1753957844818", + } + + # 定义需要特殊处理的列表字段及其内部字段映射 + self.list_fields_config = { + "订单登记表": { + "_widget_1712803222907": "订单编号", + "_widget_1712805391009": "销售阶段", + "_widget_1712803222908": "版本", + "_widget_1712815331264": "年限", + "_widget_1712805391002": "成交金额", + "_widget_1753952737266": "佣金", + "_widget_1753952737267": "理论佣金", + "_widget_1712807001396": "佣金比例", + }, + # 可以在这里添加其他列表字段的配置 + # "另一个列表字段": { + # "原始字段名1": "映射后字段名1", + # "原始字段名2": "映射后字段名2" + # } + } + + def load_all_data(self): + payload = {"api_key": "66b9678280b37f8a276b1d01", + # "entry_id": "68a57e3a0bc339d3384d1b0c", # 测试 + "entry_id": "661748c7c727764d79557674", + } + partner_settlement = api_instance.entry_data_list(payload) + self.partner_settlement_data = partner_settlement.get("data") # api请求格式,将数据封装在data字典里 + + def process_list_field(self, field_value, field_config): + """通用方法:处理列表类型的字段""" + if not isinstance(field_value, (list, np.ndarray)): + return field_value + + processed_list = [] + for item in field_value: + if not isinstance(item, dict): + processed_list.append(item) + continue + + processed_item = {} + for original_key, mapped_key in field_config.items(): + if original_key in item: + # 处理包含id的字典字段 + if isinstance(item[original_key], dict) and "id" in item[original_key]: + processed_item[mapped_key] = item[original_key]["id"] + else: + processed_item[mapped_key] = item[original_key] + else: + processed_item[mapped_key] = None + processed_list.append(processed_item) + return processed_list + + def data_process(self): + if not self.partner_settlement_data: + print("数据为空终止程序") + sys.exit(1) + df = pd.DataFrame(self.partner_settlement_data) + # 反转映射字典 + reverse_mapping = {v: k for k, v in self.field_mapping.items()} + # 1.列明替换 + df.columns = [reverse_mapping.get(col, col) for col in df.columns] + + # 2.成员字段取值 + user_columns = ["合伙人登记人", "提交人", "战区经理"] + + for col in user_columns: + df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "") + + # 3.处理订单登记表列表字段,将其拆分成多行 + if "订单登记表" in df.columns: + # 先处理订单登记表字段 + df["订单登记表"] = df["订单登记表"].apply( + lambda x: self.process_list_field(x, self.list_fields_config["订单登记表"]) + if x is not None and (isinstance(x, (list, dict, np.ndarray)) or not pd.isna(x)) + else None + ) + + # 拆分行 + df_exploded = df.explode("订单登记表") + + # 将订单登记表中的字段提取到主表中 + order_fields = self.list_fields_config["订单登记表"].values() + for field in order_fields: + df_exploded[field] = df_exploded["订单登记表"].apply( + lambda x: x.get(field) if isinstance(x, dict) else None + ) + + # 删除原始的订单登记表列 + df_exploded = df_exploded.drop(columns=["订单登记表"]) + + # 重置索引 + df = df_exploded.reset_index(drop=True) + + return df + + def write_to_bi(self, df): + # 数据库连接信息 + HS_DB_Config = { + 'host': "f6-public.rwlb.rds.aliyuncs.com", + 'user': "rw_operation_data_relay", + 'password': "m+q5Z4%IVuF9bf", + 'database': "f6operation_data_relay" + } + table_name = "partner_settlement_to_BI" # 替换为你的实际表名 + + # 建立数据库连接 + connection = mysql.connector.connect( + host=HS_DB_Config["host"], + user=HS_DB_Config["user"], + password=HS_DB_Config["password"], + database=HS_DB_Config["database"] + ) + cursor = connection.cursor() + + try: + # 查询表列名 + cursor.execute(f"SHOW COLUMNS FROM {table_name}") + columns_info = cursor.fetchall() + db_columns = [col[0] for col in columns_info] # 提取列名 + df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None) + # 保留 DataFrame 中与数据库列名匹配的列 + filtered_df = df[df.columns.intersection(db_columns)] + + # 如果没有匹配的列,直接返回 + if filtered_df.empty: + print("DataFrame 中没有与数据库表结构匹配的列。") + return + + # 筛选列之后,插入前处理 dict 类型 + filtered_df = filtered_df.copy() + for col in filtered_df.columns: + if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any(): + filtered_df.loc[:, col] = filtered_df[col].apply( + lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x + ) + + # 构建插入语句 + placeholders = ', '.join(['%s'] * len(filtered_df.columns)) + # 使用反引号避免特殊列明 + columns = ', '.join([f"`{col}`" for col in filtered_df.columns]) + insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})" + + # 将 DataFrame 写入数据库 + for _, row in filtered_df.iterrows(): + cursor.execute(insert_sql, tuple(row)) + + connection.commit() + logger.info(f"成功写入 {len(filtered_df)} 条记录到 {table_name} 表中。") + + except Exception as e: + error_task_logger.error(f"写入数据库时发生错误: {e}") + connection.rollback() + finally: + cursor.close() + connection.close() + + def clear_table_data(self): + """ + 清空指定 MySQL 表的数据。 + 参数已写死在函数内部,直接调用即可。 + """ + # 数据库连接信息 + HS_DB_Config = { + 'host': "f6-public.rwlb.rds.aliyuncs.com", + 'user': "rw_operation_data_relay", + 'password': "m+q5Z4%IVuF9bf", + 'database': "f6operation_data_relay" + } + table_name = "partner_settlement_to_BI" # 要清空的表名 + + connection = None + try: + # 建立数据库连接 + connection = mysql.connector.connect( + host=HS_DB_Config["host"], + user=HS_DB_Config["user"], + password=HS_DB_Config["password"], + database=HS_DB_Config["database"] + ) + if connection.is_connected(): + cursor = connection.cursor() + + # 使用TRUNCATE清空表数据 + cursor.execute(f"TRUNCATE TABLE {table_name}") + connection.commit() + + print(f"成功清空表 {table_name} 中的所有数据") + + except Error as e: + print(f"清空表时发生错误: {e}") + if connection and connection.is_connected(): + connection.rollback() + finally: + if connection and connection.is_connected(): + cursor.close() + connection.close() + print("数据库连接已关闭") + + def main(self): + task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + try: + logger.info("任务开始") + # step1: 获取数据 + self.load_all_data() + logger.info("加载数据完成") + # step2:数据处理 + df = self.data_process() + # df.to_csv(os.path.join(output_dir, "new_dealer_service_order_to_bi.csv")) + logger.info("数据处理完成") + # step3:数据库删除 + self.clear_table_data() + logger.info("目标数据库已清空") + # step4:数据写入BI + self.write_to_bi(df) + logger.info("数据已写入数据库中") + + common_module.send_task_status(task_start_time, "合伙人结算登记同步到BI") + except Exception as e: + error_task_logger.error(f"合伙人结算登记同步到BI发生错误:{e}") + common_module.send_task_error(task_start_time, "合伙人结算登记同步到BI", str(e)) + + +PartnerSettlementToBI().main() diff --git a/main.py b/main.py index eb4763f..3ad45fe 100644 --- a/main.py +++ b/main.py @@ -24,7 +24,6 @@ def main(): # 设置每分钟检查一次是否有新任务需要加载到队列 schedule.every(1).minutes.do(load_tasks_and_execute) - # 主循环,用于持续检查和执行定时任务 while True: schedule.run_pending() diff --git a/module.py b/module.py index a47adcf..85bc277 100644 --- a/module.py +++ b/module.py @@ -316,6 +316,30 @@ class Module: print("data_Exception_Task", e) return False + @staticmethod + def non_standar_performance_to_BI(): + print("data_monitor") + try: + non_standar_performance_to_BI = back_ground_module.NonStandardPerformanceToBI() + thread = threading.Thread(target=non_standar_performance_to_BI.main) + thread.start() + return "data_Exception_Task" + except Exception as e: + print("data_Exception_Task", e) + return False + + @staticmethod + def partner_settlement_to_BI(): + print("data_monitor") + try: + partner_settlement_to_BI = back_ground_module.PartnerSettlementToBI() + thread = threading.Thread(target=partner_settlement_to_BI.main) + thread.start() + return "data_Exception_Task" + except Exception as e: + print("data_Exception_Task", e) + return False + @staticmethod def text3(): print("text3") diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 96a0670..0000000 --- a/requirements.txt +++ /dev/null @@ -1,13 +0,0 @@ -cpca==0.5.5 -holidays==0.78 -mysql_connector_repackaged==0.3.1 -numpy==2.3.2 -pandas==2.3.1 -playwright==1.54.0 -psycopg2==2.9.10 -PyMySQL==1.1.1 -python_dateutil==2.9.0.post0 -Requests==2.32.4 -schedule==1.2.2 -tqdm==4.67.1 -pandas==2.3.1 \ No newline at end of file diff --git a/task_executor.py b/task_executor.py index e2f541f..a3b6dc0 100644 --- a/task_executor.py +++ b/task_executor.py @@ -39,6 +39,8 @@ def execute_task(task_id) -> bool: "字段监控": Module.data_monitor, "测试3": Module.text3, "经销商新签服务单转BI": Module.new_dealer_service_order_to_bi, + "合伙人结算登记同步到BI": Module.new_dealer_service_order_to_bi, + "非标业绩提报转BI": Module.new_dealer_service_order_to_bi, # 添加更多任务函数映射... } diff --git a/test/BI.ipynb b/test/BI.ipynb index 5fe77c7..3bddd87 100644 --- a/test/BI.ipynb +++ b/test/BI.ipynb @@ -12,8 +12,8 @@ "metadata": { "collapsed": true, "ExecuteTime": { - "end_time": "2025-07-22T07:13:51.660146Z", - "start_time": "2025-07-22T07:13:51.499355Z" + "end_time": "2025-08-20T09:06:39.520648Z", + "start_time": "2025-08-20T09:06:39.167174Z" } }, "source": [ @@ -25,56 +25,31 @@ "def create_table(cursor, table_name):\n", " \"\"\"创建数据表\"\"\"\n", " create_table_query = f\"\"\"\n", - " CREATE TABLE IF NOT EXISTS {table_name} (\n", - " id INT AUTO_INCREMENT PRIMARY KEY,\n", - " 购买的产品名称 VARCHAR(255) COMMENT '购买的产品名称',\n", - " 经销商名称 VARCHAR(255) COMMENT '经销商名称',\n", - " 经销商简称 VARCHAR(255) COMMENT '经销商简称',\n", - " 负责人姓名 VARCHAR(255) COMMENT '负责人姓名',\n", - " 负责人手机号 VARCHAR(255) COMMENT '负责人手机号',\n", - " 经销商可使用的群数量 VARCHAR(255) COMMENT '经销商可使用的群数量',\n", - " 订单编码 VARCHAR(255) COMMENT '订单编码',\n", - " 订单支付时间 VARCHAR(255) COMMENT '订单支付时间',\n", - " 商户门店ID VARCHAR(255) COMMENT '商户门店ID',\n", - " 开通时间 VARCHAR(255) COMMENT '开通时间',\n", - " 详细地址 VARCHAR(255) COMMENT '详细地址',\n", - " 联系电话 VARCHAR(255) COMMENT '联系电话',\n", - " 系统到期时间 VARCHAR(255) COMMENT '系统到期时间',\n", - " 开通状态 VARCHAR(255) COMMENT '开通状态',\n", - " 销售负责人 VARCHAR(255) COMMENT '销售负责人',\n", - " 运营顾问 VARCHAR(255) COMMENT '运营顾问',\n", - " 运营专家 VARCHAR(255) COMMENT '运营专家',\n", - " 区域经理 VARCHAR(255) COMMENT '区域经理',\n", - " 业务人员 VARCHAR(255) COMMENT '业务人员',\n", - " 是否设置经营范围 VARCHAR(255) COMMENT '是否设置经营范围',\n", - " 不设置经营范围原因 VARCHAR(255) COMMENT '不设置经营范围原因',\n", - " 是否建群 VARCHAR(255) COMMENT '是否建群',\n", - " 不建群原因 VARCHAR(255) COMMENT '不建群原因',\n", - " 是否设置备货清单 VARCHAR(255) COMMENT '是否设置备货清单',\n", - " 不设置备货清单原因 VARCHAR(255) COMMENT '不设置备货清单原因',\n", - " 是否设置报价 VARCHAR(255) COMMENT '是否设置报价',\n", - " 不设置报价原因 VARCHAR(255) COMMENT '不设置报价原因',\n", - " 是否上货 VARCHAR(255) COMMENT '是否上货',\n", - " 不上货原因 VARCHAR(255) COMMENT '不上货原因',\n", - " 是否培训系统使用 VARCHAR(255) COMMENT '是否培训系统使用',\n", - " 不培训系统使用原因 VARCHAR(255) COMMENT '不培训系统使用原因',\n", - " 是否补货 VARCHAR(255) COMMENT '是否补货',\n", - " 不补货原因 VARCHAR(255) COMMENT '不补货原因',\n", - " `是否进行滞销回抽+盘点介绍` VARCHAR(255) COMMENT '是否进行滞销回抽+盘点介绍',\n", - " `不进行滞销回抽+盘点介绍原因` VARCHAR(255) COMMENT '不进行滞销回抽+盘点介绍原因',\n", - " 服务是否满意 VARCHAR(255) COMMENT '服务是否满意',\n", - " 服务不满意原因 VARCHAR(255) COMMENT '服务不满意原因',\n", - " 产品是否满意 VARCHAR(255) COMMENT '产品是否满意',\n", - " 产品不满意原因 VARCHAR(255) COMMENT '产品不满意原因',\n", - " 上传评价图片 VARCHAR(255) COMMENT '上传评价图片',\n", - " 审核备注 VARCHAR(255) COMMENT '审核备注',\n", - " 完成日期时间 VARCHAR(255) COMMENT '完成日期时间',\n", - " 流水号 VARCHAR(255) COMMENT '流水号',\n", - " 提交人 VARCHAR(255) COMMENT '提交人',\n", - " 提交时间 VARCHAR(255) COMMENT '提交时间',\n", - " 更新时间 VARCHAR(255) COMMENT '更新时间'\n", - "\n", - " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;\n", + " CREATE TABLE IF NOT EXISTS {table_name} (\n", + " `选择合伙人` VARCHAR(255) COMMENT '选择合伙人',\n", + " `合伙人姓名` VARCHAR(255) COMMENT '合伙人姓名',\n", + " `手机号` VARCHAR(255) COMMENT '手机号',\n", + " `合伙人身份` VARCHAR(255) COMMENT '合伙人身份',\n", + " `合伙人所在省市` VARCHAR(255) COMMENT '合伙人所在省市',\n", + " `合伙人登记人` VARCHAR(255) COMMENT '合伙人登记人',\n", + " `战区经理` VARCHAR(255) COMMENT '战区经理',\n", + " `提交人` VARCHAR(255) COMMENT '提交人',\n", + " `合伙人分类` VARCHAR(255) COMMENT '合伙人分类',\n", + " `战区` VARCHAR(255) COMMENT '战区',\n", + " `订单编号` VARCHAR(255) COMMENT '订单登记表.订单编号',\n", + " `销售阶段` VARCHAR(255) COMMENT '订单登记表.销售阶段',\n", + " `版本` VARCHAR(255) COMMENT '订单登记表.版本',\n", + " `年限` VARCHAR(255) COMMENT '订单登记表.年限',\n", + " `成交金额` VARCHAR(255) COMMENT '订单登记表.成交金额',\n", + " `佣金` VARCHAR(255) COMMENT '订单登记表.佣金',\n", + " `理论佣金` VARCHAR(255) COMMENT '订单登记表.理论佣金',\n", + " `佣金比例` VARCHAR(255) COMMENT '订单登记表.佣金比例',\n", + " `合计佣金` VARCHAR(255) COMMENT '合计佣金',\n", + " `理论合计佣金` VARCHAR(255) COMMENT '理论合计佣金',\n", + " `特殊情况备注` VARCHAR(255) COMMENT '特殊情况备注',\n", + " `合伙人介绍证明` VARCHAR(255) COMMENT '合伙人介绍证明(微信聊天截图等)',\n", + " `合伙人类型` VARCHAR(255) COMMENT '合伙人类型'\n", + " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ;\n", " \"\"\"\n", " cursor.execute(create_table_query)\n", " print(f\"成功创建表 {table_name}\")\n", @@ -89,7 +64,7 @@ "} # 衡时数据库链接配置-mysql\n", "\n", "# 表名\n", - "table_name = \"new_dealer_service_order_to_bi\" # 请替换为实际的表名\n", + "table_name = \"partner_settlement_to_BI\" # 请替换为实际的表名\n", "\n", "# 连接数据库\n", "connection = mysql.connector.connect(\n", @@ -113,11 +88,11 @@ "name": "stdout", "output_type": "stream", "text": [ - "成功创建表 new_dealer_service_order_to_bi\n" + "成功创建表 partner_settlement_to_BI\n" ] } ], - "execution_count": 3 + "execution_count": 2 }, { "metadata": {}, @@ -248,7 +223,9 @@ " if connection.is_connected():\n", " cursor.close()\n", " connection.close()\n", - " print(\"数据库连接已关闭\")\n" + " print(\"数据库连接已关闭\")\n", + "\n", + "\n" ], "id": "406f1e2ca21ad9a", "outputs": [ @@ -272,8 +249,8 @@ { "metadata": { "ExecuteTime": { - "end_time": "2025-08-07T01:51:35.017905Z", - "start_time": "2025-08-07T01:51:34.722542Z" + "end_time": "2025-08-20T08:07:48.856164Z", + "start_time": "2025-08-20T08:07:48.650261Z" } }, "cell_type": "code", @@ -287,10 +264,10 @@ " 'password': \"m+q5Z4%IVuF9bf\",\n", " 'database': \"f6operation_data_relay\"\n", "} # 衡时数据库链接配置-mysql\n", - "table_name = \"new_dealer_service_order_to_bi\" # 替换为你的实际表名\n", + "# table_name = \"new_dealer_service_order_to_bi\" # 替换为你的实际表名\n", "\n", - "# table_name = \"jiandaoyun_crm_customer_profile\"\n", - "column_name = \"培训完成时间\"\n", + "table_name = \"non_standard_performance_to_BI\"\n", + "column_name = \"开户/处理日期\"\n", "# new_column_type = \"VARCHAR(255)\" # 目标数据类型\n", "new_column_type = \"DATETIME\" # 目标数据类型\n", "\n", @@ -351,12 +328,12 @@ "name": "stdout", "output_type": "stream", "text": [ - "✅ 成功添加字段: `培训完成时间`\n", + "❌ 操作失败:1146 (42S02): Table 'f6operation_data_relay.non_standard_performance_to_bi' doesn't exist\n", "数据库连接已关闭\n" ] } ], - "execution_count": 2 + "execution_count": 4 }, { "metadata": {}, @@ -488,6 +465,62 @@ " print(\"数据库连接已关闭\")" ], "id": "fe36740aa6724433" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "## BI删表\n", + "id": "76b76aed2ce2a77f" + }, + { + "metadata": {}, + "cell_type": "code", + "outputs": [], + "execution_count": null, + "source": [ + "import mysql.connector\n", + "from mysql.connector import Error\n", + "\n", + "\n", + "def drop_table(cursor, table_name):\n", + " \"\"\"删除数据表\"\"\"\n", + " drop_table_query = f\"DROP TABLE IF EXISTS {table_name};\"\n", + " cursor.execute(drop_table_query)\n", + " print(f\"成功删除表 {table_name}\")\n", + "\n", + "\n", + "# 数据库连接信息\n", + "HS_DB_Config = {\n", + " 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n", + " 'user': \"rw_operation_data_relay\",\n", + " 'password': \"m+q5Z4%IVuF9bf\",\n", + " 'database': \"f6operation_data_relay\"\n", + "} # 衡时数据库链接配置-mysql\n", + "\n", + "# 表名\n", + "table_name = \"业绩报备表\" # 请替换为实际的表名\n", + "\n", + "# 连接数据库\n", + "connection = mysql.connector.connect(\n", + " host=HS_DB_Config[\"host\"],\n", + " user=HS_DB_Config[\"user\"],\n", + " password=HS_DB_Config[\"password\"],\n", + " database=HS_DB_Config[\"database\"]\n", + ")\n", + "\n", + "cursor = connection.cursor()\n", + "\n", + "# 删除表\n", + "drop_table(cursor, table_name)\n", + "\n", + "# 提交更改\n", + "connection.commit()\n", + "\n", + "# 关闭连接\n", + "cursor.close()\n", + "connection.close()" + ], + "id": "daf2c94f811fbcdd" } ], "metadata": { diff --git a/test/logs/task.log b/test/logs/task.log index 50cced7..2ae3442 100644 --- a/test/logs/task.log +++ b/test/logs/task.log @@ -13,3 +13,12 @@ 2025-08-18 10:54:42,864 - api.py - task_logger - INFO - 已获取 3 条数据 2025-08-18 10:58:48,082 - api.py - task_logger - INFO - 已获取 3 条数据 2025-08-18 10:59:36,818 - api.py - task_logger - INFO - 已获取 3 条数据 +2025-08-20 15:32:45,325 - api.py - task_logger - INFO - 已获取 1 条数据 +2025-08-20 15:34:42,396 - api.py - task_logger - INFO - 已获取 1 条数据 +2025-08-20 15:48:56,360 - api.py - task_logger - INFO - 已获取 1 条数据 +2025-08-20 15:52:30,961 - api.py - task_logger - INFO - 已获取 2 条数据 +2025-08-20 15:54:13,599 - api.py - task_logger - INFO - 已获取 7 条数据 +2025-08-20 16:48:57,164 - api.py - task_logger - INFO - 已获取 10 条数据 +2025-08-20 16:48:57,997 - common_module.py - task_logger - INFO - 任务状态发送成功: {'data': {'creator': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'updater': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'deleter': None, 'createTime': '2025-08-20T08:48:55.784Z', 'updateTime': '2025-08-20T08:48:55.784Z', 'deleteTime': None, '_widget_1744873387500': '2025-08-20T00:00:00.000Z', '_widget_1743644977694': '非标业绩提报转BI', '_widget_1744873387501': '2025-08-20T08:48:57.000Z', '_widget_1744873387502': '2025-08-20T08:48:57.000Z', '_widget_1744873387504': '0', '_id': '68a58bf734f6e13aec32ca2a', 'appId': '6694d3c4fcb69ca9a111a6c4', 'entryId': '67ede908eb9c22261016466e'}} +2025-08-20 16:58:28,211 - api.py - task_logger - INFO - 已获取 8 条数据 +2025-08-20 16:58:29,045 - common_module.py - task_logger - INFO - 任务状态发送成功: {'data': {'creator': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'updater': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'deleter': None, 'createTime': '2025-08-20T08:58:26.821Z', 'updateTime': '2025-08-20T08:58:26.821Z', 'deleteTime': None, '_widget_1744873387500': '2025-08-20T00:00:00.000Z', '_widget_1743644977694': '非标业绩提报转BI', '_widget_1744873387501': '2025-08-20T08:58:28.000Z', '_widget_1744873387502': '2025-08-20T08:58:28.000Z', '_widget_1744873387504': '0', '_id': '68a58e326435007d9a859fa2', 'appId': '6694d3c4fcb69ca9a111a6c4', 'entryId': '67ede908eb9c22261016466e'}} diff --git a/test/合伙人结算登记同步到BI.ipynb b/test/合伙人结算登记同步到BI.ipynb new file mode 100644 index 0000000..ccb2e81 --- /dev/null +++ b/test/合伙人结算登记同步到BI.ipynb @@ -0,0 +1,348 @@ +{ + "cells": [ + { + "metadata": {}, + "cell_type": "markdown", + "source": "## 合伙人结算登记表同步到Bi", + "id": "c73b9afd879b3e18" + }, + { + "cell_type": "code", + "id": "initial_id", + "metadata": { + "collapsed": true, + "ExecuteTime": { + "end_time": "2025-08-20T09:17:27.280694Z", + "start_time": "2025-08-20T09:17:27.096281Z" + } + }, + "source": [ + "## 获取数据\n", + "# -*- coding: utf-8 -*-\n", + "import pandas as pd\n", + "import datetime\n", + "from config import Config\n", + "from api import API\n", + "import pymysql # 使用 pymysql 替代 mysql.connector\n", + "from back_ground_module import CommonModule\n", + "import os\n", + "import mysql.connector\n", + "import pandas as pd\n", + "import json\n", + "import numpy as np\n", + "import mysql.connector\n", + "from mysql.connector import Error\n", + "from log_config import configure_task_logger, configure_error_task_logger\n", + "import sys\n", + "\n", + "logger = configure_task_logger()\n", + "error_task_logger = configure_error_task_logger()\n", + "api_instance = API()\n", + "common_module = CommonModule()\n", + "output_dir = \"output\" # 设置输出目录\n", + "os.makedirs(output_dir, exist_ok=True)\n", + "\n", + "\n", + "class PartnerSettlementToBI:\n", + " def __init__(self):\n", + " self.partner_settlement_data = None\n", + " self.field_mapping = {\n", + " \"选择合伙人\": \"_widget_1753930627469\",\n", + " \"合伙人姓名\": \"_widget_1712801992726\",\n", + " \"手机号\": \"_widget_1712803222895\",\n", + " \"合伙人身份\": \"_widget_1712803222894\",\n", + " \"合伙人所在省市\": \"_widget_1712803222896\",\n", + " \"合伙人登记人\": \"_widget_1712803222900\",\n", + " \"战区经理\": \"_widget_1712803222901\",\n", + " \"提交人\": \"_widget_1753941892609\",\n", + " \"合伙人分类\": \"_widget_1753943042503\",\n", + " \"战区\": \"_widget_1754530653275\",\n", + " \"订单登记表\": \"_widget_1712803222905\",\n", + " \"订单登记表.订单编号\": \"_widget_1712803222905._widget_1712803222907\",\n", + " \"订单登记表.销售阶段\": \"_widget_1712803222905._widget_1712805391009\",\n", + " \"订单登记表.版本\": \"_widget_1712803222905._widget_1712803222908\",\n", + " \"订单登记表.年限\": \"_widget_1712803222905._widget_1712815331264\",\n", + " \"订单登记表.成交金额\": \"_widget_1712803222905._widget_1712805391002\",\n", + " \"订单登记表.佣金\": \"_widget_1712803222905._widget_1753952737266\",\n", + " \"订单登记表.理论佣金\": \"_widget_1712803222905._widget_1753952737267\",\n", + " \"订单登记表.佣金比例\": \"_widget_1712803222905._widget_1712807001396\",\n", + " \"合计佣金\": \"_widget_1753948415171\",\n", + " \"理论合计佣金\": \"_widget_1753952737280\",\n", + " \"特殊情况备注\": \"_widget_1712805391035\",\n", + " \"合伙人介绍证明(微信聊天截图等)\": \"_widget_1712815331256\",\n", + " \"合伙人类型\": \"_widget_1753957844818\",\n", + " }\n", + "\n", + " # 定义需要特殊处理的列表字段及其内部字段映射\n", + " self.list_fields_config = {\n", + " \"订单登记表\": {\n", + " \"_widget_1712803222907\": \"订单编号\",\n", + " \"_widget_1712805391009\": \"销售阶段\",\n", + " \"_widget_1712803222908\": \"版本\",\n", + " \"_widget_1712815331264\": \"年限\",\n", + " \"_widget_1712805391002\": \"成交金额\",\n", + " \"_widget_1753952737266\": \"佣金\",\n", + " \"_widget_1753952737267\": \"理论佣金\",\n", + " \"_widget_1712807001396\": \"佣金比例\",\n", + " },\n", + " # 可以在这里添加其他列表字段的配置\n", + " # \"另一个列表字段\": {\n", + " # \"原始字段名1\": \"映射后字段名1\",\n", + " # \"原始字段名2\": \"映射后字段名2\"\n", + " # }\n", + " }\n", + "\n", + " def load_all_data(self):\n", + " payload = {\"api_key\": \"66b9678280b37f8a276b1d01\",\n", + " # \"entry_id\": \"68a57e3a0bc339d3384d1b0c\", # 测试\n", + " \"entry_id\": \"661748c7c727764d79557674\",\n", + " }\n", + " partner_settlement = api_instance.entry_data_list(payload)\n", + " self.partner_settlement_data = partner_settlement.get(\"data\") # api请求格式,将数据封装在data字典里\n", + "\n", + " def process_list_field(self, field_value, field_config):\n", + " \"\"\"通用方法:处理列表类型的字段\"\"\"\n", + " if not isinstance(field_value, (list, np.ndarray)):\n", + " return field_value\n", + "\n", + " processed_list = []\n", + " for item in field_value:\n", + " if not isinstance(item, dict):\n", + " processed_list.append(item)\n", + " continue\n", + "\n", + " processed_item = {}\n", + " for original_key, mapped_key in field_config.items():\n", + " if original_key in item:\n", + " # 处理包含id的字典字段\n", + " if isinstance(item[original_key], dict) and \"id\" in item[original_key]:\n", + " processed_item[mapped_key] = item[original_key][\"id\"]\n", + " else:\n", + " processed_item[mapped_key] = item[original_key]\n", + " else:\n", + " processed_item[mapped_key] = None\n", + " processed_list.append(processed_item)\n", + " return processed_list\n", + "\n", + " def data_process(self):\n", + " if not self.partner_settlement_data:\n", + " print(\"数据为空终止程序\")\n", + " sys.exit(1)\n", + " df = pd.DataFrame(self.partner_settlement_data)\n", + " # 反转映射字典\n", + " reverse_mapping = {v: k for k, v in self.field_mapping.items()}\n", + " # 1.列明替换\n", + " df.columns = [reverse_mapping.get(col, col) for col in df.columns]\n", + "\n", + " # 2.成员字段取值\n", + " user_columns = [\"合伙人登记人\", \"提交人\", \"战区经理\"]\n", + "\n", + " for col in user_columns:\n", + " df[col] = df[col].map(lambda x: x.get(\"name\", \"\") if isinstance(x, dict) else \"\")\n", + "\n", + " # 3.处理订单登记表列表字段,将其拆分成多行\n", + " if \"订单登记表\" in df.columns:\n", + " # 先处理订单登记表字段\n", + " df[\"订单登记表\"] = df[\"订单登记表\"].apply(\n", + " lambda x: self.process_list_field(x, self.list_fields_config[\"订单登记表\"])\n", + " if x is not None and (isinstance(x, (list, dict, np.ndarray)) or not pd.isna(x))\n", + " else None\n", + " )\n", + "\n", + " # 拆分行\n", + " df_exploded = df.explode(\"订单登记表\")\n", + "\n", + " # 将订单登记表中的字段提取到主表中\n", + " order_fields = self.list_fields_config[\"订单登记表\"].values()\n", + " for field in order_fields:\n", + " df_exploded[field] = df_exploded[\"订单登记表\"].apply(\n", + " lambda x: x.get(field) if isinstance(x, dict) else None\n", + " )\n", + "\n", + " # 删除原始的订单登记表列\n", + " df_exploded = df_exploded.drop(columns=[\"订单登记表\"])\n", + "\n", + " # 重置索引\n", + " df = df_exploded.reset_index(drop=True)\n", + "\n", + " return df\n", + "\n", + " def write_to_bi(self, df):\n", + " # 数据库连接信息\n", + " HS_DB_Config = {\n", + " 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n", + " 'user': \"rw_operation_data_relay\",\n", + " 'password': \"m+q5Z4%IVuF9bf\",\n", + " 'database': \"f6operation_data_relay\"\n", + " }\n", + " table_name = \"partner_settlement_to_BI\" # 替换为你的实际表名\n", + "\n", + " # 建立数据库连接\n", + " connection = mysql.connector.connect(\n", + " host=HS_DB_Config[\"host\"],\n", + " user=HS_DB_Config[\"user\"],\n", + " password=HS_DB_Config[\"password\"],\n", + " database=HS_DB_Config[\"database\"]\n", + " )\n", + " cursor = connection.cursor()\n", + "\n", + " try:\n", + " # 查询表列名\n", + " cursor.execute(f\"SHOW COLUMNS FROM {table_name}\")\n", + " columns_info = cursor.fetchall()\n", + " db_columns = [col[0] for col in columns_info] # 提取列名\n", + " df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None)\n", + " # 保留 DataFrame 中与数据库列名匹配的列\n", + " filtered_df = df[df.columns.intersection(db_columns)]\n", + "\n", + " # 如果没有匹配的列,直接返回\n", + " if filtered_df.empty:\n", + " print(\"DataFrame 中没有与数据库表结构匹配的列。\")\n", + " return\n", + "\n", + " # 筛选列之后,插入前处理 dict 类型\n", + " filtered_df = filtered_df.copy()\n", + " for col in filtered_df.columns:\n", + " if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any():\n", + " filtered_df.loc[:, col] = filtered_df[col].apply(\n", + " lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x\n", + " )\n", + "\n", + " # 构建插入语句\n", + " placeholders = ', '.join(['%s'] * len(filtered_df.columns))\n", + " # 使用反引号避免特殊列明\n", + " columns = ', '.join([f\"`{col}`\" for col in filtered_df.columns])\n", + " insert_sql = f\"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})\"\n", + "\n", + " # 将 DataFrame 写入数据库\n", + " for _, row in filtered_df.iterrows():\n", + " cursor.execute(insert_sql, tuple(row))\n", + "\n", + " connection.commit()\n", + " print(f\"成功写入 {len(filtered_df)} 条记录到 {table_name} 表中。\")\n", + "\n", + " except Exception as e:\n", + " print(\"写入数据库时发生错误:\", e)\n", + " connection.rollback()\n", + " finally:\n", + " cursor.close()\n", + " connection.close()\n", + "\n", + " def clear_table_data(self):\n", + " \"\"\"\n", + " 清空指定 MySQL 表的数据。\n", + " 参数已写死在函数内部,直接调用即可。\n", + " \"\"\"\n", + " # 数据库连接信息\n", + " HS_DB_Config = {\n", + " 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n", + " 'user': \"rw_operation_data_relay\",\n", + " 'password': \"m+q5Z4%IVuF9bf\",\n", + " 'database': \"f6operation_data_relay\"\n", + " }\n", + " table_name = \"partner_settlement_to_BI\" # 要清空的表名\n", + "\n", + " connection = None\n", + " try:\n", + " # 建立数据库连接\n", + " connection = mysql.connector.connect(\n", + " host=HS_DB_Config[\"host\"],\n", + " user=HS_DB_Config[\"user\"],\n", + " password=HS_DB_Config[\"password\"],\n", + " database=HS_DB_Config[\"database\"]\n", + " )\n", + " if connection.is_connected():\n", + " cursor = connection.cursor()\n", + "\n", + " # 使用TRUNCATE清空表数据\n", + " cursor.execute(f\"TRUNCATE TABLE {table_name}\")\n", + " connection.commit()\n", + "\n", + " print(f\"成功清空表 {table_name} 中的所有数据\")\n", + "\n", + " except Error as e:\n", + " print(f\"清空表时发生错误: {e}\")\n", + " if connection and connection.is_connected():\n", + " connection.rollback()\n", + " finally:\n", + " if connection and connection.is_connected():\n", + " cursor.close()\n", + " connection.close()\n", + " print(\"数据库连接已关闭\")\n", + "\n", + " def main(self):\n", + " task_start_time = datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n", + "\n", + " # 获取数据\n", + " self.load_all_data()\n", + " print(\"数据加载完成\")\n", + "\n", + " # 处理数据\n", + " df = self.data_process()\n", + " # df.to_csv(f\"{output_dir}/partner_settlement.csv\", index=False)\n", + "\n", + " # step3:数据库删除\n", + " self.clear_table_data()\n", + "\n", + " # step4:数据写入BI\n", + " self.write_to_bi(df)\n", + "\n", + " common_module.send_task_status(task_start_time, \"合伙人结算登记同步到BI\")\n", + "\n", + "\n", + "PartnerSettlementToBI().main()\n", + "\n" + ], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "数据加载完成\n", + "[]\n", + "数据为空终止程序\n" + ] + }, + { + "ename": "SystemExit", + "evalue": "1", + "output_type": "error", + "traceback": [ + "An exception has occurred, use %tb to see the full traceback.\n", + "\u001B[31mSystemExit\u001B[39m\u001B[31m:\u001B[39m 1\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "D:\\ProgramTools\\anaconda3\\envs\\jdy\\Lib\\site-packages\\IPython\\core\\interactiveshell.py:3707: UserWarning: To exit: use 'exit', 'quit', or Ctrl-D.\n", + " warn(\"To exit: use 'exit', 'quit', or Ctrl-D.\", stacklevel=1)\n" + ] + } + ], + "execution_count": 7 + } + ], + "metadata": { + "kernelspec": { + "display_name": "saas", + "language": "python", + "name": "saas" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/test/同步.ipynb b/test/同步.ipynb new file mode 100644 index 0000000..6d0b0fd --- /dev/null +++ b/test/同步.ipynb @@ -0,0 +1,129 @@ +{ + "cells": [ + { + "metadata": {}, + "cell_type": "markdown", + "source": "", + "id": "4eeb08f90b26d53f" + }, + { + "cell_type": "code", + "id": "initial_id", + "metadata": { + "collapsed": true, + "ExecuteTime": { + "end_time": "2025-08-20T08:27:40.142050Z", + "start_time": "2025-08-20T08:27:38.703087Z" + } + }, + "source": [ + "from datetime import datetime, timezone, timedelta, date, UTC\n", + "import holidays\n", + "from config import Config\n", + "import psycopg2\n", + "import pandas as pd\n", + "import pymysql\n", + "from api import API\n", + "from log_config import configure_task_logger, configure_error_task_logger\n", + "\n", + "api_instance = API()\n", + "# 获取已经配置好的常规日志记录器\n", + "logger = configure_task_logger()\n", + "\n", + "# 获取已经配置好的错误任务日志记录器\n", + "error_task_logger = configure_error_task_logger()\n", + "\n", + "\n", + "def get_ngv_details():\n", + " \"\"\"\n", + " 从固定的数据库中获取前几天的NGV明细。\n", + " 参数 `days_back` 表示相对于今天的天数偏移量,默认为1(即前一天)。\n", + " 返回包含NGV明细的pandas DataFrame。\n", + " \"\"\"\n", + " try:\n", + " # 获得连接\n", + " conn = psycopg2.connect(**Config.CONN_INFO)\n", + " cursor = conn.cursor()\n", + "\n", + " # sql语句查询\n", + " sql = f\"\"\"\n", + " SELECT * FROM \"public\".\"saas_ngv_yesterday\";\n", + " \"\"\"\n", + "\n", + " # 执行语句并获取结果集\n", + " cursor.execute(sql)\n", + " rows = cursor.fetchall()\n", + " all_fields = cursor.description\n", + "\n", + " # 执行结果转化为dataframe\n", + " col = [i[0] for i in all_fields]\n", + " data_NGV = pd.DataFrame(rows, columns=col)\n", + "\n", + " # 尝试自动解析日期时间字符串\n", + " time_format = \"%Y-%m-%d %H:%M:%S\"\n", + " if 'saas_create_time' in data_NGV.columns:\n", + " data_NGV['saas_create_time'] = pd.to_datetime(data_NGV['saas_create_time'], format=time_format,\n", + " errors='coerce')\n", + " data_NGV['saas_create_time'] = data_NGV['saas_create_time'].dt.strftime('%Y-%m-%d')\n", + "\n", + " # 关闭游标和连接\n", + " cursor.close()\n", + " conn.close()\n", + "\n", + " return data_NGV\n", + "\n", + " except Exception as e:\n", + " print(f\"Error occurred: {e}\")\n", + " return None\n", + "\n", + "df = get_ngv_details()\n", + "df.to_csv(\"中石化ngv同步.csv\", index=False)" + ], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Error occurred: relation \"public.saas_ngv_yesterday\" does not exist\n", + "LINE 2: SELECT * FROM \"public\".\"saas_ngv_yesterday\";\n", + " ^\n", + "\n" + ] + }, + { + "ename": "AttributeError", + "evalue": "'NoneType' object has no attribute 'to_csv'", + "output_type": "error", + "traceback": [ + "\u001B[31m---------------------------------------------------------------------------\u001B[39m", + "\u001B[31mAttributeError\u001B[39m Traceback (most recent call last)", + "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[1]\u001B[39m\u001B[32m, line 63\u001B[39m\n\u001B[32m 60\u001B[39m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m\n\u001B[32m 62\u001B[39m df = get_ngv_details()\n\u001B[32m---> \u001B[39m\u001B[32m63\u001B[39m df.to_csv(\u001B[33m\"\u001B[39m\u001B[33m中石化ngv同步.csv\u001B[39m\u001B[33m\"\u001B[39m, index=\u001B[38;5;28;01mFalse\u001B[39;00m)\n", + "\u001B[31mAttributeError\u001B[39m: 'NoneType' object has no attribute 'to_csv'" + ] + } + ], + "execution_count": 1 + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/test/非标业绩提报数据迁移到bi.ipynb b/test/非标业绩提报数据迁移到bi.ipynb index 8090dc5..72f55a3 100644 --- a/test/非标业绩提报数据迁移到bi.ipynb +++ b/test/非标业绩提报数据迁移到bi.ipynb @@ -12,8 +12,8 @@ "metadata": { "collapsed": true, "ExecuteTime": { - "end_time": "2025-08-12T03:35:11.151029Z", - "start_time": "2025-08-12T03:35:11.006279Z" + "end_time": "2025-08-20T08:58:29.047944Z", + "start_time": "2025-08-20T08:58:27.944621Z" } }, "source": [ @@ -31,19 +31,17 @@ "import numpy as np\n", "import mysql.connector\n", "from mysql.connector import Error\n", + "from log_config import configure_task_logger, configure_error_task_logger\n", "\n", - "start_time = datetime.datetime.now()\n", + "logger = configure_task_logger()\n", + "error_task_logger = configure_error_task_logger()\n", "api_instance = API()\n", "common_module = CommonModule()\n", - "\n", - "# 保存为CSV文件\n", "output_dir = \"output\" # 设置输出目录\n", - "\n", - "# 创建输出目录(如果不存在)\n", "os.makedirs(output_dir, exist_ok=True)\n", "\n", "\n", - "class NewDealerServiceOrderToBI:\n", + "class NonStandardPerformanceToBI:\n", " def __init__(self):\n", " self.dealer_service_data = None\n", " self.field_mapping = {\n", @@ -150,13 +148,30 @@ " )\n", "\n", " # 4.处理所有配置的列表字段\n", - " for field_name, field_config in self.list_fields_config.items():\n", - " if field_name in df.columns:\n", - " # 修改这里,确保正确处理数组/列表类型的数据\n", - " df[field_name] = df[field_name].apply(\n", - " lambda x: self.process_list_field(x, field_config) if x is not None and (isinstance(x, (list, dict, np.ndarray)) or not pd.isna(x)) else None\n", + " if \"新签阶段及提成比例\" in df.columns:\n", + " # 先处理订单登记表字段\n", + " df[\"新签阶段及提成比例\"] = df[\"新签阶段及提成比例\"].apply(\n", + " lambda x: self.process_list_field(x, self.list_fields_config[\"新签阶段及提成比例\"])\n", + " if x is not None and (isinstance(x, (list, dict, np.ndarray)) or not pd.isna(x))\n", + " else None\n", + " )\n", + "\n", + " # 拆分行\n", + " df_exploded = df.explode(\"新签阶段及提成比例\")\n", + "\n", + " # 将订单登记表中的字段提取到主表中\n", + " order_fields = self.list_fields_config[\"新签阶段及提成比例\"].values()\n", + " for field in order_fields:\n", + " df_exploded[field] = df_exploded[\"新签阶段及提成比例\"].apply(\n", + " lambda x: x.get(field) if isinstance(x, dict) else None\n", " )\n", "\n", + " # 删除原始的订单登记表列\n", + " df_exploded = df_exploded.drop(columns=[\"新签阶段及提成比例\"])\n", + "\n", + " # 重置索引\n", + " df = df_exploded.reset_index(drop=True)\n", + "\n", " return df\n", "\n", " def write_to_bi(self, df):\n", @@ -167,7 +182,7 @@ " 'password': \"m+q5Z4%IVuF9bf\",\n", " 'database': \"f6operation_data_relay\"\n", " }\n", - " table_name = \"new_dealer_service_order_to_bi\" # 替换为你的实际表名\n", + " table_name = \"non_standard_performance_to_BI\" # 替换为你的实际表名\n", "\n", " # 建立数据库连接\n", " connection = mysql.connector.connect(\n", @@ -232,7 +247,7 @@ " 'password': \"m+q5Z4%IVuF9bf\",\n", " 'database': \"f6operation_data_relay\"\n", " }\n", - " table_name = \"new_dealer_service_order_to_bi\" # 要清空的表名\n", + " table_name = \"non_standard_performance_to_BI\" # 要清空的表名\n", "\n", " connection = None\n", " try:\n", @@ -273,35 +288,44 @@ " # df.to_csv(os.path.join(output_dir, \"new_dealer_service_order_to_bi.csv\"))\n", "\n", " # step3:数据库删除\n", - " # self.clear_table_data()\n", + " self.clear_table_data()\n", "\n", " # step4:数据写入BI\n", - " # self.write_to_bi(df)\n", + " self.write_to_bi(df)\n", "\n", - " # common_module.send_task_status(task_start_time, \"非标业绩提报转BI\")\n", + " common_module.send_task_status(task_start_time, \"非标业绩提报转BI\")\n", "\n", "\n", "if __name__ == '__main__':\n", - " start = NewDealerServiceOrderToBI()\n", + " start = NonStandardPerformanceToBI()\n", " start.main()" ], "outputs": [ { - "ename": "ModuleNotFoundError", - "evalue": "No module named '_version'", - "output_type": "error", - "traceback": [ - "\u001B[31m---------------------------------------------------------------------------\u001B[39m", - "\u001B[31mModuleNotFoundError\u001B[39m Traceback (most recent call last)", - "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[6]\u001B[39m\u001B[32m, line 7\u001B[39m\n\u001B[32m 5\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mapi\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m API\n\u001B[32m 6\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mpymysql\u001B[39;00m \u001B[38;5;66;03m# 使用 pymysql 替代 mysql.connector\u001B[39;00m\n\u001B[32m----> \u001B[39m\u001B[32m7\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mback_ground_module\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m CommonModule\n\u001B[32m 8\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mos\u001B[39;00m\n\u001B[32m 9\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mmysql\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mconnector\u001B[39;00m\n", - "\u001B[36mFile \u001B[39m\u001B[32mD:\\Idea Project\\SaaS_V1.6\\back_ground_module\\__init__.py:17\u001B[39m\n\u001B[32m 15\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mback_ground_module\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mrevisit_all_information\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m RevisitAllInformation\n\u001B[32m 16\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mback_ground_module\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01myida_Fpo_Jandaoyun\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m YDFpoJiandaoyun\n\u001B[32m---> \u001B[39m\u001B[32m17\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mback_ground_module\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mget_process_time\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m TimeConsumingProcess\n\u001B[32m 18\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mback_ground_module\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mupdate_BI_CRM_info\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m CRMDataProcessor\n\u001B[32m 19\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mback_ground_module\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mupdate_ID_form\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m update_ID_form\n", - "\u001B[36mFile \u001B[39m\u001B[32mD:\\Idea Project\\SaaS_V1.6\\back_ground_module\\get_process_time.py:7\u001B[39m\n\u001B[32m 5\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mdatetime\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m datetime, timedelta\n\u001B[32m 6\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mpandas\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mas\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mpd\u001B[39;00m\n\u001B[32m----> \u001B[39m\u001B[32m7\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mmysql\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mconnector\u001B[39;00m\n\u001B[32m 8\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mmysql\u001B[39;00m\u001B[34;01m.\u001B[39;00m\u001B[34;01mconnector\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m Error\n\u001B[32m 9\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mjson\u001B[39;00m\n", - "\u001B[36mFile \u001B[39m\u001B[32mD:\\ProgramTools\\anaconda3\\envs\\jdy\\Lib\\site-packages\\mysql\\connector\\__init__.py:34\u001B[39m\n\u001B[32m 31\u001B[39m paramstyle = \u001B[33m'\u001B[39m\u001B[33mpyformat\u001B[39m\u001B[33m'\u001B[39m\n\u001B[32m 33\u001B[39m \u001B[38;5;66;03m# Read the version from an generated file\u001B[39;00m\n\u001B[32m---> \u001B[39m\u001B[32m34\u001B[39m \u001B[38;5;28;01mimport\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01m_version\u001B[39;00m\n\u001B[32m 35\u001B[39m __version__ = _version.version\n\u001B[32m 37\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mconnection\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m MySQLConnection\n", - "\u001B[31mModuleNotFoundError\u001B[39m: No module named '_version'" + "name": "stderr", + "output_type": "stream", + "text": [ + "\u001B[92m2025-08-20 16:58:28,211 - api.py - task_logger - INFO - 已获取 8 条数据\u001B[0m\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "成功清空表 non_standard_performance_to_BI 中的所有数据\n", + "数据库连接已关闭\n", + "成功写入 8 条记录到 non_standard_performance_to_BI 表中。\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "\u001B[92m2025-08-20 16:58:29,045 - common_module.py - task_logger - INFO - 任务状态发送成功: {'data': {'creator': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'updater': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'deleter': None, 'createTime': '2025-08-20T08:58:26.821Z', 'updateTime': '2025-08-20T08:58:26.821Z', 'deleteTime': None, '_widget_1744873387500': '2025-08-20T00:00:00.000Z', '_widget_1743644977694': '非标业绩提报转BI', '_widget_1744873387501': '2025-08-20T08:58:28.000Z', '_widget_1744873387502': '2025-08-20T08:58:28.000Z', '_widget_1744873387504': '0', '_id': '68a58e326435007d9a859fa2', 'appId': '6694d3c4fcb69ca9a111a6c4', 'entryId': '67ede908eb9c22261016466e'}}\u001B[0m\n" ] } ], - "execution_count": 6 + "execution_count": 2 }, { "metadata": { @@ -452,8 +476,8 @@ " lambda x: (\n", " self.process_list_field(x, field_config)\n", " if (isinstance(x, np.ndarray) and x.size > 0) # 非空 NumPy 数组\n", - " or (isinstance(x, list) and len(x) > 0) # 非空列表\n", - " or (not isinstance(x, (np.ndarray, list)) and x is not None and not pd.isna(x)) # 其他非空值\n", + " or (isinstance(x, list) and len(x) > 0) # 非空列表\n", + " or (not isinstance(x, (np.ndarray, list)) and x is not None and not pd.isna(x)) # 其他非空值\n", " else None # 空数组、空列表、None、NaN 都返回 None\n", " )\n", " )\n",