Files
saas/test/经销商新签服务单转BI.py
T
2025-08-12 13:43:10 +08:00

212 lines
9.3 KiB
Python

# -*- coding: utf-8 -*-
import pandas as pd
import datetime
from config import Config
from api import API
import pymysql # 使用 pymysql 替代 mysql.connector
from back_ground_module import CommonModule
import os
import mysql.connector
import pandas as pd
import json
import numpy as np
import mysql.connector
from mysql.connector import Error
start_time = datetime.datetime.now()
api_instance = API()
common_module = CommonModule()
# 保存为CSV文件
output_dir = "output" # 设置输出目录
# 创建输出目录(如果不存在)
os.makedirs(output_dir, exist_ok=True)
class NewDealerServiceOrderToBI:
def __init__(self):
self.dealer_service_data = None
self.field_mapping = {'购买的产品名称': '_widget_1742197585104', '经销商名称': '_widget_1741164213155',
'经销商简称': '_widget_1741164213151', '负责人姓名': '_widget_1741165503706',
'负责人手机号': '_widget_1741165503711', '经销商可使用的群数量': '_widget_1741165503710',
'订单编码': '_widget_1741164213149', '订单支付时间': '_widget_1741164213159',
'商户门店ID': '_widget_1741164213152', '开通时间': '_widget_1741164213171',
'详细地址': '_widget_1741164213172', '联系电话': '_widget_1741165503708',
'系统到期时间': '_widget_1741165503709', '开通状态': '_widget_1741165503714',
'销售负责人': '_widget_1741165503716', '运营顾问': '_widget_1741165503718',
'运营专家': '_widget_1741165503719', '区域经理': '_widget_1741165503717',
'业务人员': '_widget_1741165503721', '是否设置经营范围': '_widget_1742200372555',
'不设置经营范围原因': '_widget_1742268351775', '是否建群': '_widget_1742200372553',
'不建群原因': '_widget_1742268351776', '是否设置备货清单': '_widget_1742200372634',
'不设置备货清单原因': '_widget_1742268351778', '是否设置报价': '_widget_1742260928184',
'不设置报价原因': '_widget_1742268351777', '是否上货': '_widget_1742200372559',
'不上货原因': '_widget_1742268351779', '是否培训系统使用': '_widget_1749717287367',
'不培训系统使用原因': '_widget_1749717287369', '是否补货': '_widget_1749717287373',
'不补货原因': '_widget_1749717287375',
'是否进行滞销回抽+盘点介绍': '_widget_1742200372561',
'不进行滞销回抽+盘点介绍原因': '_widget_1742268351780',
'服务是否满意': '_widget_1743148999298', '服务不满意原因': '_widget_1743148999308',
'产品是否满意': '_widget_1743148999300', '产品不满意原因': '_widget_1743148999309',
# '上传评价图片': '_widget_1743148999310',
'审核备注': '_widget_1743500862664',
'完成日期时间': '_widget_1753162835213', '流水号': '_widget_1753163217437',
'提交人': 'creator', '提交时间': 'createTime', '更新时间': 'updateTime'}
def load_all_data(self):
# 获取经销商新签服务单数据
payload = {"api_key": "673d8427549d00c3d753c530",
"entry_id": "67c80eb3d2af9b9821928f45",
}
dealer_service = api_instance.entry_data_list(payload)
self.dealer_service_data = dealer_service.get("data") # api请求格式,将数据封装在data字典里
def data_process(self):
df = pd.DataFrame(self.dealer_service_data)
# 反转映射字典
reverse_mapping = {v: k for k, v in self.field_mapping.items()}
# 1.列明替换
df.columns = [reverse_mapping.get(col, col) for col in df.columns]
# 2.成员字段取值
user_columns = ["提交人", "销售负责人", "区域经理", "业务人员", "运营顾问", "运营专家"]
for col in user_columns:
df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "")
# 3.日期字段转为北京时间
time_columns = ["订单支付时间", "开通时间", "系统到期时间", "完成日期时间", "提交时间", "更新时间"]
df[time_columns] = df[time_columns].apply(
lambda col: pd.to_datetime(col, errors='coerce')
.dt.tz_localize(None)
.dt.strftime('%Y-%m-%d %H:%M:%S')
)
return df
def write_to_bi(self, df):
# 数据库连接信息
HS_DB_Config = {
'host': "f6-public.rwlb.rds.aliyuncs.com",
'user': "rw_operation_data_relay",
'password': "m+q5Z4%IVuF9bf",
'database': "f6operation_data_relay"
}
table_name = "new_dealer_service_order_to_bi" # 替换为你的实际表名
# 建立数据库连接
connection = mysql.connector.connect(
host=HS_DB_Config["host"],
user=HS_DB_Config["user"],
password=HS_DB_Config["password"],
database=HS_DB_Config["database"]
)
cursor = connection.cursor()
try:
# 查询表列名
cursor.execute(f"SHOW COLUMNS FROM {table_name}")
columns_info = cursor.fetchall()
db_columns = [col[0] for col in columns_info] # 提取列名
df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None)
# 保留 DataFrame 中与数据库列名匹配的列
filtered_df = df[df.columns.intersection(db_columns)]
# 如果没有匹配的列,直接返回
if filtered_df.empty:
print("DataFrame 中没有与数据库表结构匹配的列。")
return
# 筛选列之后,插入前处理 dict 类型
filtered_df = filtered_df.copy()
for col in filtered_df.columns:
if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any():
filtered_df.loc[:, col] = filtered_df[col].apply(
lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x
)
# 构建插入语句
placeholders = ', '.join(['%s'] * len(filtered_df.columns))
# 使用反引号避免特殊列明
columns = ', '.join([f"`{col}`" for col in filtered_df.columns])
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
# 将 DataFrame 写入数据库
for _, row in filtered_df.iterrows():
cursor.execute(insert_sql, tuple(row))
connection.commit()
print(f"成功写入 {len(filtered_df)} 条记录到 {table_name} 表中。")
except Exception as e:
print("写入数据库时发生错误:", e)
connection.rollback()
finally:
cursor.close()
connection.close()
def clear_table_data(self):
"""
清空指定 MySQL 表的数据。
参数已写死在函数内部,直接调用即可。
"""
# 数据库连接信息
HS_DB_Config = {
'host': "f6-public.rwlb.rds.aliyuncs.com",
'user': "rw_operation_data_relay",
'password': "m+q5Z4%IVuF9bf",
'database': "f6operation_data_relay"
}
table_name = "new_dealer_service_order_to_bi" # 要清空的表名
connection = None
try:
# 建立数据库连接
connection = mysql.connector.connect(
host=HS_DB_Config["host"],
user=HS_DB_Config["user"],
password=HS_DB_Config["password"],
database=HS_DB_Config["database"]
)
if connection.is_connected():
cursor = connection.cursor()
# 使用TRUNCATE清空表数据
cursor.execute(f"TRUNCATE TABLE {table_name}")
connection.commit()
print(f"成功清空表 {table_name} 中的所有数据")
except Error as e:
print(f"清空表时发生错误: {e}")
if connection and connection.is_connected():
connection.rollback()
finally:
if connection and connection.is_connected():
cursor.close()
connection.close()
print("数据库连接已关闭")
def main(self):
task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# step1: 获取数据
self.load_all_data()
# step2:数据处理
df = self.data_process()
# step3:数据库删除
self.clear_table_data()
# step4:数据写入BI
self.write_to_bi(df)
# common_module.send_task_status(task_start_time, "经销商新签服务单转BI")
if __name__ == '__main__':
start = NewDealerServiceOrderToBI()
start.main()