# -*- coding: utf-8 -*-
"""Rebuild the Jiandaoyun (简道云) "履约表" form from the performance-detail data.

Run as a script: existing records in the target form are deleted, then the
rows returned by ``CommonModule.get_perforamnce_details()`` are converted to
Jiandaoyun widget payloads and written back in batches.
"""
import datetime

import pandas as pd
import pymysql  # noqa: F401  pymysql is used in place of mysql.connector (unused in this file)
from tqdm import tqdm

from config import Config  # noqa: F401  NOTE(review): unused here; API keys below could move into it
from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger

logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
start_time = datetime.datetime.now()
api_instance = API()
common_module = CommonModule()


class ImportPerformanceData:
    """Fulfillment-table (履约表) data support: sync performance details into Jiandaoyun."""

    # NOTE(review): credentials are hard-coded in source; consider loading them from config.
    PERFORMANCE_API_KEY = "675b900991ad2491c69389ca"
    PERFORMANCE_ENTRY_ID = "68637c9818bc333fc14c30ad"  # TODO: original note says this needs to be changed
    STAFF_API_KEY = "6694d3c4fcb69ca9a111a6c4"
    STAFF_ENTRY_ID = "6769204a1902c9341340a1bc"
    # Widgets of the staff form: employee name -> Jiandaoyun member id.
    STAFF_NAME_WIDGET = "_widget_1734942794144"
    STAFF_ID_WIDGET = "_widget_1734942794145"

    TASK_NAME = "履约表数据支撑"
    BATCH_SIZE = 1000  # records per batch_create request

    # Columns holding Beijing-local times that must be uploaded as UTC.
    TIME_COLUMNS = (
        'saas开户时间', '服务期起始时间', '下单支付成功时间', '操作时间',
        "下单支付成功日期", "服务期结束时间",
    )
    # Columns holding employee names that must be replaced by member ids.
    ROLE_COLUMNS = ('运营负责人', '区域经理')

    def __init__(self):
        self.staff_name_to_id = None       # employee name (str) -> member id
        self.staff_id_list = None          # raw records from the staff form
        self.performance_data_list = None  # raw records currently in the target form
        self.field_mapping = {}            # DataFrame column name -> widget id
        self.fields()

    def load_all_data(self):
        """Load the existing target-form records and the staff name -> id map."""
        performance_data = api_instance.entry_data_list({
            "api_key": self.PERFORMANCE_API_KEY,
            "entry_id": self.PERFORMANCE_ENTRY_ID,
        })
        # The API wraps results in a "data" key; treat a missing/None value as empty.
        self.performance_data_list = performance_data.get("data") or []

        staff_data = api_instance.entry_data_list({
            "api_key": self.STAFF_API_KEY,
            "entry_id": self.STAFF_ENTRY_ID,
        })
        self.staff_id_list = staff_data.get("data") or []

        # Pre-build the name -> member-id lookup used for the role columns.
        self.staff_name_to_id = {
            str(item.get(self.STAFF_NAME_WIDGET)): item.get(self.STAFF_ID_WIDGET)
            for item in self.staff_id_list
            if item.get(self.STAFF_NAME_WIDGET) is not None
        }

    def _lookup_staff_id(self, name):
        """Return the member id for employee *name*, or None if blank/unknown."""
        # pd.isna first: truth-testing pd.NA would raise TypeError.
        if name is None or pd.isna(name) or not name:
            return None
        return (self.staff_name_to_id or {}).get(str(name))

    def process_data(self, df):
        """Convert *df* into a list of Jiandaoyun record payloads.

        Role columns are mapped from names to member ids (None when not found),
        and time columns are replaced by their UTC ISO-8601 strings.
        """
        new_df = self.convert_to_utc(df)
        columns = list(new_df.columns)
        all_data = []

        # name=None yields plain tuples. With named tuples, pandas renames
        # columns that are not valid identifiers (e.g. '处理人ID employee_id')
        # to '_N', and those fields were silently dropped.
        for values in tqdm(new_df.itertuples(index=False, name=None), total=len(new_df)):
            row_dict = dict(zip(columns, values))

            # Member fields: replace employee names with member ids.
            for role in self.ROLE_COLUMNS:
                row_dict[role] = self._lookup_staff_id(row_dict.get(role))

            # Time fields: upload the UTC string produced by convert_to_utc
            # instead of the naive local timestamp.
            for col in self.TIME_COLUMNS:
                utc_col = f"{col}_utc"
                if utc_col in row_dict:
                    row_dict[col] = row_dict[utc_col]

            all_data.append(self.row_to_dict(row_dict, self.field_mapping))
        return all_data

    def convert_to_utc(self, df):
        """Return a copy of *df* with a '<col>_utc' string column per time column.

        Each present column in TIME_COLUMNS is parsed with pd.to_datetime
        (unparseable values become NaT). Naive values are treated as
        Asia/Shanghai time and converted to a '%Y-%m-%dT%H:%M:%SZ' UTC string.
        Rows without a valid time get NA. If a column fails, it is logged and
        its '_utc' column is all NA.
        """
        new_df = df.copy()  # never mutate the caller's DataFrame
        for col in self.TIME_COLUMNS:
            if col not in new_df.columns:
                continue
            utc_col = f"{col}_utc"
            try:
                new_df[col] = pd.to_datetime(new_df[col], errors='coerce', utc=False)
                mask = new_df[col].notna()
                if not mask.any():
                    new_df[utc_col] = pd.NA  # keep the column shape consistent
                    continue
                valid = new_df.loc[mask, col]
                if valid.dt.tz is None:
                    # Naive values are Beijing local time.
                    valid = valid.dt.tz_localize(
                        'Asia/Shanghai', ambiguous='infer', nonexistent='shift_forward'
                    )
                new_df.loc[mask, utc_col] = (
                    valid.dt.tz_convert('UTC').dt.strftime('%Y-%m-%dT%H:%M:%SZ')
                )
            except Exception as e:
                logger.warning(f"处理列 {col} 时出错: {e}")
                new_df[utc_col] = pd.NA
        return new_df

    @staticmethod
    def row_to_dict(row, field_mapping):
        """Build a Jiandaoyun payload {widget_id: {"value": ...}} from *row*.

        Only columns present in *row* are emitted. Scalar missing values
        (None/NaN/NaT/NA) become None. datetime values are formatted as
        '%Y-%m-%dT%H:%M:%SZ'.
        NOTE(review): naive datetimes reaching this point (e.g. '评价完成时间',
        which is not in TIME_COLUMNS) are labelled 'Z' without conversion —
        confirm they are already UTC.
        """
        result = {}
        for col_name, widget_id in field_mapping.items():
            if col_name not in row:
                continue
            value = row[col_name]
            # is_scalar guard: pd.isna on a list returns an array (ambiguous truth).
            if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
                clean_value = None
            elif isinstance(value, datetime.datetime):  # includes pd.Timestamp
                clean_value = value.strftime('%Y-%m-%dT%H:%M:%SZ')
            else:
                clean_value = value
            result[widget_id] = {"value": clean_value}
        return result

    def fields(self):
        """Populate ``self.field_mapping``: DataFrame column name -> widget id."""
        self.field_mapping = {
            '公司名称': '_widget_1751350424090',
            '门店名称': '_widget_1751350424083',
            '门店编码': '_widget_1751350424084',
            '运营负责人': '_widget_1751350424085',
            '区域经理': '_widget_1751350424086',
            'saas开户时间': '_widget_1751350424088',
            '服务期起始时间': '_widget_1751350424097',
            '下单支付成功时间': '_widget_1751350424101',
            '操作时间': '_widget_1751350424110',
            '下单支付成功日期': '_widget_1751350424115',
            '服务期结束时间': '_widget_1751350424098',
            '订单id': '_widget_1751350424075',
            'f6订单编号': '_widget_1751350424076',
            '宜搭的实例id': '_widget_1751350424077',
            '商品id': '_widget_1751350424078',
            '商品名称': '_widget_1751350424079',
            '发布商品类型': '_widget_1751350424080',
            '发布商品类型描述': '_widget_1751350424081',
            '门店id': '_widget_1751350424082',
            '商户中心id': '_widget_1751350424087',
            '公司id': '_widget_1751350424089',
            '产生来源': '_widget_1751350424091',
            '产生来源描述': '_widget_1751350424092',
            '类型': '_widget_1751350424093',
            '类型描述': '_widget_1751350424094',
            '服务年份': '_widget_1751350424095',
            '订单服务期第几年': '_widget_1751350424096',
            '提成业务类型': '_widget_1751350424099',
            '提成类别': '_widget_1751350424100',
            '实付金额': '_widget_1751881109632',
            '系统成本价': '_widget_1751881109633',
            '版本费': '_widget_1751881109634',
            '服务费': '_widget_1751881109635',
            '介绍人员工ID': '_widget_1751350424106',
            '介绍业绩归属人员工ID': '_widget_1751350424107',
            '处理人ID employee_id': '_widget_1751350424108',
            '业绩归属人员工ID': '_widget_1751350424109',
            '处理人是否跟进,0: 未跟进,1: 已跟进': '_widget_1751350424111',
            '满意度评分': '_widget_1751350424112',
            '评价完成时间': '_widget_1751350424113',
            '介绍人用户类型': '_widget_1751350424114',
            '培训完成时间': '_widget_1751350424116',
            '订单所处阶段': '_widget_1751350424117',
            '日分区': '_widget_1751350424118',
        }

    def _delete_existing_data(self, task_start_time):
        """Delete every record loaded by load_all_data from the target form.

        Returns True when it is safe to continue (records deleted, or none to
        delete). Returns False when deletion failed; the failure has then
        already been logged and reported.
        """
        try:
            id_list = [item["_id"] for item in (self.performance_data_list or [])]
            if id_list:  # nothing to delete on an empty form
                api_instance.entry_data_batch_delete({
                    "api_key": self.PERFORMANCE_API_KEY,
                    "entry_id": self.PERFORMANCE_ENTRY_ID,
                    "data_ids": id_list,
                })
            logger.info("数据删除完成")
            return True
        except Exception as e:
            error_task_logger.error(f"数据删除失败: {e}")
            common_module.send_task_error(task_start_time, self.TASK_NAME, str(e))
            return False

    def _write_batches(self, all_data):
        """Create *all_data* in the target form, BATCH_SIZE records per request."""
        for offset in tqdm(range(0, len(all_data), self.BATCH_SIZE)):
            api_instance.entry_data_batch_create({
                "api_key": self.PERFORMANCE_API_KEY,
                "entry_id": self.PERFORMANCE_ENTRY_ID,
                "data_list": all_data[offset:offset + self.BATCH_SIZE],
            })

    def main(self):
        """Run the full sync and report success or failure via common_module."""
        task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            self.load_all_data()

            # Step 1: fetch the performance details.
            df = common_module.get_perforamnce_details()
            logger.info("数据获取完成")

            # Step 2: clear existing records. If that fails, stop: writing
            # would duplicate the old records, and the error is already reported.
            if not self._delete_existing_data(task_start_time):
                return

            # Step 3: write the new records to Jiandaoyun.
            all_data = self.process_data(df)
            self._write_batches(all_data)
            logger.info("简道云数据写入完成")
            common_module.send_task_status(task_start_time, self.TASK_NAME)
        except Exception as e:
            error_task_logger.error(f"履约表数据支撑执行失败: {e}")
            common_module.send_task_error(task_start_time, self.TASK_NAME, str(e))


if __name__ == '__main__':
    start = ImportPerformanceData()
    start.main()