import ast
from datetime import datetime, timedelta

import mysql.connector
import numpy as np
import pandas as pd
from dateutil.parser import parse
from mysql.connector import Error

from api import API
from back_ground_module import CommonModule
from config import Config
from log_config import configure_task_logger, configure_error_task_logger

common_module = CommonModule()

# Pre-configured logger for regular task messages.
logger = configure_task_logger()

# Pre-configured logger for task errors.
error_task_logger = configure_error_task_logger()


class CRMDataProcessor:
    """Migrate the Thailand CRM customer data from the form API into the BI database."""

    # Task name passed to common_module.send_task_error / send_task_status.
    TASK_NAME = "简道云海外项目CRM客户档案迁移BI"

    def __init__(self):
        """Load API/database settings from Config and set up the column tables."""
        api_key = Config.OVERSEAS_PROJECT_CRM_APP_ID
        entry_id = Config.OVERSEAS_PROJECT_FOLLOW_UP_PLAN_ENTRY_ID

        self.api_key = api_key
        self.entry_id = entry_id
        self.db_config = Config.HS_DB_Config
        self.api_instance = API()
        self.connection = None
        self.cursor = None

        # Maps API field ids to the column names used in the output frame.
        self.id_to_name_mapping = {
            '_widget_0201002000001': '客户编码',
            '_widget_0201002000002': '门店名称',
            '_widget_1630862543434': '客户级别',
            '_widget_1630862543415': 'BD状态',
            '_widget_1631071964761': '客户来源',
            '_widget_0201002000005': '所属公海',
            '_widget_1663742427957': '客户查重',
            '_widget_1747123933309': '客户BD日期',
            '_widget_1747661411161': 'Test开始日期',
            '_widget_1747661411178': 'Test天数',
            '_widget_1747664949166': 'Test账号状态',
            '_widget_1747123933310': '客户签约日期',
            '_widget_1747123933311': '客户付款日期',
            '_widget_0201002000003': 'BD负责人',
            '_widget_0201002000006': '销售归属部门',
            '_widget_0201002000004': 'BD协作人',
            '_widget_1747133970117': 'BD阻塞原因',
            '_widget_1747669642443': 'BD阻塞原因分类',
            '_widget_1747133970121': 'BD阻塞原因详细描述',
            '_widget_1746675108119': '原系统名称',
            '_widget_1746675108151': '门店设备',
            '_widget_1630862543637': '门店类型',
            '_widget_1746675108121': '门店业务',
            '_widget_1746675108115': '工位数',
            '_widget_1747321321284': '员工数',
            '_widget_1746675108123': '系统操作人员',
            '_widget_1747123507416': '开户门店ID',
            '_widget_1737092013289': '公司名称',
            '_widget_1737092013291': '主店简称',
            '_widget_1737092013292': '营业执照号',
            '_widget_1737092013293': '营业执照名称',
            '_widget_1737097231480': '府',
            '_widget_1737097231482': '县',
            '_widget_1737092013299': '地址_省市区',
            '_widget_1737097782393': '详情地址',
            '_widget_1737092013314': '营业开始时间',
            '_widget_1737092013313': '营业结束时间',
            '_widget_1737092013312': '经度',
            '_widget_1737092013311': '纬度',
            '_widget_1737092013310': '负责人名字',
            '_widget_1737092013308': '负责人手机号',
            '_widget_1737092013309': '服务接待固话',
            '_widget_1737092013315': '服务接待名称',
            '_widget_1737092013316': '服务接待手机号',
            '_widget_1737092013320': '账号数上限',
            '_widget_1737092013318': '税号',
            '_widget_1737092013317': '分支编号',
            '_widget_1737092013319': '嘉实多内部门店编码',
            '_widget_1737092013339': '运营顾问_上线负责人',
            '_widget_1747123933312': '客户上线日期',
            '_widget_1740460160201': '上线阶段',
            '_widget_1747133970122': '上线失败原因',
            '_widget_1631071964934': '上线失败原因描述',
            '_widget_1749108058561': '门店系统使用情况说明',
            '_widget_1737092013340': '运营专家',
            '_widget_1737092013341': '区域经理',
            '_widget_1663742427909': '客户查重_客户表内该客户数量',
            '_widget_0201002000007': '领取时间',
            '_widget_0201002000009': '客户退回原因',
            '_widget_0201002000008': '最后跟进时间',
            'plan_recycle_time': "预计回收时间",
            '_widget_1709110880998': '结算期限',
            '_widget_1709110881001': '信用额度_元',
            '_widget_1709110881002': '发票抬头',
            '_widget_1709110881003': '发票税号',
            '_widget_1709110881004': '税种',
            '_widget_1709110881007': '增值税税率_',
            '_widget_1709110881008': '开户电话',
            '_widget_1709110881009': '开户银行',
            '_widget_1709110881016': '银行账户',
            "_widget_1749204198412": "原系统情况描述",
            "_widget_1749632087678": "付款金额_泰铢",
            "_widget_1756951762528": "Contract end date",
            "_widget_1756951762527": "Contract start date",
            'creator': '提交人',
            'createTime': '提交时间',
            'updateTime': '更新时间',
        }

        # Columns whose cells are lists; flattened to comma-separated text.
        self.list_columns_to_process = [
            '_widget_1746675108123',
            '_widget_1746675108121',
            '_widget_1746675108151',
        ]

        # Columns whose cells are dicts (or, for ..004, a list of dicts);
        # only the 'name' value is kept.
        self.columns_to_process = [
            '_widget_0201002000005',
            '_widget_0201002000003',
            '_widget_0201002000006',
            '_widget_0201002000004',
            '_widget_1737092013339',
            '_widget_1737092013340',
            '_widget_1737092013341',
        ]

        # Columns removed from the frame during processing.
        self.columns_to_drop = [
            '_widget_1737097782383',
            '_widget_1737092013321',
            '_widget_1737092013300',
            '_widget_1737092013301',
            '_widget_1737092013302',
        ]

        # Columns passed through _add_8_hours.
        self.datetime_columns = [
            '_widget_1747123933309',  # customer BD date
            '_widget_1747123933310',  # customer signing date
            '_widget_1747123933311',  # customer payment date
            '_widget_0201002000007',  # claim time
            '_widget_0201002000008',  # last follow-up time
            '_widget_1756951762528',  # Contract end date
            '_widget_1756951762527',  # Contract start date
        ]

    def connect_db(self):
        """Open the MySQL connection and cursor described by ``self.db_config``.

        Raises:
            mysql.connector.Error: re-raised after printing when connecting fails.
        """
        try:
            self.connection = mysql.connector.connect(
                host=self.db_config['host'],
                user=self.db_config['user'],
                password=self.db_config['password'],
                database=self.db_config['database'],
            )
            self.cursor = self.connection.cursor()
            print(f"成功连接数据库 {self.db_config['database']}")
        except Error as e:
            print(f"连接数据库时发生错误: {e}")
            raise

    def close_db(self):
        """Close the cursor and connection if the connection is still open."""
        if self.connection and self.connection.is_connected():
            if self.cursor:
                self.cursor.close()
            self.connection.close()
            print("数据库连接已关闭")

    def fetch_crm_data(self):
        """Fetch the CRM entries from the API and return them as a DataFrame."""
        payload = {"api_key": self.api_key, "entry_id": self.entry_id}
        crm_rows = self.api_instance.entry_data_list(payload).get("data")
        crm_frame = pd.DataFrame(crm_rows)
        print("成功从API获取CRM数据")
        return crm_frame

    def process_data(self, df):
        """Turn the raw API frame into the flat, renamed frame that is imported.

        Args:
            df: raw DataFrame as returned by ``fetch_crm_data``. After the
                first six and last two columns are removed it must still
                contain an ``_id`` column.

        Returns:
            A new DataFrame holding only the mapped columns plus ``url``,
            with missing and whitespace-only cells replaced by ``""``.
            The result is also written to ``DF.csv`` in the working directory.
        """
        # Drop the first six and the last two columns. Copy explicitly so the
        # column assignments below never write into a slice of the caller's frame.
        df = df.iloc[:, 6:-2].copy()

        # Build the per-record URL from the record id, then drop the id.
        base_url = f"https://www.jiandaoyun.com/dashboard/app/{self.api_key}/form/{self.entry_id}/data/"
        df['url'] = base_url + df['_id'].astype(str) + "/qr_link"
        del df['_id']

        # Flatten list-valued columns.
        for col in self.list_columns_to_process:
            if col in df.columns:
                df[col] = df[col].apply(self._join_list_items)

        # Reduce dict-valued columns to their 'name'; ..004 holds a list of dicts.
        for col in self.columns_to_process:
            if col not in df.columns:
                continue
            if col == '_widget_0201002000004':
                df[col] = df[col].apply(self._extract_names_from_list)
            else:
                df[col] = df[col].apply(self._extract_name_from_dict)

        # Remove the columns that are not wanted.
        df = df.drop(columns=[col for col in self.columns_to_drop if col in df.columns])

        # Shift the timestamp columns.
        for col in self.datetime_columns:
            if col in df.columns:
                df[col] = df[col].apply(self._add_8_hours)

        df = df.rename(columns=self.id_to_name_mapping)

        # Keep only mapped columns and the URL, in mapping order.
        mapped_columns = list(self.id_to_name_mapping.values()) + ['url']
        df = df[[col for col in mapped_columns if col in df.columns]]

        # Normalise missing values first, then whitespace-only strings.
        df = df.fillna("")
        df = df.replace(r'^\s*$', "", regex=True)

        print("数据处理完成")
        # NOTE(review): looks like a debugging dump; kept because it is an
        # existing side effect of this method.
        df.to_csv('DF.csv', index=False)
        return df

    def _join_list_items(self, cell_value):
        """Return the items of a list-like cell joined with ``', '``.

        List-likes are joined directly. Missing or empty cells give ``""``.
        A string is parsed with ``ast.literal_eval``; if it holds a list,
        tuple or set literal its items are joined, otherwise the original
        text is returned unchanged.
        """
        if pd.api.types.is_list_like(cell_value):
            return ', '.join(map(str, cell_value))
        if pd.isna(cell_value) or cell_value == '':
            return ""
        try:
            parsed = ast.literal_eval(cell_value)
        except (ValueError, SyntaxError, TypeError):
            return str(cell_value)
        if isinstance(parsed, (list, tuple, set)):
            # str() each item so numeric literals do not make join() fail.
            return ', '.join(map(str, parsed))
        # A scalar literal (number, quoted string, ...) is not a list: keep
        # the text as-is instead of joining its characters.
        return str(cell_value)

    def _extract_name_from_dict(self, cell_value):
        """Return the ``'name'`` entry of a dict cell (or its string literal).

        Missing cells give ``""``. Anything that is not a dict, and not a
        string containing a dict literal, is returned as ``str(cell_value)``.
        """
        if isinstance(cell_value, dict):
            return cell_value.get('name', '')
        if isinstance(cell_value, str):
            try:
                parsed = ast.literal_eval(cell_value)
            except (ValueError, SyntaxError):
                return str(cell_value)
            if isinstance(parsed, dict):
                return parsed.get('name', '')
            return str(cell_value)
        # pd.isna() is only a plain bool for scalars; containers would give
        # an array whose truth value is ambiguous.
        if pd.api.types.is_scalar(cell_value) and pd.isna(cell_value):
            return ""
        return str(cell_value)

    def _extract_names_from_list(self, cell_value):
        """Return the ``'name'`` values of a list of dicts joined with ``', '``.

        Accepts a list of dicts, a single dict, a numpy array, or a string
        holding the literal of one of those. Missing cells, and lists that
        contain a missing element, give ``""``. Input that cannot be
        interpreted is returned as ``str(cell_value)``. A Series is processed
        element-wise and returned as a Series.
        """
        if isinstance(cell_value, pd.Series):
            return cell_value.apply(self._extract_names_from_list)
        if isinstance(cell_value, np.ndarray):
            # ndarrays have no .apply(); treat them like a plain list.
            cell_value = cell_value.tolist()

        if self._is_missing(cell_value):
            return ""
        if isinstance(cell_value, list) and any(self._is_missing(item) for item in cell_value):
            # A list with a missing element is treated as an empty cell.
            return ""

        if isinstance(cell_value, str):
            try:
                items = ast.literal_eval(cell_value)
            except (ValueError, SyntaxError, TypeError):
                return str(cell_value)
        else:
            items = cell_value

        if isinstance(items, dict):
            # Treat a single dict as a one-element list.
            items = [items]

        try:
            names = [item.get('name', '') for item in items if isinstance(item, dict)]
            return ', '.join(names)
        except TypeError:
            # Not iterable, or a 'name' value that is not a string.
            return str(cell_value)

    @staticmethod
    def _is_missing(value):
        """Return True when ``value`` is None or a scalar NA (NaN, NaT, pd.NA)."""
        if value is None:
            return True
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    def _add_8_hours(self, dt_str):
        """Shift a timestamp string forward by eight hours.

        Returns ``None`` for missing or empty input. Otherwise returns the
        shifted time formatted as ``'%Y-%m-%dT%H:%M:%S.000Z'``. If parsing
        fails the error is printed and the input is returned unchanged.

        NOTE(review): presumably this converts UTC to UTC+8 while keeping the
        trailing 'Z' - confirm that downstream consumers expect this.
        """
        if pd.isna(dt_str) or dt_str == '':
            return None
        try:
            shifted = parse(dt_str) + timedelta(hours=8)
            return shifted.strftime('%Y-%m-%dT%H:%M:%S.000Z')
        except Exception as e:
            print(f"Error processing datetime {dt_str}: {e}")
            return dt_str

    def clear_table(self, table_name):
        """Remove every row of ``table_name`` with ``TRUNCATE TABLE``.

        Opens its own connection and always closes it. On a MySQL error the
        failure is logged and reported through ``common_module`` before the
        exception is re-raised.

        Args:
            table_name: name of the table; interpolated into the SQL text as
                is, so it must come from trusted code.
        """
        try:
            self.connect_db()
            self.cursor.execute(f"TRUNCATE TABLE {table_name}")
            self.connection.commit()
            logger.info(f"成功清空表 {table_name} 中的所有数据")
        except Error as e:
            error_task_logger.error(f"清空表时发生错误: {e}")
            task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            common_module.send_task_error(task_start_time, self.TASK_NAME, str(e))
            if self.connection and self.connection.is_connected():
                self.connection.rollback()
            raise
        finally:
            self.close_db()

    @staticmethod
    def _to_db_value(value):
        """Convert one DataFrame cell into a value safe to bind in an INSERT.

        ``None``, NaN/NaT/pd.NA and blank strings become ``None`` (SQL NULL).
        numpy integer/float/bool scalars become native Python values.
        Everything else is passed through unchanged.
        """
        if value is None:
            return None
        if isinstance(value, str):
            return value if value.strip() else None
        if pd.api.types.is_scalar(value):
            if pd.isna(value):
                return None
            if isinstance(value, (np.integer, np.floating, np.bool_)):
                return value.item()
        return value

    def import_data(self, df, table_name):
        """Insert the rows of ``df`` into ``table_name``.

        Only columns that exist in the table (per ``DESCRIBE``) are written.
        Opens its own connection and always closes it. On a MySQL error the
        failure is logged and reported through ``common_module``, the
        transaction is rolled back, and the exception is re-raised.

        Args:
            df: processed DataFrame, e.g. the result of ``process_data``.
            table_name: target table; interpolated into the SQL text as is,
                so it must come from trusted code.
        """
        try:
            self.connect_db()

            # Restrict the frame to columns that exist in the table.
            self.cursor.execute(f"DESCRIBE {table_name}")
            table_columns = {col[0] for col in self.cursor.fetchall()}
            df = df[[col for col in df.columns if col in table_columns]]

            if df.empty:
                # No rows or no matching columns: there is nothing to insert.
                logger.info(f"没有可导入到 {table_name} 表的数据")
                return

            # Backtick every column name; many are non-ASCII or contain spaces.
            columns = ', '.join(f'`{col}`' for col in df.columns)
            placeholders = ', '.join(['%s'] * len(df.columns))
            insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

            # Convert cell by cell so that no NaN or numpy scalar is bound.
            records = [
                tuple(self._to_db_value(value) for value in row)
                for row in df.itertuples(index=False, name=None)
            ]
            self.cursor.executemany(insert_query, records)
            self.connection.commit()
            logger.info(f"成功导入 {self.cursor.rowcount} 条记录到 {table_name} 表")
        except Error as e:
            error_task_logger.error(f"导入数据时发生错误: {e}")
            task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            common_module.send_task_error(task_start_time, self.TASK_NAME, str(e))
            if self.connection and self.connection.is_connected():
                self.connection.rollback()
            raise
        finally:
            self.close_db()

    def main(self):
        """Run the whole job: fetch, process, clear the table, import, report.

        Any exception is reported through ``common_module.send_task_error``
        and re-raised; on success ``send_task_status`` is called.

        NOTE(review): the table is emptied before the import runs and the two
        steps use separate connections, so a failed import leaves the table
        empty until the next successful run.
        """
        table_name = "jiandaoyun_crm_customer_profile"
        task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            logger.info("开始处理任务")

            raw_data = self.fetch_crm_data()
            logger.info("数据获取完成")

            processed_data = self.process_data(raw_data)
            logger.info("数据处理完成")

            self.clear_table(table_name)
            logger.info("表清空完成")

            self.import_data(processed_data, table_name)
            logger.info("数据导入完成")

            logger.info("数据处理流程完成")
        except Exception as e:
            # NOTE(review): clear_table/import_data already report MySQL
            # errors before re-raising, so those may be reported twice.
            common_module.send_task_error(task_start_time, self.TASK_NAME, str(e))
            error_task_logger.error(f"任务{self.TASK_NAME}执行失败。")
            raise
        common_module.send_task_status(task_start_time, self.TASK_NAME)


# Example usage
if __name__ == "__main__":
    # Create a processor instance and run the whole flow.
    processor = CRMDataProcessor()
    processor.main()