Files
saas/back_ground_module/non_standar_performance_to_BI.py
T
2025-12-31 10:50:45 +08:00

327 lines
14 KiB
Python

# -*- coding: utf-8 -*-
import pandas as pd
import datetime
from config import Config
from api import API
import pymysql # 使用 pymysql 替代 mysql.connector
from back_ground_module import CommonModule
import os
import mysql.connector
import pandas as pd
import json
import numpy as np
import mysql.connector
from mysql.connector import Error
from log_config import configure_task_logger, configure_error_task_logger
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
api_instance = API()
common_module = CommonModule()
output_dir = "output" # 设置输出目录
os.makedirs(output_dir, exist_ok=True)
class NonStandardPerformanceToBI:
""" 非标业绩提报转BI"""
def __init__(self):
self.dealer_service_data = None
self.field_mapping = {
"报备类型": "_widget_1753770875899",
"协作内容": "_widget_1753770875915",
"情况说明": "_widget_1753770875944",
"订单编号": "_widget_1753770875887",
"实付金额": "_widget_1753770875889",
"门店编码": "_widget_1753770875890",
"门店名称": "_widget_1753770875888",
"版本": "_widget_1753770875891",
"年限": "_widget_1753948745953",
"支付日期": "_widget_1753770875893",
"开户/处理日期": "_widget_1753770875894",
"小六业绩金额": "_widget_1753770875898",
"区域业绩金额": "_widget_1753770875937",
"报备业绩归属区域经理": "_widget_1753770875903",
"报备业绩归属大区": "_widget_1753866196486",
"原业绩归属人": "_widget_1753856032683",
"原业绩归属区域经理": "_widget_1753866196485",
"小六业绩比例": "_widget_1753770875917",
"区域业绩比例": "_widget_1753770875921",
"运营专家": "_widget_1753770875902",
"提成类型": "_widget_1753778922504",
"SaaS新签提成比例": "_widget_1753770875949",
"服务包提成比例": "_widget_1753778922567",
"提成金额": "_widget_1753770875948",
"新签提成比例-首年": "_widget_1753778922503",
"新签提成比例-非首年": "_widget_1753778922548",
"新签阶段及提成比例": "_widget_1753778656359",
"业绩动作":"_widget_1756708722933",
"提成动作":"_widget_1756708722932",
"新签阶段及提成比例.选择提成阶段": "_widget_1753778656359._widget_1753778656361",
"新签阶段及提成比例.新签阶段": "_widget_1753778656359._widget_1753948745962",
"新签阶段及提成比例.提成比例": "_widget_1753778656359._widget_1753778656362",
"业绩类型":"_widget_1753770875966",
"报备业绩归属小六":"_widget_1753770875901",
"原业绩归属大区":"_widget_1755159216098",
"业绩分类":"_widget_1758706882564",
"流程是否结束":"_widget_1761633418013",
"业绩类型-聚合":"_widget_1758706882564",
"业绩分组":"_widget_1762417447169",
"商品名称":"_widget_1762219744898",
"履约金额":"_widget_1762220516367",
"业绩归属日期":"_widget_1762417447127",
"公司名称":"_widget_1762420723743",
"公司ID":"_widget_1762420723744",
"报备业绩金额-区域提交":"_widget_1766375035236",
"业绩归属小六-区域提交":"_widget_1766461143813",
"业绩归属月":"_widget_1766375035265",
"是否同步衡石":"_widget_1766484337844",
"提交人": "creator",
"提交时间": "createTime",
"更新时间": "updateTime"
}
# 定义需要特殊处理的列表字段及其内部字段映射
self.list_fields_config = {
"新签阶段及提成比例": {
"_widget_1753778656361": "选择提成阶段",
"_widget_1753948745962": "新签阶段",
"_widget_1753778656362": "提成比例"
},
# 可以在这里添加其他列表字段的配置
# "另一个列表字段": {
# "原始字段名1": "映射后字段名1",
# "原始字段名2": "映射后字段名2"
# }
}
def load_all_data(self):
# 获取非标业绩提报数据
payload = {"api_key": "66b9678280b37f8a276b1d01",
"entry_id": "68886b7c0382a7249ae0b5d6",
}
dealer_service = api_instance.entry_data_list(payload)
self.dealer_service_data = dealer_service.get("data") # api请求格式,将数据封装在data字典里
def process_list_field(self, field_value, field_config):
"""通用方法:处理列表类型的字段"""
if not isinstance(field_value, (list, np.ndarray)):
return field_value
processed_list = []
for item in field_value:
if not isinstance(item, dict):
processed_list.append(item)
continue
processed_item = {}
for original_key, mapped_key in field_config.items():
if original_key in item:
# 处理包含id的字典字段
if isinstance(item[original_key], dict) and "id" in item[original_key]:
processed_item[mapped_key] = item[original_key]["id"]
else:
processed_item[mapped_key] = item[original_key]
else:
processed_item[mapped_key] = None
processed_list.append(processed_item)
return processed_list
def data_process(self):
df = pd.DataFrame(self.dealer_service_data)
# 反转映射字典
reverse_mapping = {v: k for k, v in self.field_mapping.items()}
# 1.列明替换
df.columns = [reverse_mapping.get(col, col) for col in df.columns]
# 只保留流程是否结束为是的内容
df = df[df["流程是否结束"] == ""]
# 2.成员字段取值
user_columns = ["报备业绩归属小六", "报备业绩归属区域经理", "原业绩归属人", "原业绩归属区域经理", "运营专家","业绩归属小六-区域提交"]
for col in user_columns:
df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "")
# 3.日期字段转为北京时间
time_columns = ["支付日期", "开户/处理日期", "提交时间", "更新时间", "业绩归属月", "业绩归属日期"]
for col in time_columns:
# 1. 解析为 datetime,并明确指定为 UTC(即使原始字符串无时区)
dt_utc = pd.to_datetime(df[col], errors='coerce', utc=True)
# 2. 转换为北京时间
dt_beijing = dt_utc.dt.tz_convert('Asia/Shanghai')
# 3. 去掉时区信息(变成 naive datetime),然后格式化为字符串
df[col] = dt_beijing.dt.tz_localize(None).dt.strftime('%Y-%m-%d %H:%M:%S')
# 4.业绩动作等于拆单做复制
# 4.1. 定义条件
mask = df['业绩动作'] == '拆单'
# 4.2. 复制满足条件的行
new_rows = df[mask].copy() # ⚠️ 一定要用 .copy() 避免 SettingWithCopyWarning
# 3. 修改新行中的某些列
new_rows['小六业绩金额'] = -new_rows['小六业绩金额']
new_rows['区域业绩金额'] = -new_rows['区域业绩金额']
new_rows['报备业绩归属小六'] = new_rows['原业绩归属人']
new_rows['报备业绩归属区域经理'] = new_rows['原业绩归属区域经理']
new_rows['报备业绩归属大区'] = new_rows['原业绩归属大区']
# 4. 合并回原 DataFrame
df = pd.concat([df, new_rows], ignore_index=True)
# 5.处理所有配置的列表字段
if "新签阶段及提成比例" in df.columns:
# 先处理订单登记表字段
df["新签阶段及提成比例"] = df["新签阶段及提成比例"].apply(
lambda x: self.process_list_field(x, self.list_fields_config["新签阶段及提成比例"])
if x is not None and (isinstance(x, (list, dict, np.ndarray)) or not pd.isna(x))
else None
)
# 拆分行
df_exploded = df.explode("新签阶段及提成比例")
# 将订单登记表中的字段提取到主表中
order_fields = self.list_fields_config["新签阶段及提成比例"].values()
for field in order_fields:
df_exploded[field] = df_exploded["新签阶段及提成比例"].apply(
lambda x: x.get(field) if isinstance(x, dict) else None
)
# 删除原始的订单登记表列
df_exploded = df_exploded.drop(columns=["新签阶段及提成比例"])
# 重置索引
df = df_exploded.reset_index(drop=True)
return df
def write_to_bi(self, df):
# 数据库连接信息
HS_DB_Config = Config.HS_DB_Config
table_name = "non_standard_performance_to_BI" # 替换为你的实际表名
# 建立数据库连接
connection = mysql.connector.connect(
host=HS_DB_Config["host"],
user=HS_DB_Config["user"],
password=HS_DB_Config["password"],
database=HS_DB_Config["database"]
)
cursor = connection.cursor()
try:
# 查询表列名
cursor.execute(f"SHOW COLUMNS FROM {table_name}")
columns_info = cursor.fetchall()
db_columns = [col[0] for col in columns_info] # 提取列名
df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None)
# 保留 DataFrame 中与数据库列名匹配的列
filtered_df = df[df.columns.intersection(db_columns)]
# 如果没有匹配的列,直接返回
if filtered_df.empty:
print("DataFrame 中没有与数据库表结构匹配的列。")
return
# 筛选列之后,插入前处理 dict 类型
filtered_df = filtered_df.copy()
for col in filtered_df.columns:
if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any():
filtered_df.loc[:, col] = filtered_df[col].apply(
lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x
)
# 构建插入语句
placeholders = ', '.join(['%s'] * len(filtered_df.columns))
# 使用反引号避免特殊列明
columns = ', '.join([f"`{col}`" for col in filtered_df.columns])
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
# 将 DataFrame 写入数据库
for _, row in filtered_df.iterrows():
cursor.execute(insert_sql, tuple(row))
connection.commit()
logger.info(f"成功写入 {len(filtered_df)} 条记录到 {table_name} 表中。")
except Exception as e:
error_task_logger.error(f"写入数据库时发生错误: {e}")
connection.rollback()
finally:
cursor.close()
connection.close()
def clear_table_data(self):
"""
清空指定 MySQL 表的数据。
参数已写死在函数内部,直接调用即可。
"""
# 数据库连接信息
HS_DB_Config = {
'host': "f6-public.rwlb.rds.aliyuncs.com",
'user': "rw_operation_data_relay",
'password': "m+q5Z4%IVuF9bf",
'database': "f6operation_data_relay"
}
table_name = "non_standard_performance_to_BI" # 要清空的表名
connection = None
try:
# 建立数据库连接
connection = mysql.connector.connect(
host=HS_DB_Config["host"],
user=HS_DB_Config["user"],
password=HS_DB_Config["password"],
database=HS_DB_Config["database"]
)
if connection.is_connected():
cursor = connection.cursor()
# 使用TRUNCATE清空表数据
cursor.execute(f"TRUNCATE TABLE {table_name}")
connection.commit()
logger.info(f"成功清空表 {table_name} 中的所有数据")
except Error as e:
error_task_logger.error(f"清空表时发生错误: {e}")
if connection and connection.is_connected():
connection.rollback()
finally:
if connection and connection.is_connected():
cursor.close()
connection.close()
logger.info("数据库连接已关闭")
def main(self):
task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
logger.info("任务开始")
# step1: 获取数据
self.load_all_data()
logger.info("加载数据完成")
# step2:数据处理
df = self.data_process()
# df.to_csv(os.path.join(output_dir, "new_dealer_service_order_to_bi.csv"))
logger.info("数据处理完成")
# step3:数据库删除
self.clear_table_data()
logger.info("目标数据库已清空")
# step4:数据写入BI
self.write_to_bi(df)
logger.info("数据已写入数据库中")
common_module.send_task_status(task_start_time, "非标业绩提报转BI")
except Exception as e:
error_task_logger.error(f"非标业绩提报转BI发生错误{e}")
common_module.send_task_error(task_start_time,"非标业绩提报转BI", str(e))
if __name__ == '__main__':
start = NonStandardPerformanceToBI()
start.main()