# File: saas/back_ground_module/update_BI_CRM_info.py
# Repository-viewer metadata: 388 lines, 15 KiB, Python.
import pandas as pd
import mysql.connector
from mysql.connector import Error
from api import API
import ast
from datetime import datetime, timedelta
from dateutil.parser import parse
from back_ground_module import CommonModule
import numpy as np
from config import Config
from log_config import configure_task_logger, configure_error_task_logger
import os
import json
# Shared helper used below to send task error / task status notifications.
common_module = CommonModule()

# Pre-configured loggers: one for routine task messages, one for task errors.
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()

# Directory for diagnostic CSV dumps; created eagerly when the module is imported.
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)
class CRMDataProcessor:
    """Migrate the Thailand (overseas project) CRM records from Jiandaoyun into the BI database."""

    # Task name used in every task-status / task-error notification.
    TASK_NAME = "简道云海外项目CRM客户档案迁移BI"
    # Widget columns whose cells hold a list of dicts (each with a 'name' key)
    # rather than a single dict.
    MULTI_MEMBER_COLUMNS = frozenset({'_widget_0201002000004'})

    def __init__(self):
        """Initialise API/database settings and the widget-column configuration."""
        self.api_key = Config.OVERSEAS_PROJECT_CRM_APP_ID
        self.entry_id = Config.OVERSEAS_PROJECT_FOLLOW_UP_PLAN_ENTRY_ID
        self.db_config = Config.HS_DB_Config
        self.api_instance = API()
        self.connection = None
        self.cursor = None
        # Set once a database error has been reported via send_task_error, so
        # main() does not send a second alert for the same failure.
        self._error_notified = False
        # Jiandaoyun widget ID -> BI column name. Two widgets map to '' and are
        # therefore excluded from the processed output.
        self.id_to_name_mapping = {
            '_widget_0201002000001': '客户编码',
            '_widget_0201002000002': '门店名称',
            '_widget_1630862543434': '客户级别',
            '_widget_1630862543415': 'BD状态',
            '_widget_1631071964761': '客户来源',
            '_widget_0201002000005': '所属公海',
            '_widget_1663742427957': '客户查重',
            '_widget_1747123933309': '客户BD日期',
            '_widget_1747661411161': 'Test开始日期',
            '_widget_1747661411178': 'Test天数',
            '_widget_1747664949166': 'Test账号状态',
            '_widget_1747123933310': '客户签约日期',
            '_widget_1747123933311': '客户付款日期',
            '_widget_0201002000003': 'BD负责人',
            '_widget_0201002000006': '销售归属部门',
            '_widget_0201002000004': 'BD协作人',
            '_widget_1747133970117': 'BD阻塞原因',
            '_widget_1747669642443': 'BD阻塞原因分类',
            '_widget_1747133970121': 'BD阻塞原因详细描述',
            '_widget_1746675108119': '原系统名称',
            '_widget_1746675108151': '门店设备',
            '_widget_1630862543637': '门店类型',
            '_widget_1746675108121': '门店业务',
            '_widget_1746675108115': '工位数',
            '_widget_1747321321284': '员工数',
            '_widget_1746675108123': '系统操作人员',
            '_widget_1747123507416': '开户门店ID',
            '_widget_1737092013289': '公司名称',
            '_widget_1737092013291': '主店简称',
            '_widget_1737092013292': '营业执照号',
            '_widget_1737092013293': '营业执照名称',
            '_widget_1737097231480': '',
            '_widget_1737097231482': '',
            '_widget_1737092013299': '地址_省市区',
            '_widget_1737097782393': '详情地址',
            '_widget_1737092013314': '营业开始时间',
            '_widget_1737092013313': '营业结束时间',
            '_widget_1737092013312': '经度',
            '_widget_1737092013311': '纬度',
            '_widget_1737092013310': '负责人名字',
            '_widget_1737092013308': '负责人手机号',
            '_widget_1737092013309': '服务接待固话',
            '_widget_1737092013315': '服务接待名称',
            '_widget_1737092013316': '服务接待手机号',
            '_widget_1737092013320': '账号数上限',
            '_widget_1737092013318': '税号',
            '_widget_1737092013317': '分支编号',
            '_widget_1737092013319': '嘉实多内部门店编码',
            '_widget_1737092013339': '运营顾问_上线负责人',
            '_widget_1747123933312': '客户上线日期',
            '_widget_1740460160201': '上线阶段',
            '_widget_1747133970122': '上线失败原因',
            '_widget_1631071964934': '上线失败原因描述',
            '_widget_1749108058561': '门店系统使用情况说明',
            '_widget_1737092013340': '运营专家',
            '_widget_1737092013341': '区域经理',
            '_widget_1663742427909': '客户查重_客户表内该客户数量',
            '_widget_0201002000007': '领取时间',
            '_widget_0201002000009': '客户退回原因',
            '_widget_0201002000008': '最后跟进时间',
            'plan_recycle_time': "预计回收时间",
            '_widget_1709110880998': '结算期限',
            '_widget_1709110881001': '信用额度_元',
            '_widget_1709110881002': '发票抬头',
            '_widget_1709110881003': '发票税号',
            '_widget_1709110881004': '税种',
            '_widget_1709110881007': '增值税税率_',
            '_widget_1709110881008': '开户电话',
            '_widget_1709110881009': '开户银行',
            '_widget_1709110881016': '银行账户',
            "_widget_1749204198412": "原系统情况描述",
            "_widget_1749632087678": "付款金额_泰铢",
            "_widget_1756951762528": "Contract end date",
            "_widget_1756951762527": "Contract start date",
            'creator': '提交人',
            'createTime': '提交时间',
            'updateTime': '更新时间'
        }
        # Multi-value widgets whose items are joined into one comma-separated string.
        self.list_columns_to_process = [
            '_widget_1746675108123',
            '_widget_1746675108121',
            '_widget_1746675108151'
        ]
        # Widgets whose value is a dict (or, for MULTI_MEMBER_COLUMNS, a list of
        # dicts); only the 'name' entries are kept.
        self.columns_to_process = [
            '_widget_0201002000005',
            '_widget_0201002000003',
            '_widget_0201002000006',
            '_widget_0201002000004',
            '_widget_1737092013339',
            '_widget_1737092013340',
            '_widget_1737092013341'
        ]
        # Widgets removed before renaming.
        self.columns_to_drop = [
            '_widget_1737097782383',
            '_widget_1737092013321',
            '_widget_1737092013300',
            '_widget_1737092013301',
            '_widget_1737092013302'
        ]
        # Datetime widgets shifted by +8 hours in process_data().
        self.datetime_columns = [
            '_widget_1747123933309',  # customer BD date
            '_widget_1747123933310',  # customer contract-signing date
            '_widget_1747123933311',  # customer payment date
            '_widget_0201002000007',  # claim time
            '_widget_0201002000008',  # last follow-up time
            '_widget_1756951762528',  # Contract end date
            '_widget_1756951762527',  # Contract start date
        ]

    def connect_db(self):
        """Open a MySQL connection and cursor from ``self.db_config``.

        Raises:
            mysql.connector.Error: if the connection cannot be established.
        """
        try:
            self.connection = mysql.connector.connect(
                host=self.db_config['host'],
                user=self.db_config['user'],
                password=self.db_config['password'],
                database=self.db_config['database'],
            )
            self.cursor = self.connection.cursor()
            print(f"成功连接数据库 {self.db_config['database']}")
        except Error as e:
            print(f"连接数据库时发生错误: {e}")
            raise

    def close_db(self):
        """Close the cursor and connection if open, then forget both handles.

        Errors raised while closing are logged instead of propagated, because
        this runs in ``finally`` blocks where raising would mask the original
        exception.
        """
        try:
            if self.connection and self.connection.is_connected():
                if self.cursor:
                    self.cursor.close()
                self.connection.close()
                print("数据库连接已关闭")
        except Error as e:
            error_task_logger.error(f"关闭数据库连接时发生错误: {e}")
        finally:
            self.cursor = None
            self.connection = None

    def fetch_crm_data(self):
        """Fetch the configured form's records through the project ``API`` wrapper.

        Returns:
            pandas.DataFrame built from the response's ``"data"`` entry (an empty
            frame when that entry is missing or empty).
        """
        payload = {"api_key": self.api_key, "entry_id": self.entry_id}
        records = self.api_instance.entry_data_list(payload).get("data")
        crm_data = pd.DataFrame(records)
        print("成功从API获取CRM数据")
        return crm_data

    def process_data(self, df):
        """Clean raw CRM records and rename widget IDs to BI column names.

        Steps:
        1. Keep column 0 (``_id``) and drop columns 1-5 and the last two. This
           selection is positional, so it depends on the API's column order.
        2. Derive a record ``url`` from ``_id``, then drop ``_id``.
        3. Flatten list and dict widget values into strings.
        4. Drop unused widgets and shift datetime widgets by +8 hours.
        5. Rename columns via ``id_to_name_mapping``.
        6. Keep only the mapped, non-empty column names plus ``url``.
        7. Replace NaN/None/blank cells with ``""``.

        Args:
            df: raw frame as returned by :meth:`fetch_crm_data`.

        Returns:
            The processed DataFrame. It is also dumped to ``<output_dir>/DF.csv``.

        Raises:
            ValueError: if ``df`` has no ``_id`` column (e.g. the API returned
                no records). This aborts before the target table is truncated.
        """
        if '_id' not in df.columns:
            raise ValueError("CRM数据为空或缺少 '_id' 列, 终止处理")
        # Positional selection: column 0 plus columns 6 .. n-3 (df.shape[1] is
        # the column count). The copy keeps the caller's frame untouched.
        keep_positions = [0] + list(range(6, df.shape[1] - 2))
        df = df.iloc[:, keep_positions].copy()

        base_url = f"https://www.jiandaoyun.com/dashboard/app/{self.api_key}/form/{self.entry_id}/data/"
        df['url'] = base_url + df['_id'].astype(str) + "/qr_link"
        df = df.drop(columns=['_id'])

        for col in self.list_columns_to_process:
            if col in df.columns:
                df[col] = df[col].apply(self._join_list_items)

        for col in self.columns_to_process:
            if col in df.columns:
                extractor = (
                    self._extract_names_from_list
                    if col in self.MULTI_MEMBER_COLUMNS
                    else self._extract_name_from_dict
                )
                df[col] = df[col].apply(extractor)

        df = df.drop(columns=[col for col in self.columns_to_drop if col in df.columns])

        for col in self.datetime_columns:
            if col in df.columns:
                df[col] = df[col].apply(self._add_8_hours)

        df = df.rename(columns=self.id_to_name_mapping)
        # De-duplicate the target names and skip '' (two widgets map to it).
        # Otherwise the duplicated '' labels would multiply the '' columns in
        # the selection below.
        mapped_columns = [name for name in dict.fromkeys(self.id_to_name_mapping.values()) if name]
        mapped_columns.append('url')
        df = df[[col for col in mapped_columns if col in df.columns]]

        df = df.fillna("")  # NaN / None -> ""
        df = df.replace(r'^\s*$', "", regex=True)  # whitespace-only strings -> ""
        print("数据处理完成")
        # Diagnostic dump, kept inside the output directory created at import.
        df.to_csv(os.path.join(output_dir, 'DF.csv'), index=False)
        return df

    def _join_list_items(self, cell_value):
        """Flatten a multi-value cell into a comma-separated string.

        Handles these inputs:
        - List-like values are joined item by item.
        - NaN/None/empty strings become ``""``.
        - A string that literal-evaluates to a list or tuple is joined.
        - Any other string is returned unchanged.
        - Any other non-string scalar is converted with ``str()``.
        """
        if pd.api.types.is_list_like(cell_value):
            return ', '.join(map(str, cell_value))
        if pd.isna(cell_value) or cell_value == '':
            return ""
        if not isinstance(cell_value, str):
            return str(cell_value)
        try:
            parsed = ast.literal_eval(cell_value)
        except (ValueError, SyntaxError, TypeError):
            return cell_value
        if isinstance(parsed, (list, tuple)):
            # map(str) so that non-string items (e.g. "[1, 2]") do not crash join().
            return ', '.join(map(str, parsed))
        # Scalars such as "5" or "'abc'" are kept verbatim rather than being
        # iterated character by character.
        return cell_value

    def _extract_name_from_dict(self, cell_value):
        """Return the ``'name'`` entry of a dict cell.

        Handles these inputs:
        - NaN/None become ``""``.
        - A string is first literal-evaluated. If that fails, or yields a
          non-dict scalar, the original text is returned via ``str()``.
        - A list or tuple of dicts is delegated to
          :meth:`_extract_names_from_list`.
        """
        if isinstance(cell_value, (list, tuple, np.ndarray)):
            return self._extract_names_from_list(cell_value)
        if pd.api.types.is_scalar(cell_value) and pd.isna(cell_value):
            return ""
        parsed = cell_value
        if isinstance(cell_value, str):
            try:
                parsed = ast.literal_eval(cell_value)
            except (ValueError, SyntaxError, TypeError):
                return str(cell_value)
        if isinstance(parsed, dict):
            return parsed.get('name', '')
        if isinstance(parsed, (list, tuple)):
            return self._extract_names_from_list(parsed)
        return str(cell_value)

    def _extract_names_from_list(self, cell_value):
        """Join the ``'name'`` entries of every dict in a list cell with ``', '``.

        Handles these inputs:
        - A Series is mapped element-wise.
        - NaN/None become ``""``.
        - A string is literal-evaluated. Unparseable text is returned unchanged.
        - A single dict is treated as a one-element list.
        - Non-dict items (e.g. ``None``) are skipped.
        - A non-iterable value is returned via ``str()``.
        """
        if isinstance(cell_value, pd.Series):
            return cell_value.apply(self._extract_names_from_list)
        if isinstance(cell_value, np.ndarray):
            cell_value = cell_value.tolist()
        # Only scalars are checked for NA; pd.isna on a list returns an array.
        if pd.api.types.is_scalar(cell_value) and pd.isna(cell_value):
            return ""
        items = cell_value
        if isinstance(cell_value, str):
            try:
                items = ast.literal_eval(cell_value)
            except (ValueError, SyntaxError, TypeError):
                return cell_value
            if isinstance(items, str):
                return cell_value
        if isinstance(items, dict):
            items = [items]
        try:
            names = [str(item.get('name') or '') for item in items if isinstance(item, dict)]
        except TypeError:
            return str(cell_value)
        return ', '.join(names)

    def _add_8_hours(self, dt_str):
        """Parse a datetime string and shift it forward by 8 hours.

        Returns ``None`` for NaN/None/empty input. On parse failure the error is
        printed and the input is returned unchanged.

        NOTE(review): the output keeps the ``Z`` (UTC) suffix even though the
        time is shifted by +8h, and the ``.000`` literal discards milliseconds.
        This is kept as-is because the BI table presumably already stores this
        format; confirm before changing it.
        """
        if isinstance(dt_str, str):
            if dt_str == '':
                return None
        elif pd.api.types.is_scalar(dt_str) and pd.isna(dt_str):
            return None
        try:
            dt = parse(dt_str) + timedelta(hours=8)
            return dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')
        except Exception as e:
            print(f"Error processing datetime {dt_str}: {e}")
            return dt_str

    def _report_db_error(self, log_message, exc):
        """Log a database error, send the task-error alert and roll back.

        A failure during rollback is logged rather than raised, so that the
        caller's ``raise`` still propagates the original error.
        """
        error_task_logger.error(log_message)
        task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        common_module.send_task_error(task_start_time, self.TASK_NAME, str(exc))
        self._error_notified = True
        if self.connection and self.connection.is_connected():
            try:
                self.connection.rollback()
            except Error as rollback_error:
                error_task_logger.error(f"回滚事务时发生错误: {rollback_error}")

    def clear_table(self, table_name):
        """Remove every row from ``table_name`` using ``TRUNCATE TABLE``.

        NOTE: in MySQL, TRUNCATE is a DDL statement that commits implicitly, so
        the rollback in the error path cannot undo it.

        Raises:
            mysql.connector.Error: after logging, alerting and cleanup.
        """
        try:
            self.connect_db()
            self.cursor.execute(f"TRUNCATE TABLE {table_name}")
            self.connection.commit()
            logger.info(f"成功清空表 {table_name} 中的所有数据")
        except Error as e:
            self._report_db_error(f"清空表时发生错误: {e}", e)
            raise
        finally:
            self.close_db()

    @staticmethod
    def _to_db_value(value):
        """Convert one DataFrame cell into a value mysql.connector can bind.

        Conversion rules:
        - Non-empty dicts, lists and tuples are serialised to JSON text.
        - Empty dicts, lists and tuples become ``None`` (SQL NULL).
        - NaN/None/NaT and blank strings become ``None``.
        - NumPy scalars are unwrapped to the equivalent Python scalars.
        - Everything else is returned unchanged.
        """
        if isinstance(value, (dict, list, tuple)):
            return json.dumps(value, ensure_ascii=False) if value else None
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return None
        if isinstance(value, str) and not value.strip():
            return None
        if isinstance(value, np.generic):
            return value.item()
        return value

    def import_data(self, df, table_name):
        """Bulk-insert ``df`` into ``table_name``.

        Only columns that exist in the table (according to ``DESCRIBE``) are
        inserted; the others are skipped with a warning. Each cell is converted
        with :meth:`_to_db_value`.

        Raises:
            mysql.connector.Error: on database failure, after logging, alerting
                and rollback.
            ValueError: if none of ``df``'s columns exist in the table.
        """
        try:
            self.connect_db()
            self.cursor.execute(f"DESCRIBE {table_name}")
            table_columns = {row[0] for row in self.cursor.fetchall()}

            insert_columns = list(dict.fromkeys(col for col in df.columns if col in table_columns))
            skipped_columns = [col for col in df.columns if col not in table_columns]
            if skipped_columns:
                logger.warning(f"以下列在表 {table_name} 中不存在, 已跳过: {skipped_columns}")
            if not insert_columns:
                raise ValueError(f"数据列与表 {table_name} 的字段均不匹配, 无法导入")

            # Backtick-quote identifiers (escaping embedded backticks) because
            # the column names are non-ASCII text.
            quoted_columns = ', '.join('`' + str(col).replace('`', '``') + '`' for col in insert_columns)
            placeholders = ', '.join(['%s'] * len(insert_columns))
            insert_query = f"INSERT INTO {table_name} ({quoted_columns}) VALUES ({placeholders})"

            records = [
                tuple(self._to_db_value(value) for value in row)
                for row in df[insert_columns].itertuples(index=False, name=None)
            ]
            if not records:
                logger.info(f"没有需要导入到 {table_name} 的记录")
                return

            self.cursor.executemany(insert_query, records)
            self.connection.commit()
            logger.info(f"成功导入 {self.cursor.rowcount} 条记录到 {table_name}")
        except Error as e:
            self._report_db_error(f"导入数据时发生错误: {e}", e)
            raise
        finally:
            self.close_db()

    def main(self):
        """Run the whole pipeline: fetch -> process -> truncate -> import.

        On success a task-status notification is sent. On failure the error is
        logged with its traceback, a task-error notification is sent once
        (unless a DB helper already sent one), and the exception is re-raised.
        """
        table_name = "jiandaoyun_crm_customer_profile"
        task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self._error_notified = False
        try:
            logger.info("开始处理任务")
            raw_data = self.fetch_crm_data()
            logger.info("数据获取完成")
            processed_data = self.process_data(raw_data)
            logger.info("数据处理完成")
            # NOTE: TRUNCATE commits implicitly. If import_data fails below, the
            # table stays empty until the next successful run.
            self.clear_table(table_name)
            logger.info("表清空完成")
            self.import_data(processed_data, table_name)
            logger.info("数据导入完成")
            logger.info("数据处理流程完成")
        except Exception as e:
            error_task_logger.exception(f"任务{self.TASK_NAME}执行失败。")
            # clear_table/import_data already alert on database errors; do not
            # send a duplicate notification for the same failure.
            if not self._error_notified:
                common_module.send_task_error(task_start_time, self.TASK_NAME, str(e))
            raise
        common_module.send_task_status(task_start_time, self.TASK_NAME)
# Script entry point: build the processor and run the full
# fetch -> process -> truncate -> import pipeline.
if __name__ == "__main__":
    CRMDataProcessor().main()