This commit is contained in:
z66
2025-08-12 13:43:10 +08:00
commit d5e60e9014
122 changed files with 171837 additions and 0 deletions
+375
View File
@@ -0,0 +1,375 @@
import pandas as pd
import mysql.connector
from mysql.connector import Error
from api import API
import ast
from datetime import datetime, timedelta
from dateutil.parser import parse
from back_ground_module import CommonModule
import numpy as np
from config import Config
common_module = CommonModule()
class CRMDataProcessor:
"""泰国CRM数据迁移到BI"""
def __init__(self):
"""
初始化CRM数据处理类
"""
api_key = Config.OVERSEAS_PROJECT_CRM_APP_ID
entry_id = Config.OVERSEAS_PROJECT_FOLLOW_UP_PLAN_ENTRY_ID
self.api_key = api_key
self.entry_id = entry_id
self.db_config = Config.HS_DB_Config
self.api_instance = API()
self.connection = None
self.cursor = None
# 字段映射字典
self.id_to_name_mapping = {
'_widget_0201002000001': '客户编码',
'_widget_0201002000002': '门店名称',
'_widget_1630862543434': '客户级别',
'_widget_1630862543415': 'BD状态',
'_widget_1631071964761': '客户来源',
'_widget_0201002000005': '所属公海',
'_widget_1663742427957': '客户查重',
'_widget_1747123933309': '客户BD日期',
'_widget_1747661411161': 'Test开始日期',
'_widget_1747661411178': 'Test天数',
'_widget_1747664949166': 'Test账号状态',
'_widget_1747123933310': '客户签约日期',
'_widget_1747123933311': '客户付款日期',
'_widget_0201002000003': 'BD负责人',
'_widget_0201002000006': '销售归属部门',
'_widget_0201002000004': 'BD协作人',
'_widget_1747133970117': 'BD阻塞原因',
'_widget_1747669642443': 'BD阻塞原因分类',
'_widget_1747133970121': 'BD阻塞原因详细描述',
'_widget_1746675108119': '原系统名称',
'_widget_1746675108151': '门店设备',
'_widget_1630862543637': '门店类型',
'_widget_1746675108121': '门店业务',
'_widget_1746675108115': '工位数',
'_widget_1747321321284': '员工数',
'_widget_1746675108123': '系统操作人员',
'_widget_1747123507416': '开户门店ID',
'_widget_1737092013289': '公司名称',
'_widget_1737092013291': '主店简称',
'_widget_1737092013292': '营业执照号',
'_widget_1737092013293': '营业执照名称',
'_widget_1737097231480': '',
'_widget_1737097231482': '',
'_widget_1737092013299': '地址_省市区',
'_widget_1737097782393': '详情地址',
'_widget_1737092013314': '营业开始时间',
'_widget_1737092013313': '营业结束时间',
'_widget_1737092013312': '经度',
'_widget_1737092013311': '纬度',
'_widget_1737092013310': '负责人名字',
'_widget_1737092013308': '负责人手机号',
'_widget_1737092013309': '服务接待固话',
'_widget_1737092013315': '服务接待名称',
'_widget_1737092013316': '服务接待手机号',
'_widget_1737092013320': '账号数上限',
'_widget_1737092013318': '税号',
'_widget_1737092013317': '分支编号',
'_widget_1737092013319': '嘉实多内部门店编码',
'_widget_1737092013339': '运营顾问_上线负责人',
'_widget_1747123933312': '客户上线日期',
'_widget_1740460160201': '上线阶段',
'_widget_1747133970122': '上线失败原因',
'_widget_1631071964934': '上线失败原因描述',
'_widget_1749108058561': '门店系统使用情况说明',
'_widget_1737092013340': '运营专家',
'_widget_1737092013341': '区域经理',
'_widget_1663742427909': '客户查重_客户表内该客户数量',
'_widget_0201002000007': '领取时间',
'_widget_0201002000009': '客户退回原因',
'_widget_0201002000008': '最后跟进时间',
'plan_recycle_time': "预计回收时间",
'_widget_1709110880998': '结算期限',
'_widget_1709110881001': '信用额度_元',
'_widget_1709110881002': '发票抬头',
'_widget_1709110881003': '发票税号',
'_widget_1709110881004': '税种',
'_widget_1709110881007': '增值税税率_',
'_widget_1709110881008': '开户电话',
'_widget_1709110881009': '开户银行',
'_widget_1709110881016': '银行账户',
"_widget_1749204198412": "原系统情况描述",
"_widget_1749632087678": "付款金额_泰铢",
'creator': '提交人',
'createTime': '提交时间',
'updateTime': '更新时间'
}
# 需要处理的列
self.list_columns_to_process = [
'_widget_1746675108123',
'_widget_1746675108121',
'_widget_1746675108151'
]
self.columns_to_process = [
'_widget_0201002000005',
'_widget_0201002000003',
'_widget_0201002000006',
'_widget_0201002000004',
'_widget_1737092013339',
'_widget_1737092013340',
'_widget_1737092013341'
]
self.columns_to_drop = [
'_widget_1737097782383',
'_widget_1737092013321',
'_widget_1737092013300',
'_widget_1737092013301',
'_widget_1737092013302'
]
self.datetime_columns = [
'_widget_1747123933309', # 客户BD日期
'_widget_1747123933310', # 客户签约日期
'_widget_1747123933311', # 客户付款日期
'_widget_0201002000007', # 领取时间
'_widget_0201002000008' # 最后跟进时间
]
def connect_db(self):
"""连接数据库"""
try:
self.connection = mysql.connector.connect(
host=self.db_config['host'],
user=self.db_config['user'],
password=self.db_config['password'],
database=self.db_config['database']
)
self.cursor = self.connection.cursor()
print(f"成功连接数据库 {self.db_config['database']}")
except Error as e:
print(f"连接数据库时发生错误: {e}")
raise
def close_db(self):
"""关闭数据库连接"""
if self.connection and self.connection.is_connected():
if self.cursor:
self.cursor.close()
self.connection.close()
print("数据库连接已关闭")
def fetch_crm_data(self):
"""从API获取CRM数据"""
payload = {"api_key": self.api_key, "entry_id": self.entry_id}
CRM_data_list = self.api_instance.entry_data_list(payload).get("data")
CRM_data = pd.DataFrame(CRM_data_list)
print("成功从API获取CRM数据")
CRM_data.to_csv("CRM.csv")
return CRM_data
def process_data(self, df):
"""处理CRM数据"""
# 去掉前六列和后两列
df = df.iloc[:, 6:-2]
# del df["creator"]
# del df["createTime"]
# del df["updateTime"]
# del df["updater"]
# del df["deleter"]
# del df["deleteTime"]
# 生成URL
base_url = f"https://www.jiandaoyun.com/dashboard/app/{self.api_key}/form/{self.entry_id}/data/"
df['url'] = base_url + df['_id'].astype(str) + "/qr_link"
del df['_id']
# 处理列表格式列
for col in self.list_columns_to_process:
if col in df.columns:
df[col] = df[col].apply(self._join_list_items)
# 处理字典格式列
for col in self.columns_to_process:
if col in df.columns:
if col == '_widget_0201002000004':
df[col] = df[col].apply(self._extract_names_from_list)
else:
df[col] = df[col].apply(self._extract_name_from_dict)
# 删除不需要的列
for col in self.columns_to_drop:
if col in df.columns:
df.drop(columns=[col], inplace=True)
# 处理时间列
for col in self.datetime_columns:
if col in df.columns:
df[col] = df[col].apply(self._add_8_hours)
# 重命名列
df.rename(columns=self.id_to_name_mapping, inplace=True)
# 只保留映射后的列和URL字段
mapped_columns = list(self.id_to_name_mapping.values()) + ['url']
df = df[[col for col in mapped_columns if col in df.columns]]
#df.replace([np.nan, None, r'^\s*$'], "", regex=True, inplace=True)
# 修改替换空值的实现方式
df = df.fillna("") # 先替换NaN和None
df = df.replace(r'^\s*$', "", regex=True) # 再替换空字符串
print("数据处理完成")
df.to_csv('DF.csv', index=False)
return df
def _join_list_items(self, cell_value):
"""将列表中的内容用逗号分隔"""
if pd.api.types.is_list_like(cell_value):
return ', '.join(map(str, cell_value))
if pd.isna(cell_value) or cell_value == '':
return ""
try:
cell_list = ast.literal_eval(cell_value)
return ', '.join(cell_list)
except (ValueError, SyntaxError):
return str(cell_value)
def _extract_name_from_dict(self, cell_value):
"""提取字典中的'name'"""
if pd.isna(cell_value):
return ""
if isinstance(cell_value, str):
try:
cell_dict = ast.literal_eval(cell_value)
except (ValueError, SyntaxError):
return str(cell_value)
else:
cell_dict = cell_value
return cell_dict.get('name', '')
def _extract_names_from_list(self, cell_value):
"""提取列表中所有字典的'name'"""
if isinstance(cell_value, (pd.Series, np.ndarray)):
return cell_value.apply(self._extract_names_from_list)
if pd.isna(cell_value).any() if hasattr(cell_value, '__iter__') else pd.isna(cell_value):
return ""
try:
cell_list = ast.literal_eval(cell_value) if isinstance(cell_value, str) else cell_value
names = [item.get('name', '') for item in cell_list if isinstance(item, dict)]
return ', '.join(names)
except (ValueError, SyntaxError, TypeError):
return str(cell_value)
def _add_8_hours(self, dt_str):
"""为时间添加8小时"""
if pd.isna(dt_str) or dt_str == '':
return None
try:
dt = parse(dt_str)
dt += timedelta(hours=8)
return dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')
except Exception as e:
print(f"Error processing datetime {dt_str}: {e}")
return dt_str
def clear_table(self, table_name):
"""清空数据库表"""
try:
self.connect_db()
self.cursor.execute(f"TRUNCATE TABLE {table_name}")
self.connection.commit()
print(f"成功清空表 {table_name} 中的所有数据")
except Error as e:
print(f"清空表时发生错误: {e}")
if self.connection and self.connection.is_connected():
self.connection.rollback()
raise
finally:
self.close_db()
def import_data(self, df, table_name):
try:
self.connect_db()
# 处理空值 - 将NaN/NaT/空字符串统一转为None
df = df.map(lambda x: None if pd.isna(x) or str(x).strip() == '' else x)
# # 确保所有空值(包括NaN、None、空字符串)转为None
df = df.replace([np.nan, None, r'^\s*$'], None, regex=True)
# 检查表结构是否匹配
self.cursor.execute(f"DESCRIBE {table_name}")
table_columns = [col[0] for col in self.cursor.fetchall()]
# 只保留表中存在的列
df = df[[col for col in df.columns if col in table_columns]]
# 生成插入语句
columns = ', '.join(df.columns)
placeholders = ', '.join(['%s'] * len(df.columns))
insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
# 批量插入数据
records = [tuple(row) for row in df.values]
self.cursor.executemany(insert_query, records)
self.connection.commit()
print(f"成功导入 {self.cursor.rowcount} 条记录到 {table_name}")
except Error as e:
print(f"导入数据时发生错误: {e}")
if self.connection and self.connection.is_connected():
self.connection.rollback()
raise
finally:
self.close_db()
# # 批量插入数据
# records = [tuple(None if pd.isna(x) else x for x in row) for row in df.values]
# self.cursor.executemany(insert_query, records)
# self.connection.commit()
# print(f"成功导入 {self.cursor.rowcount} 条记录到 {table_name} 表")
# except Error as e:
# print(f"导入数据时发生错误: {e}")
# raise
# finally:
# self.close_db()
def main(self):
"""运行完整的数据处理流程"""
table_name = "jiandaoyun_crm_customer_profile"
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# 获取数据
raw_data = self.fetch_crm_data()
# 处理数据
processed_data = self.process_data(raw_data)
# 清空表
self.clear_table(table_name)
# 导入数据
self.import_data(processed_data, table_name)
print("数据处理流程完成")
except Exception as e:
print(f"数据处理流程出错: {e}")
raise
common_module.send_task_status(task_start_time, "简道云海外项目CRM客户档案迁移BI")
# 使用示例
if __name__ == "__main__":
# 配置信息
# 创建处理器实例并运行流程
processor = CRMDataProcessor()
processor.main()