新签合伙人、非标业绩提报
This commit is contained in:
@@ -24,3 +24,5 @@ from back_ground_module.update_molecule_reporting_adjustment_to_bi import Molecu
|
||||
from back_ground_module.import_performance_data import ImportPerformanceData
|
||||
from back_ground_module.data_monitor import DataMonitor
|
||||
from back_ground_module.new_dealer_service_order_to_bi import NewDealerServiceOrderToBI
|
||||
from back_ground_module.non_standar_performance_to_BI import NonStandardPerformanceToBI
|
||||
from back_ground_module.partner_settlement_to_BI import PartnerSettlementToBI
|
||||
|
||||
@@ -8044,3 +8044,16 @@
|
||||
2025-08-18 12:10:19,041 - revisit_new_services_180.py - task_logger - INFO - 关联数据图片:
|
||||
2025-08-18 12:10:20,202 - common_module.py - task_logger - INFO - 任务状态发送成功: {'data': {'creator': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'updater': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'deleter': None, 'createTime': '2025-08-18T04:10:14.333Z', 'updateTime': '2025-08-18T04:10:14.333Z', 'deleteTime': None, '_widget_1744873387500': '2025-08-18T00:00:00.000Z', '_widget_1743644977694': '新签客户回访', '_widget_1744873387501': '2025-08-18T03:50:30.000Z', '_widget_1744873387502': '2025-08-18T04:10:20.000Z', '_widget_1744873387504': '1190', '_id': '68a2a7a6b00a82b43219c605', 'appId': '6694d3c4fcb69ca9a111a6c4', 'entryId': '67ede908eb9c22261016466e'}}
|
||||
2025-08-18 12:10:20,202 - revisit_new_services_180.py - task_logger - INFO - 新签客户回访任务完成
|
||||
2025-08-20 17:19:06,286 - api.py - task_logger - INFO - 已获取 8 条数据
|
||||
2025-08-20 17:19:07,202 - common_module.py - task_logger - INFO - 任务状态发送成功: {'data': {'creator': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'updater': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'deleter': None, 'createTime': '2025-08-20T09:19:04.961Z', 'updateTime': '2025-08-20T09:19:04.961Z', 'deleteTime': None, '_widget_1744873387500': '2025-08-20T00:00:00.000Z', '_widget_1743644977694': '非标业绩提报转BI', '_widget_1744873387501': '2025-08-20T09:19:06.000Z', '_widget_1744873387502': '2025-08-20T09:19:07.000Z', '_widget_1744873387504': '1', '_id': '68a593087f71408faf8b8991', 'appId': '6694d3c4fcb69ca9a111a6c4', 'entryId': '67ede908eb9c22261016466e'}}
|
||||
2025-08-20 17:26:04,292 - partner_settlement_to_BI.py - task_logger - INFO - 任务开始
|
||||
2025-08-20 17:26:04,407 - partner_settlement_to_BI.py - task_logger - INFO - 加载数据完成
|
||||
2025-08-20 17:26:10,718 - non_standar_performance_to_BI.py - task_logger - INFO - 任务开始
|
||||
2025-08-20 17:26:10,820 - api.py - task_logger - INFO - 已获取 8 条数据
|
||||
2025-08-20 17:26:10,926 - non_standar_performance_to_BI.py - task_logger - INFO - 加载数据完成
|
||||
2025-08-20 17:26:10,936 - non_standar_performance_to_BI.py - task_logger - INFO - 数据处理完成
|
||||
2025-08-20 17:26:11,150 - non_standar_performance_to_BI.py - task_logger - INFO - 成功清空表 non_standard_performance_to_BI 中的所有数据
|
||||
2025-08-20 17:26:11,165 - non_standar_performance_to_BI.py - task_logger - INFO - 数据库连接已关闭
|
||||
2025-08-20 17:26:11,165 - non_standar_performance_to_BI.py - task_logger - INFO - 目标数据库已清空
|
||||
2025-08-20 17:26:11,430 - non_standar_performance_to_BI.py - task_logger - INFO - 数据已写入数据库中
|
||||
2025-08-20 17:26:11,563 - common_module.py - task_logger - INFO - 任务状态发送成功: {'data': {'creator': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'updater': {'name': 'F6汽车科技', 'username': '#admin', 'status': 1, 'type': 0}, 'deleter': None, 'createTime': '2025-08-20T09:26:09.312Z', 'updateTime': '2025-08-20T09:26:09.312Z', 'deleteTime': None, '_widget_1744873387500': '2025-08-20T00:00:00.000Z', '_widget_1743644977694': '非标业绩提报转BI', '_widget_1744873387501': '2025-08-20T09:26:10.000Z', '_widget_1744873387502': '2025-08-20T09:26:11.000Z', '_widget_1744873387504': '1', '_id': '68a594b16856c7dacd7824a4', 'appId': '6694d3c4fcb69ca9a111a6c4', 'entryId': '67ede908eb9c22261016466e'}}
|
||||
|
||||
@@ -0,0 +1,286 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import pandas as pd
|
||||
import datetime
|
||||
from config import Config
|
||||
from api import API
|
||||
import pymysql # 使用 pymysql 替代 mysql.connector
|
||||
from back_ground_module import CommonModule
|
||||
import os
|
||||
import mysql.connector
|
||||
import pandas as pd
|
||||
import json
|
||||
import numpy as np
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
from log_config import configure_task_logger, configure_error_task_logger
|
||||
|
||||
logger = configure_task_logger()
|
||||
error_task_logger = configure_error_task_logger()
|
||||
api_instance = API()
|
||||
common_module = CommonModule()
|
||||
output_dir = "output" # 设置输出目录
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
|
||||
class NonStandardPerformanceToBI:
|
||||
def __init__(self):
|
||||
self.dealer_service_data = None
|
||||
self.field_mapping = {
|
||||
"报备类型": "_widget_1753770875899",
|
||||
"协作内容": "_widget_1753770875915",
|
||||
"订单类型": "_widget_1753770875966",
|
||||
"情况说明": "_widget_1753770875944",
|
||||
"订单编号": "_widget_1753770875887",
|
||||
"实付金额": "_widget_1753770875889",
|
||||
"门店编码": "_widget_1753770875890",
|
||||
"门店名称": "_widget_1753770875888",
|
||||
"版本": "_widget_1753770875891",
|
||||
"年限": "_widget_1753948745953",
|
||||
"支付日期": "_widget_1753770875893",
|
||||
"开户/处理日期": "_widget_1753770875894",
|
||||
"小六业绩金额": "_widget_1753770875898",
|
||||
"区域业绩金额": "_widget_1753770875937",
|
||||
"报备业绩归属人": "_widget_1753770875901",
|
||||
"报备业绩归属区域经理": "_widget_1753770875903",
|
||||
"报备业绩归属大区": "_widget_1753866196486",
|
||||
"原业绩归属人": "_widget_1753856032683",
|
||||
"原业绩归属区域经理": "_widget_1753866196485",
|
||||
"小六业绩比例": "_widget_1753770875917",
|
||||
"区域业绩比例": "_widget_1753770875921",
|
||||
"运营专家": "_widget_1753770875902",
|
||||
"提成类型": "_widget_1753778922504",
|
||||
"SaaS新签提成比例": "_widget_1753770875949",
|
||||
"服务包提成比例": "_widget_1753778922567",
|
||||
"提成金额": "_widget_1753770875948",
|
||||
"新签提成比例-首年": "_widget_1753778922503",
|
||||
"新签提成比例-非首年": "_widget_1753778922548",
|
||||
"新签阶段及提成比例": "_widget_1753778656359",
|
||||
"新签阶段及提成比例.选择提成阶段": "_widget_1753778656359._widget_1753778656361",
|
||||
"新签阶段及提成比例.新签阶段": "_widget_1753778656359._widget_1753948745962",
|
||||
"新签阶段及提成比例.提成比例": "_widget_1753778656359._widget_1753778656362",
|
||||
}
|
||||
|
||||
# 定义需要特殊处理的列表字段及其内部字段映射
|
||||
self.list_fields_config = {
|
||||
"新签阶段及提成比例": {
|
||||
"_widget_1753778656361": "选择提成阶段",
|
||||
"_widget_1753948745962": "新签阶段",
|
||||
"_widget_1753778656362": "提成比例"
|
||||
},
|
||||
# 可以在这里添加其他列表字段的配置
|
||||
# "另一个列表字段": {
|
||||
# "原始字段名1": "映射后字段名1",
|
||||
# "原始字段名2": "映射后字段名2"
|
||||
# }
|
||||
}
|
||||
|
||||
def load_all_data(self):
|
||||
# 获取非标业绩提报数据
|
||||
payload = {"api_key": "66b9678280b37f8a276b1d01",
|
||||
"entry_id": "68886b7c0382a7249ae0b5d6",
|
||||
}
|
||||
dealer_service = api_instance.entry_data_list(payload)
|
||||
self.dealer_service_data = dealer_service.get("data") # api请求格式,将数据封装在data字典里
|
||||
|
||||
def process_list_field(self, field_value, field_config):
|
||||
"""通用方法:处理列表类型的字段"""
|
||||
if not isinstance(field_value, (list, np.ndarray)):
|
||||
return field_value
|
||||
|
||||
processed_list = []
|
||||
for item in field_value:
|
||||
if not isinstance(item, dict):
|
||||
processed_list.append(item)
|
||||
continue
|
||||
|
||||
processed_item = {}
|
||||
for original_key, mapped_key in field_config.items():
|
||||
if original_key in item:
|
||||
# 处理包含id的字典字段
|
||||
if isinstance(item[original_key], dict) and "id" in item[original_key]:
|
||||
processed_item[mapped_key] = item[original_key]["id"]
|
||||
else:
|
||||
processed_item[mapped_key] = item[original_key]
|
||||
else:
|
||||
processed_item[mapped_key] = None
|
||||
processed_list.append(processed_item)
|
||||
return processed_list
|
||||
|
||||
def data_process(self):
|
||||
df = pd.DataFrame(self.dealer_service_data)
|
||||
# 反转映射字典
|
||||
reverse_mapping = {v: k for k, v in self.field_mapping.items()}
|
||||
# 1.列明替换
|
||||
df.columns = [reverse_mapping.get(col, col) for col in df.columns]
|
||||
|
||||
# 2.成员字段取值
|
||||
user_columns = ["报备业绩归属人", "报备业绩归属区域经理", "原业绩归属人", "原业绩归属区域经理", "运营专家"]
|
||||
|
||||
for col in user_columns:
|
||||
df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "")
|
||||
|
||||
# 3.日期字段转为北京时间
|
||||
time_columns = ["支付日期", "开户/处理日期"]
|
||||
|
||||
df[time_columns] = df[time_columns].apply(
|
||||
lambda col: pd.to_datetime(col, errors='coerce')
|
||||
.dt.tz_localize(None)
|
||||
.dt.strftime('%Y-%m-%d %H:%M:%S')
|
||||
)
|
||||
|
||||
# 4.处理所有配置的列表字段
|
||||
if "新签阶段及提成比例" in df.columns:
|
||||
# 先处理订单登记表字段
|
||||
df["新签阶段及提成比例"] = df["新签阶段及提成比例"].apply(
|
||||
lambda x: self.process_list_field(x, self.list_fields_config["新签阶段及提成比例"])
|
||||
if x is not None and (isinstance(x, (list, dict, np.ndarray)) or not pd.isna(x))
|
||||
else None
|
||||
)
|
||||
|
||||
# 拆分行
|
||||
df_exploded = df.explode("新签阶段及提成比例")
|
||||
|
||||
# 将订单登记表中的字段提取到主表中
|
||||
order_fields = self.list_fields_config["新签阶段及提成比例"].values()
|
||||
for field in order_fields:
|
||||
df_exploded[field] = df_exploded["新签阶段及提成比例"].apply(
|
||||
lambda x: x.get(field) if isinstance(x, dict) else None
|
||||
)
|
||||
|
||||
# 删除原始的订单登记表列
|
||||
df_exploded = df_exploded.drop(columns=["新签阶段及提成比例"])
|
||||
|
||||
# 重置索引
|
||||
df = df_exploded.reset_index(drop=True)
|
||||
|
||||
return df
|
||||
|
||||
def write_to_bi(self, df):
|
||||
# 数据库连接信息
|
||||
HS_DB_Config = {
|
||||
'host': "f6-public.rwlb.rds.aliyuncs.com",
|
||||
'user': "rw_operation_data_relay",
|
||||
'password': "m+q5Z4%IVuF9bf",
|
||||
'database': "f6operation_data_relay"
|
||||
}
|
||||
table_name = "non_standard_performance_to_BI" # 替换为你的实际表名
|
||||
|
||||
# 建立数据库连接
|
||||
connection = mysql.connector.connect(
|
||||
host=HS_DB_Config["host"],
|
||||
user=HS_DB_Config["user"],
|
||||
password=HS_DB_Config["password"],
|
||||
database=HS_DB_Config["database"]
|
||||
)
|
||||
cursor = connection.cursor()
|
||||
|
||||
try:
|
||||
# 查询表列名
|
||||
cursor.execute(f"SHOW COLUMNS FROM {table_name}")
|
||||
columns_info = cursor.fetchall()
|
||||
db_columns = [col[0] for col in columns_info] # 提取列名
|
||||
df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None)
|
||||
# 保留 DataFrame 中与数据库列名匹配的列
|
||||
filtered_df = df[df.columns.intersection(db_columns)]
|
||||
|
||||
# 如果没有匹配的列,直接返回
|
||||
if filtered_df.empty:
|
||||
print("DataFrame 中没有与数据库表结构匹配的列。")
|
||||
return
|
||||
|
||||
# 筛选列之后,插入前处理 dict 类型
|
||||
filtered_df = filtered_df.copy()
|
||||
for col in filtered_df.columns:
|
||||
if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any():
|
||||
filtered_df.loc[:, col] = filtered_df[col].apply(
|
||||
lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x
|
||||
)
|
||||
|
||||
# 构建插入语句
|
||||
placeholders = ', '.join(['%s'] * len(filtered_df.columns))
|
||||
# 使用反引号避免特殊列明
|
||||
columns = ', '.join([f"`{col}`" for col in filtered_df.columns])
|
||||
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
|
||||
|
||||
# 将 DataFrame 写入数据库
|
||||
for _, row in filtered_df.iterrows():
|
||||
cursor.execute(insert_sql, tuple(row))
|
||||
|
||||
connection.commit()
|
||||
logger.info(f"成功写入 {len(filtered_df)} 条记录到 {table_name} 表中。")
|
||||
|
||||
except Exception as e:
|
||||
error_task_logger.error(f"写入数据库时发生错误: {e}")
|
||||
connection.rollback()
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
|
||||
def clear_table_data(self):
|
||||
"""
|
||||
清空指定 MySQL 表的数据。
|
||||
参数已写死在函数内部,直接调用即可。
|
||||
"""
|
||||
# 数据库连接信息
|
||||
HS_DB_Config = {
|
||||
'host': "f6-public.rwlb.rds.aliyuncs.com",
|
||||
'user': "rw_operation_data_relay",
|
||||
'password': "m+q5Z4%IVuF9bf",
|
||||
'database': "f6operation_data_relay"
|
||||
}
|
||||
table_name = "non_standard_performance_to_BI" # 要清空的表名
|
||||
|
||||
connection = None
|
||||
try:
|
||||
# 建立数据库连接
|
||||
connection = mysql.connector.connect(
|
||||
host=HS_DB_Config["host"],
|
||||
user=HS_DB_Config["user"],
|
||||
password=HS_DB_Config["password"],
|
||||
database=HS_DB_Config["database"]
|
||||
)
|
||||
if connection.is_connected():
|
||||
cursor = connection.cursor()
|
||||
|
||||
# 使用TRUNCATE清空表数据
|
||||
cursor.execute(f"TRUNCATE TABLE {table_name}")
|
||||
connection.commit()
|
||||
|
||||
logger.info(f"成功清空表 {table_name} 中的所有数据")
|
||||
|
||||
except Error as e:
|
||||
error_task_logger.error(f"清空表时发生错误: {e}")
|
||||
if connection and connection.is_connected():
|
||||
connection.rollback()
|
||||
finally:
|
||||
if connection and connection.is_connected():
|
||||
cursor.close()
|
||||
connection.close()
|
||||
logger.info("数据库连接已关闭")
|
||||
|
||||
def main(self):
|
||||
task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
try:
|
||||
logger.info("任务开始")
|
||||
# step1: 获取数据
|
||||
self.load_all_data()
|
||||
logger.info("加载数据完成")
|
||||
# step2:数据处理
|
||||
df = self.data_process()
|
||||
# df.to_csv(os.path.join(output_dir, "new_dealer_service_order_to_bi.csv"))
|
||||
logger.info("数据处理完成")
|
||||
# step3:数据库删除
|
||||
self.clear_table_data()
|
||||
logger.info("目标数据库已清空")
|
||||
# step4:数据写入BI
|
||||
self.write_to_bi(df)
|
||||
logger.info("数据已写入数据库中")
|
||||
common_module.send_task_status(task_start_time, "非标业绩提报转BI")
|
||||
except Exception as e:
|
||||
error_task_logger.error(f"非标业绩提报转BI发生错误{e}")
|
||||
common_module.send_task_error(task_start_time,"非标业绩提报转BI", str(e))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
start = NonStandardPerformanceToBI()
|
||||
start.main()
|
||||
@@ -0,0 +1,278 @@
|
||||
## 获取数据
|
||||
# -*- coding: utf-8 -*-
|
||||
import pandas as pd
|
||||
import datetime
|
||||
from config import Config
|
||||
from api import API
|
||||
import pymysql # 使用 pymysql 替代 mysql.connector
|
||||
from back_ground_module import CommonModule
|
||||
import os
|
||||
import mysql.connector
|
||||
import pandas as pd
|
||||
import json
|
||||
import numpy as np
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
from log_config import configure_task_logger, configure_error_task_logger
|
||||
import sys
|
||||
|
||||
logger = configure_task_logger()
|
||||
error_task_logger = configure_error_task_logger()
|
||||
api_instance = API()
|
||||
common_module = CommonModule()
|
||||
output_dir = "output" # 设置输出目录
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
|
||||
class PartnerSettlementToBI:
|
||||
def __init__(self):
|
||||
self.partner_settlement_data = None
|
||||
self.field_mapping = {
|
||||
"选择合伙人": "_widget_1753930627469",
|
||||
"合伙人姓名": "_widget_1712801992726",
|
||||
"手机号": "_widget_1712803222895",
|
||||
"合伙人身份": "_widget_1712803222894",
|
||||
"合伙人所在省市": "_widget_1712803222896",
|
||||
"合伙人登记人": "_widget_1712803222900",
|
||||
"战区经理": "_widget_1712803222901",
|
||||
"提交人": "_widget_1753941892609",
|
||||
"合伙人分类": "_widget_1753943042503",
|
||||
"战区": "_widget_1754530653275",
|
||||
"订单登记表": "_widget_1712803222905",
|
||||
"订单登记表.订单编号": "_widget_1712803222905._widget_1712803222907",
|
||||
"订单登记表.销售阶段": "_widget_1712803222905._widget_1712805391009",
|
||||
"订单登记表.版本": "_widget_1712803222905._widget_1712803222908",
|
||||
"订单登记表.年限": "_widget_1712803222905._widget_1712815331264",
|
||||
"订单登记表.成交金额": "_widget_1712803222905._widget_1712805391002",
|
||||
"订单登记表.佣金": "_widget_1712803222905._widget_1753952737266",
|
||||
"订单登记表.理论佣金": "_widget_1712803222905._widget_1753952737267",
|
||||
"订单登记表.佣金比例": "_widget_1712803222905._widget_1712807001396",
|
||||
"合计佣金": "_widget_1753948415171",
|
||||
"理论合计佣金": "_widget_1753952737280",
|
||||
"特殊情况备注": "_widget_1712805391035",
|
||||
"合伙人介绍证明(微信聊天截图等)": "_widget_1712815331256",
|
||||
"合伙人类型": "_widget_1753957844818",
|
||||
}
|
||||
|
||||
# 定义需要特殊处理的列表字段及其内部字段映射
|
||||
self.list_fields_config = {
|
||||
"订单登记表": {
|
||||
"_widget_1712803222907": "订单编号",
|
||||
"_widget_1712805391009": "销售阶段",
|
||||
"_widget_1712803222908": "版本",
|
||||
"_widget_1712815331264": "年限",
|
||||
"_widget_1712805391002": "成交金额",
|
||||
"_widget_1753952737266": "佣金",
|
||||
"_widget_1753952737267": "理论佣金",
|
||||
"_widget_1712807001396": "佣金比例",
|
||||
},
|
||||
# 可以在这里添加其他列表字段的配置
|
||||
# "另一个列表字段": {
|
||||
# "原始字段名1": "映射后字段名1",
|
||||
# "原始字段名2": "映射后字段名2"
|
||||
# }
|
||||
}
|
||||
|
||||
def load_all_data(self):
|
||||
payload = {"api_key": "66b9678280b37f8a276b1d01",
|
||||
# "entry_id": "68a57e3a0bc339d3384d1b0c", # 测试
|
||||
"entry_id": "661748c7c727764d79557674",
|
||||
}
|
||||
partner_settlement = api_instance.entry_data_list(payload)
|
||||
self.partner_settlement_data = partner_settlement.get("data") # api请求格式,将数据封装在data字典里
|
||||
|
||||
def process_list_field(self, field_value, field_config):
|
||||
"""通用方法:处理列表类型的字段"""
|
||||
if not isinstance(field_value, (list, np.ndarray)):
|
||||
return field_value
|
||||
|
||||
processed_list = []
|
||||
for item in field_value:
|
||||
if not isinstance(item, dict):
|
||||
processed_list.append(item)
|
||||
continue
|
||||
|
||||
processed_item = {}
|
||||
for original_key, mapped_key in field_config.items():
|
||||
if original_key in item:
|
||||
# 处理包含id的字典字段
|
||||
if isinstance(item[original_key], dict) and "id" in item[original_key]:
|
||||
processed_item[mapped_key] = item[original_key]["id"]
|
||||
else:
|
||||
processed_item[mapped_key] = item[original_key]
|
||||
else:
|
||||
processed_item[mapped_key] = None
|
||||
processed_list.append(processed_item)
|
||||
return processed_list
|
||||
|
||||
def data_process(self):
|
||||
if not self.partner_settlement_data:
|
||||
print("数据为空终止程序")
|
||||
sys.exit(1)
|
||||
df = pd.DataFrame(self.partner_settlement_data)
|
||||
# 反转映射字典
|
||||
reverse_mapping = {v: k for k, v in self.field_mapping.items()}
|
||||
# 1.列明替换
|
||||
df.columns = [reverse_mapping.get(col, col) for col in df.columns]
|
||||
|
||||
# 2.成员字段取值
|
||||
user_columns = ["合伙人登记人", "提交人", "战区经理"]
|
||||
|
||||
for col in user_columns:
|
||||
df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "")
|
||||
|
||||
# 3.处理订单登记表列表字段,将其拆分成多行
|
||||
if "订单登记表" in df.columns:
|
||||
# 先处理订单登记表字段
|
||||
df["订单登记表"] = df["订单登记表"].apply(
|
||||
lambda x: self.process_list_field(x, self.list_fields_config["订单登记表"])
|
||||
if x is not None and (isinstance(x, (list, dict, np.ndarray)) or not pd.isna(x))
|
||||
else None
|
||||
)
|
||||
|
||||
# 拆分行
|
||||
df_exploded = df.explode("订单登记表")
|
||||
|
||||
# 将订单登记表中的字段提取到主表中
|
||||
order_fields = self.list_fields_config["订单登记表"].values()
|
||||
for field in order_fields:
|
||||
df_exploded[field] = df_exploded["订单登记表"].apply(
|
||||
lambda x: x.get(field) if isinstance(x, dict) else None
|
||||
)
|
||||
|
||||
# 删除原始的订单登记表列
|
||||
df_exploded = df_exploded.drop(columns=["订单登记表"])
|
||||
|
||||
# 重置索引
|
||||
df = df_exploded.reset_index(drop=True)
|
||||
|
||||
return df
|
||||
|
||||
def write_to_bi(self, df):
|
||||
# 数据库连接信息
|
||||
HS_DB_Config = {
|
||||
'host': "f6-public.rwlb.rds.aliyuncs.com",
|
||||
'user': "rw_operation_data_relay",
|
||||
'password': "m+q5Z4%IVuF9bf",
|
||||
'database': "f6operation_data_relay"
|
||||
}
|
||||
table_name = "partner_settlement_to_BI" # 替换为你的实际表名
|
||||
|
||||
# 建立数据库连接
|
||||
connection = mysql.connector.connect(
|
||||
host=HS_DB_Config["host"],
|
||||
user=HS_DB_Config["user"],
|
||||
password=HS_DB_Config["password"],
|
||||
database=HS_DB_Config["database"]
|
||||
)
|
||||
cursor = connection.cursor()
|
||||
|
||||
try:
|
||||
# 查询表列名
|
||||
cursor.execute(f"SHOW COLUMNS FROM {table_name}")
|
||||
columns_info = cursor.fetchall()
|
||||
db_columns = [col[0] for col in columns_info] # 提取列名
|
||||
df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None)
|
||||
# 保留 DataFrame 中与数据库列名匹配的列
|
||||
filtered_df = df[df.columns.intersection(db_columns)]
|
||||
|
||||
# 如果没有匹配的列,直接返回
|
||||
if filtered_df.empty:
|
||||
print("DataFrame 中没有与数据库表结构匹配的列。")
|
||||
return
|
||||
|
||||
# 筛选列之后,插入前处理 dict 类型
|
||||
filtered_df = filtered_df.copy()
|
||||
for col in filtered_df.columns:
|
||||
if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any():
|
||||
filtered_df.loc[:, col] = filtered_df[col].apply(
|
||||
lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x
|
||||
)
|
||||
|
||||
# 构建插入语句
|
||||
placeholders = ', '.join(['%s'] * len(filtered_df.columns))
|
||||
# 使用反引号避免特殊列明
|
||||
columns = ', '.join([f"`{col}`" for col in filtered_df.columns])
|
||||
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
|
||||
|
||||
# 将 DataFrame 写入数据库
|
||||
for _, row in filtered_df.iterrows():
|
||||
cursor.execute(insert_sql, tuple(row))
|
||||
|
||||
connection.commit()
|
||||
logger.info(f"成功写入 {len(filtered_df)} 条记录到 {table_name} 表中。")
|
||||
|
||||
except Exception as e:
|
||||
error_task_logger.error(f"写入数据库时发生错误: {e}")
|
||||
connection.rollback()
|
||||
finally:
|
||||
cursor.close()
|
||||
connection.close()
|
||||
|
||||
def clear_table_data(self):
|
||||
"""
|
||||
清空指定 MySQL 表的数据。
|
||||
参数已写死在函数内部,直接调用即可。
|
||||
"""
|
||||
# 数据库连接信息
|
||||
HS_DB_Config = {
|
||||
'host': "f6-public.rwlb.rds.aliyuncs.com",
|
||||
'user': "rw_operation_data_relay",
|
||||
'password': "m+q5Z4%IVuF9bf",
|
||||
'database': "f6operation_data_relay"
|
||||
}
|
||||
table_name = "partner_settlement_to_BI" # 要清空的表名
|
||||
|
||||
connection = None
|
||||
try:
|
||||
# 建立数据库连接
|
||||
connection = mysql.connector.connect(
|
||||
host=HS_DB_Config["host"],
|
||||
user=HS_DB_Config["user"],
|
||||
password=HS_DB_Config["password"],
|
||||
database=HS_DB_Config["database"]
|
||||
)
|
||||
if connection.is_connected():
|
||||
cursor = connection.cursor()
|
||||
|
||||
# 使用TRUNCATE清空表数据
|
||||
cursor.execute(f"TRUNCATE TABLE {table_name}")
|
||||
connection.commit()
|
||||
|
||||
print(f"成功清空表 {table_name} 中的所有数据")
|
||||
|
||||
except Error as e:
|
||||
print(f"清空表时发生错误: {e}")
|
||||
if connection and connection.is_connected():
|
||||
connection.rollback()
|
||||
finally:
|
||||
if connection and connection.is_connected():
|
||||
cursor.close()
|
||||
connection.close()
|
||||
print("数据库连接已关闭")
|
||||
|
||||
def main(self):
|
||||
task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
try:
|
||||
logger.info("任务开始")
|
||||
# step1: 获取数据
|
||||
self.load_all_data()
|
||||
logger.info("加载数据完成")
|
||||
# step2:数据处理
|
||||
df = self.data_process()
|
||||
# df.to_csv(os.path.join(output_dir, "new_dealer_service_order_to_bi.csv"))
|
||||
logger.info("数据处理完成")
|
||||
# step3:数据库删除
|
||||
self.clear_table_data()
|
||||
logger.info("目标数据库已清空")
|
||||
# step4:数据写入BI
|
||||
self.write_to_bi(df)
|
||||
logger.info("数据已写入数据库中")
|
||||
|
||||
common_module.send_task_status(task_start_time, "合伙人结算登记同步到BI")
|
||||
except Exception as e:
|
||||
error_task_logger.error(f"合伙人结算登记同步到BI发生错误:{e}")
|
||||
common_module.send_task_error(task_start_time, "合伙人结算登记同步到BI", str(e))
|
||||
|
||||
|
||||
PartnerSettlementToBI().main()
|
||||
Reference in New Issue
Block a user