省市区人员关系表同步到bi
This commit is contained in:
@@ -27,3 +27,4 @@ from back_ground_module.new_dealer_service_order_to_bi import NewDealerServiceOr
|
||||
from back_ground_module.non_standar_performance_to_BI import NonStandardPerformanceToBI
|
||||
from back_ground_module.partner_settlement_to_BI import PartnerSettlementToBI
|
||||
from back_ground_module.GD_match_phone_number import GDMatchPhoneNumber
|
||||
from back_ground_module.province_city_person_relation_to_bi import ProvinceCityPersonRelationToBI
|
||||
|
||||
@@ -0,0 +1,198 @@
|
||||
import pandas as pd
|
||||
import datetime
|
||||
from config import Config
|
||||
from api import API
|
||||
import pymysql # 使用 pymysql 替代 mysql.connector
|
||||
from back_ground_module import CommonModule
|
||||
import os
|
||||
import mysql.connector
|
||||
import pandas as pd
|
||||
import json
|
||||
import numpy as np
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
from log_config import configure_task_logger, configure_error_task_logger
|
||||
import math
|
||||
|
||||
logger = configure_task_logger()
|
||||
error_task_logger = configure_error_task_logger()
|
||||
output_dir = "output" # 设置输出目录
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
common_module = CommonModule()
|
||||
api_instance = API()
|
||||
|
||||
|
||||
class ProvinceCityPersonRelationToBI:
|
||||
def __init__(self):
|
||||
self.pvc_data = None
|
||||
self.field_mapping = {
|
||||
"省": "_widget_1734677164861",
|
||||
"市": "_widget_1734677164862",
|
||||
"区": "_widget_1734677164863",
|
||||
"运营顾问": "_widget_1734677164864",
|
||||
"区域经理": "_widget_1734677164865",
|
||||
"运营专家": "_widget_1734677164866",
|
||||
"战区": "_widget_1734677164867",
|
||||
"新签回访客服": "_widget_1734677164868",
|
||||
"续约回访客服": "_widget_1734677164869",
|
||||
"异常待办客服": "_widget_1734677164870",
|
||||
"日常回访客服": "_widget_1734677164871",
|
||||
}
|
||||
|
||||
def load_all_data(self):
|
||||
payload = {"api_key": "675b900991ad2491c69389ca",
|
||||
"entry_id": "676512ac3e54dc3159460c0a",
|
||||
}
|
||||
pvc_data = api_instance.entry_data_list(payload)
|
||||
self.pvc_data = pvc_data.get("data") # api请求格式,将数据封装在data字典里
|
||||
|
||||
def data_process(self):
|
||||
df = pd.DataFrame(self.pvc_data)
|
||||
# 反转映射字典
|
||||
reverse_mapping = {v: k for k, v in self.field_mapping.items()}
|
||||
# 1.列明替换
|
||||
df.columns = [reverse_mapping.get(col, col) for col in df.columns]
|
||||
|
||||
# 2.成员字段取值
|
||||
user_columns = ["运营顾问", "区域经理", "运营专家", "新签回访客服", "续约回访客服",
|
||||
"异常待办客服", "日常回访客服"]
|
||||
|
||||
for col in user_columns:
|
||||
df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "")
|
||||
|
||||
return df
|
||||
|
||||
def clear_table_data(self):
|
||||
"""
|
||||
清空指定 MySQL 表的数据。
|
||||
参数已写死在函数内部,直接调用即可。
|
||||
"""
|
||||
# 数据库连接信息
|
||||
HS_DB_Config = {
|
||||
'host': "f6-public.rwlb.rds.aliyuncs.com",
|
||||
'user': "rw_operation_data_relay",
|
||||
'password': "m+q5Z4%IVuF9bf",
|
||||
'database': "f6operation_data_relay"
|
||||
}
|
||||
table_name = "province_city_person_relation_to_bi" # 要清空的表名
|
||||
|
||||
connection = None
|
||||
try:
|
||||
# 建立数据库连接
|
||||
connection = mysql.connector.connect(
|
||||
host=HS_DB_Config["host"],
|
||||
user=HS_DB_Config["user"],
|
||||
password=HS_DB_Config["password"],
|
||||
database=HS_DB_Config["database"]
|
||||
)
|
||||
if connection.is_connected():
|
||||
cursor = connection.cursor()
|
||||
|
||||
# 使用TRUNCATE清空表数据
|
||||
cursor.execute(f"TRUNCATE TABLE {table_name}")
|
||||
connection.commit()
|
||||
|
||||
logger.info(f"成功清空表 {table_name} 中的所有数据")
|
||||
|
||||
except Error as e:
|
||||
error_task_logger.error(f"清空表时发生错误: {e}")
|
||||
if connection and connection.is_connected():
|
||||
connection.rollback()
|
||||
finally:
|
||||
if connection and connection.is_connected():
|
||||
cursor.close()
|
||||
connection.close()
|
||||
logger.info("数据库连接已关闭")
|
||||
|
||||
def write_to_bi(self, df):
|
||||
HS_DB_Config = Config.HS_DB_Config
|
||||
table_name = "province_city_person_relation_to_bi"
|
||||
chunk_size = 1000 # 每批插入 1000 行
|
||||
|
||||
# 清理 DataFrame 中的 NaN/None 等值
|
||||
df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None)
|
||||
|
||||
connection = mysql.connector.connect(
|
||||
host=HS_DB_Config["host"],
|
||||
user=HS_DB_Config["user"],
|
||||
password=HS_DB_Config["password"],
|
||||
database=HS_DB_Config["database"]
|
||||
)
|
||||
cursor = connection.cursor()
|
||||
|
||||
try:
|
||||
# 获取数据库表的列名
|
||||
cursor.execute(f"SHOW COLUMNS FROM `{table_name}`")
|
||||
db_columns = [col[0] for col in cursor.fetchall()]
|
||||
|
||||
# 保留与数据库匹配的列
|
||||
filtered_df = df[df.columns.intersection(db_columns)]
|
||||
if filtered_df.empty:
|
||||
print("DataFrame 中没有与数据库表结构匹配的列。")
|
||||
return
|
||||
|
||||
# 处理 dict/list 类型字段:转为 JSON 字符串
|
||||
filtered_df = filtered_df.copy()
|
||||
for col in filtered_df.columns:
|
||||
if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any():
|
||||
filtered_df[col] = filtered_df[col].apply(
|
||||
lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x
|
||||
)
|
||||
|
||||
# 构建 INSERT 语句(只构建一次)
|
||||
columns = [f"`{col}`" for col in filtered_df.columns]
|
||||
placeholders = ', '.join(['%s'] * len(columns))
|
||||
insert_sql = f"INSERT INTO `{table_name}` ({', '.join(columns)}) VALUES ({placeholders})"
|
||||
|
||||
total_rows = len(filtered_df)
|
||||
num_chunks = math.ceil(total_rows / chunk_size)
|
||||
|
||||
for i in range(num_chunks):
|
||||
start_idx = i * chunk_size
|
||||
end_idx = min(start_idx + chunk_size, total_rows)
|
||||
chunk_df = filtered_df.iloc[start_idx:end_idx]
|
||||
|
||||
# 转为元组列表
|
||||
data_to_insert = [
|
||||
tuple(row) for row in chunk_df.values
|
||||
]
|
||||
|
||||
# 批量执行(executemany 更高效)
|
||||
cursor.executemany(insert_sql, data_to_insert)
|
||||
|
||||
connection.commit()
|
||||
logger.info(f"成功写入 {total_rows} 条记录到 {table_name} 表中(分 {num_chunks} 批)。")
|
||||
|
||||
except Exception as e:
|
||||
error_task_logger.error(f"写入数据库时发生错误: {e}", exc_info=True)
|
||||
connection.rollback()
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
|
||||
def main(self):
|
||||
task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
try:
|
||||
logger.info("任务开始")
|
||||
# step1: 获取数据
|
||||
self.load_all_data()
|
||||
logger.info("加载数据完成")
|
||||
# step2:数据处理
|
||||
df = self.data_process()
|
||||
# df.to_csv(os.path.join(output_dir, "new_dealer_service_order_to_bi.csv"))
|
||||
logger.info("数据处理完成")
|
||||
# step3:数据库删除
|
||||
self.clear_table_data()
|
||||
logger.info("目标数据库已清空")
|
||||
# step4:数据写入BI
|
||||
self.write_to_bi(df)
|
||||
logger.info("数据已写入数据库中")
|
||||
common_module.send_task_status(task_start_time, "省市区人员关系表转BI")
|
||||
except Exception as e:
|
||||
error_task_logger.error(f"省市区人员关系表转BI发生错误{e}")
|
||||
common_module.send_task_error(task_start_time, "省市区人员关系表转BI", str(e))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
province_city_person_relation_to_bi = ProvinceCityPersonRelationToBI()
|
||||
province_city_person_relation_to_bi.main()
|
||||
Reference in New Issue
Block a user