Files
saas/test/合伙人结算登记同步到BI.ipynb
T

349 lines
15 KiB
Plaintext

{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "## 合伙人结算登记表同步到Bi",
"id": "c73b9afd879b3e18"
},
{
"cell_type": "code",
"id": "initial_id",
"metadata": {
"collapsed": true,
"ExecuteTime": {
"end_time": "2025-08-20T09:17:27.280694Z",
"start_time": "2025-08-20T09:17:27.096281Z"
}
},
"source": [
"## 获取数据\n",
"# -*- coding: utf-8 -*-\n",
"import pandas as pd\n",
"import datetime\n",
"from config import Config\n",
"from api import API\n",
"import pymysql # 使用 pymysql 替代 mysql.connector\n",
"from back_ground_module import CommonModule\n",
"import os\n",
"import mysql.connector\n",
"import pandas as pd\n",
"import json\n",
"import numpy as np\n",
"import mysql.connector\n",
"from mysql.connector import Error\n",
"from log_config import configure_task_logger, configure_error_task_logger\n",
"import sys\n",
"\n",
"logger = configure_task_logger()\n",
"error_task_logger = configure_error_task_logger()\n",
"api_instance = API()\n",
"common_module = CommonModule()\n",
"output_dir = \"output\" # 设置输出目录\n",
"os.makedirs(output_dir, exist_ok=True)\n",
"\n",
"\n",
"class PartnerSettlementToBI:\n",
" def __init__(self):\n",
" self.partner_settlement_data = None\n",
" self.field_mapping = {\n",
" \"选择合伙人\": \"_widget_1753930627469\",\n",
" \"合伙人姓名\": \"_widget_1712801992726\",\n",
" \"手机号\": \"_widget_1712803222895\",\n",
" \"合伙人身份\": \"_widget_1712803222894\",\n",
" \"合伙人所在省市\": \"_widget_1712803222896\",\n",
" \"合伙人登记人\": \"_widget_1712803222900\",\n",
" \"战区经理\": \"_widget_1712803222901\",\n",
" \"提交人\": \"_widget_1753941892609\",\n",
" \"合伙人分类\": \"_widget_1753943042503\",\n",
" \"战区\": \"_widget_1754530653275\",\n",
" \"订单登记表\": \"_widget_1712803222905\",\n",
" \"订单登记表.订单编号\": \"_widget_1712803222905._widget_1712803222907\",\n",
" \"订单登记表.销售阶段\": \"_widget_1712803222905._widget_1712805391009\",\n",
" \"订单登记表.版本\": \"_widget_1712803222905._widget_1712803222908\",\n",
" \"订单登记表.年限\": \"_widget_1712803222905._widget_1712815331264\",\n",
" \"订单登记表.成交金额\": \"_widget_1712803222905._widget_1712805391002\",\n",
" \"订单登记表.佣金\": \"_widget_1712803222905._widget_1753952737266\",\n",
" \"订单登记表.理论佣金\": \"_widget_1712803222905._widget_1753952737267\",\n",
" \"订单登记表.佣金比例\": \"_widget_1712803222905._widget_1712807001396\",\n",
" \"合计佣金\": \"_widget_1753948415171\",\n",
" \"理论合计佣金\": \"_widget_1753952737280\",\n",
" \"特殊情况备注\": \"_widget_1712805391035\",\n",
" \"合伙人介绍证明(微信聊天截图等)\": \"_widget_1712815331256\",\n",
" \"合伙人类型\": \"_widget_1753957844818\",\n",
" }\n",
"\n",
" # 定义需要特殊处理的列表字段及其内部字段映射\n",
" self.list_fields_config = {\n",
" \"订单登记表\": {\n",
" \"_widget_1712803222907\": \"订单编号\",\n",
" \"_widget_1712805391009\": \"销售阶段\",\n",
" \"_widget_1712803222908\": \"版本\",\n",
" \"_widget_1712815331264\": \"年限\",\n",
" \"_widget_1712805391002\": \"成交金额\",\n",
" \"_widget_1753952737266\": \"佣金\",\n",
" \"_widget_1753952737267\": \"理论佣金\",\n",
" \"_widget_1712807001396\": \"佣金比例\",\n",
" },\n",
" # 可以在这里添加其他列表字段的配置\n",
" # \"另一个列表字段\": {\n",
" # \"原始字段名1\": \"映射后字段名1\",\n",
" # \"原始字段名2\": \"映射后字段名2\"\n",
" # }\n",
" }\n",
"\n",
" def load_all_data(self):\n",
" payload = {\"api_key\": \"66b9678280b37f8a276b1d01\",\n",
" # \"entry_id\": \"68a57e3a0bc339d3384d1b0c\", # 测试\n",
" \"entry_id\": \"661748c7c727764d79557674\",\n",
" }\n",
" partner_settlement = api_instance.entry_data_list(payload)\n",
" self.partner_settlement_data = partner_settlement.get(\"data\") # api请求格式,将数据封装在data字典里\n",
"\n",
" def process_list_field(self, field_value, field_config):\n",
" \"\"\"通用方法:处理列表类型的字段\"\"\"\n",
" if not isinstance(field_value, (list, np.ndarray)):\n",
" return field_value\n",
"\n",
" processed_list = []\n",
" for item in field_value:\n",
" if not isinstance(item, dict):\n",
" processed_list.append(item)\n",
" continue\n",
"\n",
" processed_item = {}\n",
" for original_key, mapped_key in field_config.items():\n",
" if original_key in item:\n",
" # 处理包含id的字典字段\n",
" if isinstance(item[original_key], dict) and \"id\" in item[original_key]:\n",
" processed_item[mapped_key] = item[original_key][\"id\"]\n",
" else:\n",
" processed_item[mapped_key] = item[original_key]\n",
" else:\n",
" processed_item[mapped_key] = None\n",
" processed_list.append(processed_item)\n",
" return processed_list\n",
"\n",
" def data_process(self):\n",
" if not self.partner_settlement_data:\n",
" print(\"数据为空终止程序\")\n",
" sys.exit(1)\n",
" df = pd.DataFrame(self.partner_settlement_data)\n",
" # 反转映射字典\n",
" reverse_mapping = {v: k for k, v in self.field_mapping.items()}\n",
" # 1.列明替换\n",
" df.columns = [reverse_mapping.get(col, col) for col in df.columns]\n",
"\n",
" # 2.成员字段取值\n",
" user_columns = [\"合伙人登记人\", \"提交人\", \"战区经理\"]\n",
"\n",
" for col in user_columns:\n",
" df[col] = df[col].map(lambda x: x.get(\"name\", \"\") if isinstance(x, dict) else \"\")\n",
"\n",
" # 3.处理订单登记表列表字段,将其拆分成多行\n",
" if \"订单登记表\" in df.columns:\n",
" # 先处理订单登记表字段\n",
" df[\"订单登记表\"] = df[\"订单登记表\"].apply(\n",
" lambda x: self.process_list_field(x, self.list_fields_config[\"订单登记表\"])\n",
" if x is not None and (isinstance(x, (list, dict, np.ndarray)) or not pd.isna(x))\n",
" else None\n",
" )\n",
"\n",
" # 拆分行\n",
" df_exploded = df.explode(\"订单登记表\")\n",
"\n",
" # 将订单登记表中的字段提取到主表中\n",
" order_fields = self.list_fields_config[\"订单登记表\"].values()\n",
" for field in order_fields:\n",
" df_exploded[field] = df_exploded[\"订单登记表\"].apply(\n",
" lambda x: x.get(field) if isinstance(x, dict) else None\n",
" )\n",
"\n",
" # 删除原始的订单登记表列\n",
" df_exploded = df_exploded.drop(columns=[\"订单登记表\"])\n",
"\n",
" # 重置索引\n",
" df = df_exploded.reset_index(drop=True)\n",
"\n",
" return df\n",
"\n",
" def write_to_bi(self, df):\n",
" # 数据库连接信息\n",
" HS_DB_Config = {\n",
" 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n",
" 'user': \"rw_operation_data_relay\",\n",
" 'password': \"m+q5Z4%IVuF9bf\",\n",
" 'database': \"f6operation_data_relay\"\n",
" }\n",
" table_name = \"partner_settlement_to_BI\" # 替换为你的实际表名\n",
"\n",
" # 建立数据库连接\n",
" connection = mysql.connector.connect(\n",
" host=HS_DB_Config[\"host\"],\n",
" user=HS_DB_Config[\"user\"],\n",
" password=HS_DB_Config[\"password\"],\n",
" database=HS_DB_Config[\"database\"]\n",
" )\n",
" cursor = connection.cursor()\n",
"\n",
" try:\n",
" # 查询表列名\n",
" cursor.execute(f\"SHOW COLUMNS FROM {table_name}\")\n",
" columns_info = cursor.fetchall()\n",
" db_columns = [col[0] for col in columns_info] # 提取列名\n",
" df = df.replace([None, np.nan, pd.NA, 'nan', 'NaN', 'NAN', ''], None)\n",
" # 保留 DataFrame 中与数据库列名匹配的列\n",
" filtered_df = df[df.columns.intersection(db_columns)]\n",
"\n",
" # 如果没有匹配的列,直接返回\n",
" if filtered_df.empty:\n",
" print(\"DataFrame 中没有与数据库表结构匹配的列。\")\n",
" return\n",
"\n",
" # 筛选列之后,插入前处理 dict 类型\n",
" filtered_df = filtered_df.copy()\n",
" for col in filtered_df.columns:\n",
" if filtered_df[col].apply(lambda x: isinstance(x, (dict, list)) if x is not None else False).any():\n",
" filtered_df.loc[:, col] = filtered_df[col].apply(\n",
" lambda x: json.dumps(x, ensure_ascii=False) if x is not None else x\n",
" )\n",
"\n",
" # 构建插入语句\n",
" placeholders = ', '.join(['%s'] * len(filtered_df.columns))\n",
" # 使用反引号避免特殊列明\n",
" columns = ', '.join([f\"`{col}`\" for col in filtered_df.columns])\n",
" insert_sql = f\"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})\"\n",
"\n",
" # 将 DataFrame 写入数据库\n",
" for _, row in filtered_df.iterrows():\n",
" cursor.execute(insert_sql, tuple(row))\n",
"\n",
" connection.commit()\n",
" print(f\"成功写入 {len(filtered_df)} 条记录到 {table_name} 表中。\")\n",
"\n",
" except Exception as e:\n",
" print(\"写入数据库时发生错误:\", e)\n",
" connection.rollback()\n",
" finally:\n",
" cursor.close()\n",
" connection.close()\n",
"\n",
" def clear_table_data(self):\n",
" \"\"\"\n",
" 清空指定 MySQL 表的数据。\n",
" 参数已写死在函数内部,直接调用即可。\n",
" \"\"\"\n",
" # 数据库连接信息\n",
" HS_DB_Config = {\n",
" 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n",
" 'user': \"rw_operation_data_relay\",\n",
" 'password': \"m+q5Z4%IVuF9bf\",\n",
" 'database': \"f6operation_data_relay\"\n",
" }\n",
" table_name = \"partner_settlement_to_BI\" # 要清空的表名\n",
"\n",
" connection = None\n",
" try:\n",
" # 建立数据库连接\n",
" connection = mysql.connector.connect(\n",
" host=HS_DB_Config[\"host\"],\n",
" user=HS_DB_Config[\"user\"],\n",
" password=HS_DB_Config[\"password\"],\n",
" database=HS_DB_Config[\"database\"]\n",
" )\n",
" if connection.is_connected():\n",
" cursor = connection.cursor()\n",
"\n",
" # 使用TRUNCATE清空表数据\n",
" cursor.execute(f\"TRUNCATE TABLE {table_name}\")\n",
" connection.commit()\n",
"\n",
" print(f\"成功清空表 {table_name} 中的所有数据\")\n",
"\n",
" except Error as e:\n",
" print(f\"清空表时发生错误: {e}\")\n",
" if connection and connection.is_connected():\n",
" connection.rollback()\n",
" finally:\n",
" if connection and connection.is_connected():\n",
" cursor.close()\n",
" connection.close()\n",
" print(\"数据库连接已关闭\")\n",
"\n",
" def main(self):\n",
" task_start_time = datetime.datetime.now().strftime(\"%Y-%m-%d %H:%M:%S\")\n",
"\n",
" # 获取数据\n",
" self.load_all_data()\n",
" print(\"数据加载完成\")\n",
"\n",
" # 处理数据\n",
" df = self.data_process()\n",
" # df.to_csv(f\"{output_dir}/partner_settlement.csv\", index=False)\n",
"\n",
" # step3:数据库删除\n",
" self.clear_table_data()\n",
"\n",
" # step4:数据写入BI\n",
" self.write_to_bi(df)\n",
"\n",
" common_module.send_task_status(task_start_time, \"合伙人结算登记同步到BI\")\n",
"\n",
"\n",
"PartnerSettlementToBI().main()\n",
"\n"
],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"数据加载完成\n",
"[]\n",
"数据为空终止程序\n"
]
},
{
"ename": "SystemExit",
"evalue": "1",
"output_type": "error",
"traceback": [
"An exception has occurred, use %tb to see the full traceback.\n",
"\u001B[31mSystemExit\u001B[39m\u001B[31m:\u001B[39m 1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"D:\\ProgramTools\\anaconda3\\envs\\jdy\\Lib\\site-packages\\IPython\\core\\interactiveshell.py:3707: UserWarning: To exit: use 'exit', 'quit', or Ctrl-D.\n",
" warn(\"To exit: use 'exit', 'quit', or Ctrl-D.\", stacklevel=1)\n"
]
}
],
"execution_count": 7
}
],
"metadata": {
"kernelspec": {
"display_name": "saas",
"language": "python",
"name": "saas"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}