# -*- coding: utf-8 -*-
"""Sync non-standard performance report entries from the form API into the BI table.

Pipeline (see ``NonStandardPerformanceToBI.main``): fetch entries, reshape them
with pandas, truncate the target MySQL table, then insert the reshaped rows.
"""
import datetime
import json
import os

import mysql.connector
import numpy as np
import pandas as pd
import pymysql  # NOTE(review): not referenced in the visible code; connections use mysql.connector
from mysql.connector import Error

from config import Config
from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger

logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
api_instance = API()
common_module = CommonModule()

output_dir = "output"  # Output directory, created at import time.
os.makedirs(output_dir, exist_ok=True)


class NonStandardPerformanceToBI:
    """Convert non-standard performance report entries into rows of the BI table."""

    # Target MySQL table; truncated and then refilled on every run.
    TABLE_NAME = "non_standard_performance_to_BI"

    # Number of rows sent per ``executemany`` call, to keep each statement small.
    INSERT_BATCH_SIZE = 500

    # String values that are written to the database as NULL.
    NULL_STRINGS = frozenset({"nan", "NaN", "NAN", ""})

    # Member-type columns: each cell is a dict from which only "name" is kept.
    USER_COLUMNS = (
        "报备业绩归属小六",
        "报备业绩归属区域经理",
        "原业绩归属人",
        "原业绩归属区域经理",
        "运营专家",
        "业绩归属小六-区域提交",
    )

    # Date columns: parsed as UTC and rendered as Asia/Shanghai wall-clock strings.
    TIME_COLUMNS = (
        "支付日期",
        "开户/处理日期",
        "提交时间",
        "更新时间",
        "业绩归属月",
        "业绩归属日期",
    )

    # For split orders ("拆单"): (column to overwrite, column holding the original owner).
    SPLIT_OWNER_COLUMNS = (
        ("报备业绩归属小六", "原业绩归属人"),
        ("报备业绩归属区域经理", "原业绩归属区域经理"),
        ("报备业绩归属大区", "原业绩归属大区"),
    )

    # Amount columns whose sign is flipped on the offsetting split-order rows.
    SPLIT_AMOUNT_COLUMNS = ("小六业绩金额", "区域业绩金额")

    def __init__(self):
        # Raw entry list returned by the API; populated by load_all_data().
        self.dealer_service_data = None

        # Human-readable column name -> API widget id.
        # NOTE(review): "业绩分类" and "业绩类型-聚合" share the same widget id, so
        # the reversed mapping built in data_process() keeps only "业绩类型-聚合";
        # a "业绩分类" column is never produced. Confirm which name is intended.
        self.field_mapping = {
            "报备类型": "_widget_1753770875899",
            "协作内容": "_widget_1753770875915",
            "情况说明": "_widget_1753770875944",
            "订单编号": "_widget_1753770875887",
            "实付金额": "_widget_1753770875889",
            "门店编码": "_widget_1753770875890",
            "门店名称": "_widget_1753770875888",
            "版本": "_widget_1753770875891",
            "年限": "_widget_1753948745953",
            "支付日期": "_widget_1753770875893",
            "开户/处理日期": "_widget_1753770875894",
            "小六业绩金额": "_widget_1753770875898",
            "区域业绩金额": "_widget_1753770875937",
            "报备业绩归属区域经理": "_widget_1753770875903",
            "报备业绩归属大区": "_widget_1753866196486",
            "原业绩归属人": "_widget_1753856032683",
            "原业绩归属区域经理": "_widget_1753866196485",
            "小六业绩比例": "_widget_1753770875917",
            "区域业绩比例": "_widget_1753770875921",
            "运营专家": "_widget_1753770875902",
            "提成类型": "_widget_1753778922504",
            "SaaS新签提成比例": "_widget_1753770875949",
            "服务包提成比例": "_widget_1753778922567",
            "提成金额": "_widget_1753770875948",
            "新签提成比例-首年": "_widget_1753778922503",
            "新签提成比例-非首年": "_widget_1753778922548",
            "新签阶段及提成比例": "_widget_1753778656359",
            "业绩动作": "_widget_1756708722933",
            "提成动作": "_widget_1756708722932",
            "新签阶段及提成比例.选择提成阶段": "_widget_1753778656359._widget_1753778656361",
            "新签阶段及提成比例.新签阶段": "_widget_1753778656359._widget_1753948745962",
            "新签阶段及提成比例.提成比例": "_widget_1753778656359._widget_1753778656362",
            "业绩类型": "_widget_1753770875966",
            "报备业绩归属小六": "_widget_1753770875901",
            "原业绩归属大区": "_widget_1755159216098",
            "业绩分类": "_widget_1758706882564",
            "流程是否结束": "_widget_1761633418013",
            "业绩类型-聚合": "_widget_1758706882564",
            "业绩分组": "_widget_1762417447169",
            "商品名称": "_widget_1762219744898",
            "履约金额": "_widget_1762220516367",
            "业绩归属日期": "_widget_1762417447127",
            "公司名称": "_widget_1762420723743",
            "公司ID": "_widget_1762420723744",
            "报备业绩金额-区域提交": "_widget_1766375035236",
            "业绩归属小六-区域提交": "_widget_1766461143813",
            "业绩归属月": "_widget_1766375035265",
            "是否同步衡石": "_widget_1766484337844",
            "提交人": "creator",
            "提交时间": "createTime",
            "更新时间": "updateTime",
        }

        # List-type (sub-table) fields that need special handling, keyed by the
        # mapped column name; each value maps inner widget id -> output column.
        # Add further list fields here using the same shape.
        self.list_fields_config = {
            "新签阶段及提成比例": {
                "_widget_1753778656361": "选择提成阶段",
                "_widget_1753948745962": "新签阶段",
                "_widget_1753778656362": "提成比例",
            },
        }

    def load_all_data(self):
        """Fetch the report entries from the API into ``self.dealer_service_data``.

        The response is read as a mapping and its ``"data"`` value is stored.
        """
        payload = {
            "api_key": "66b9678280b37f8a276b1d01",
            "entry_id": "68886b7c0382a7249ae0b5d6",
        }
        dealer_service = api_instance.entry_data_list(payload)
        self.dealer_service_data = dealer_service.get("data")

    def process_list_field(self, field_value, field_config):
        """Rename the keys of every dict inside a list-type field value.

        Args:
            field_value: The raw cell value. Anything that is not a ``list`` or
                ``numpy.ndarray`` is returned unchanged.
            field_config: Mapping of original inner key -> output key.

        Returns:
            A new list. Dict items are replaced by a dict containing exactly the
            output keys from ``field_config`` (``None`` when the original key is
            absent; when the inner value is itself a dict with an ``"id"`` key,
            that id is used). Non-dict items are passed through as they are.
        """
        if not isinstance(field_value, (list, np.ndarray)):
            return field_value

        processed_list = []
        for item in field_value:
            if not isinstance(item, dict):
                processed_list.append(item)
                continue

            processed_item = {}
            for original_key, mapped_key in field_config.items():
                if original_key not in item:
                    processed_item[mapped_key] = None
                    continue
                inner = item[original_key]
                # Dict-valued inner fields carrying an "id" are reduced to that id.
                if isinstance(inner, dict) and "id" in inner:
                    processed_item[mapped_key] = inner["id"]
                else:
                    processed_item[mapped_key] = inner
            processed_list.append(processed_item)
        return processed_list

    @staticmethod
    def _existing_columns(df, columns):
        """Return the names from ``columns`` present in ``df``; warn about the rest."""
        present = [col for col in columns if col in df.columns]
        missing = [col for col in columns if col not in df.columns]
        if missing:
            logger.warning(f"字段 {missing} 不存在,跳过对应处理步骤。")
        return present

    def _normalize_list_cell(self, value, field_config):
        """Prepare one list-field cell for ``explode``: NA -> None, lists -> renamed dicts."""
        if value is None:
            return None
        if isinstance(value, (list, dict, np.ndarray)):
            return self.process_list_field(value, field_config)
        if pd.isna(value):
            return None
        return self.process_list_field(value, field_config)

    def _append_split_reversals(self, df):
        """Append an offsetting copy of every split-order ("拆单") row.

        The copy carries negated amounts and is attributed to the original
        owners, so the original owners' totals are reduced by the split amount.
        """
        if "业绩动作" not in df.columns:
            logger.warning("字段 '业绩动作' 不存在,跳过拆单处理步骤。")
            return df

        reversal = df[df["业绩动作"] == "拆单"].copy()
        if reversal.empty:
            return df

        for amount_col in self.SPLIT_AMOUNT_COLUMNS:
            if amount_col in reversal.columns:
                # to_numeric turns None into NaN so unary minus cannot fail on
                # object columns with missing amounts; bad strings still raise.
                reversal[amount_col] = -pd.to_numeric(reversal[amount_col])
        for target_col, source_col in self.SPLIT_OWNER_COLUMNS:
            if source_col in reversal.columns:
                reversal[target_col] = reversal[source_col]

        return pd.concat([df, reversal], ignore_index=True)

    def _expand_list_field(self, df, list_col, field_config):
        """Explode one list column into rows and lift its inner keys to columns."""
        df[list_col] = df[list_col].apply(
            lambda cell: self._normalize_list_cell(cell, field_config)
        )
        exploded = df.explode(list_col)
        for out_col in field_config.values():
            exploded[out_col] = exploded[list_col].apply(
                lambda cell, key=out_col: cell.get(key) if isinstance(cell, dict) else None
            )
        return exploded.drop(columns=[list_col]).reset_index(drop=True)

    def data_process(self):
        """Reshape the loaded entries into the frame written to the BI table.

        Steps: rename widget ids to readable names, keep only entries whose
        "流程是否结束" equals "是", reduce member dicts to their name, render date
        columns as Asia/Shanghai strings, append offsetting rows for split
        orders, and expand configured list fields into one row per list item.

        Returns:
            pandas.DataFrame: the processed rows (empty when nothing is left
            after filtering).
        """
        df = pd.DataFrame(self.dealer_service_data)

        # 1. Rename columns from widget id to readable name.
        reverse_mapping = {widget: label for label, widget in self.field_mapping.items()}
        df.columns = [reverse_mapping.get(col, col) for col in df.columns]

        # Keep only entries whose workflow has finished; NaN never equals "是".
        target_col = "流程是否结束"
        if target_col in df.columns:
            # Copy so the column assignments below do not write into a slice.
            df = df[df[target_col] == "是"].copy()
        else:
            logger.warning(f"字段 '{target_col}' 不存在,跳过过滤步骤,保留所有数据或根据业务需求处理。")

        if df.empty:
            logger.info("过滤后数据为空,无需后续处理。")
            return df

        # 2. Member fields: keep only the member's name.
        for col in self._existing_columns(df, self.USER_COLUMNS):
            df[col] = df[col].map(
                lambda member: member.get("name", "") if isinstance(member, dict) else ""
            )

        # 3. Date fields: interpret as UTC, convert to Beijing time, drop the
        #    timezone and format as a string (unparseable values become NaN).
        for col in self._existing_columns(df, self.TIME_COLUMNS):
            dt_utc = pd.to_datetime(df[col], errors="coerce", utc=True)
            dt_beijing = dt_utc.dt.tz_convert("Asia/Shanghai")
            df[col] = dt_beijing.dt.tz_localize(None).dt.strftime("%Y-%m-%d %H:%M:%S")

        # 4. Split orders: add an offsetting row per "拆单" entry.
        df = self._append_split_reversals(df)

        # 5. Expand every configured list field that is present.
        for list_col, field_config in self.list_fields_config.items():
            if list_col in df.columns:
                df = self._expand_list_field(df, list_col, field_config)

        return df.reset_index(drop=True)

    @staticmethod
    def _open_connection():
        """Open a MySQL connection using ``Config.HS_DB_Config``."""
        db_config = Config.HS_DB_Config
        return mysql.connector.connect(
            host=db_config["host"],
            user=db_config["user"],
            password=db_config["password"],
            database=db_config["database"],
        )

    @classmethod
    def _to_db_value(cls, value):
        """Convert one cell into a value the MySQL driver can bind.

        dict/list -> JSON text; NA-like values and the strings in
        ``NULL_STRINGS`` -> None; numpy scalars -> native Python scalars.
        """
        if isinstance(value, (dict, list)):
            return json.dumps(value, ensure_ascii=False)
        if isinstance(value, str):
            return None if value in cls.NULL_STRINGS else value
        if isinstance(value, np.generic):
            value = value.item()
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return value

    def write_to_bi(self, df):
        """Insert the columns of ``df`` that exist in the target table.

        Args:
            df: Processed frame from ``data_process``. Columns the table does
                not have are ignored.

        Raises:
            Exception: any failure while reading the table columns or
                inserting is logged, the transaction is rolled back, and the
                exception is re-raised so the caller does not report success.
        """
        table_name = self.TABLE_NAME
        connection = self._open_connection()
        cursor = None
        try:
            cursor = connection.cursor()
            cursor.execute(f"SHOW COLUMNS FROM {table_name}")
            db_columns = [column_info[0] for column_info in cursor.fetchall()]

            # Keep only the columns the table actually has.
            filtered_df = df[df.columns.intersection(db_columns)]
            if filtered_df.empty:
                logger.warning("DataFrame 中没有与数据库表结构匹配的列。")
                return

            # Normalise cell by cell in plain Python: this avoids the
            # version-dependent meaning of DataFrame.replace(..., None) and
            # keeps None from being coerced back to NaN by pandas.
            rows = [
                tuple(self._to_db_value(cell) for cell in record)
                for record in filtered_df.itertuples(index=False, name=None)
            ]

            placeholders = ", ".join(["%s"] * len(filtered_df.columns))
            # Backticks protect column names containing special characters.
            columns = ", ".join(f"`{col}`" for col in filtered_df.columns)
            insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

            batch = self.INSERT_BATCH_SIZE
            for start in range(0, len(rows), batch):
                cursor.executemany(insert_sql, rows[start:start + batch])
            connection.commit()
            logger.info(f"成功写入 {len(filtered_df)} 条记录到 {table_name} 表中。")
        except Exception as e:
            error_task_logger.error(f"写入数据库时发生错误: {e}")
            connection.rollback()
            raise
        finally:
            if cursor is not None:
                cursor.close()
            connection.close()

    def clear_table_data(self):
        """Remove every row from the target table with ``TRUNCATE TABLE``.

        Connection settings come from ``Config.HS_DB_Config``, the same source
        ``write_to_bi`` uses, so both steps act on the same database.

        Raises:
            mysql.connector.Error: logged and re-raised, so the caller does not
                go on to insert into a table that was not cleared.
        """
        table_name = self.TABLE_NAME
        connection = None
        cursor = None
        try:
            connection = self._open_connection()
            cursor = connection.cursor()
            cursor.execute(f"TRUNCATE TABLE {table_name}")
            connection.commit()
            logger.info(f"成功清空表 {table_name} 中的所有数据")
        except Error as e:
            error_task_logger.error(f"清空表时发生错误: {e}")
            raise
        finally:
            if cursor is not None:
                cursor.close()
            if connection is not None and connection.is_connected():
                connection.close()
                logger.info("数据库连接已关闭")

    def main(self):
        """Run the whole job and report its outcome through ``common_module``.

        Any exception from a step is logged and reported with
        ``send_task_error``; otherwise ``send_task_status`` is called.
        """
        task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            logger.info("任务开始")

            # Step 1: fetch the data.
            self.load_all_data()
            logger.info("加载数据完成")

            # Step 2: process the data.
            df = self.data_process()
            logger.info("数据处理完成")

            # Step 3: clear the target table.
            self.clear_table_data()
            logger.info("目标数据库已清空")

            # Step 4: write the data to BI.
            self.write_to_bi(df)
            logger.info("数据已写入数据库中")

            common_module.send_task_status(task_start_time, "非标业绩提报转BI")
        except Exception as e:
            error_task_logger.error(f"非标业绩提报转BI发生错误{e}")
            common_module.send_task_error(task_start_time, "非标业绩提报转BI", str(e))


if __name__ == '__main__':
    start = NonStandardPerformanceToBI()
    start.main()