非标业绩提报增加字段

This commit is contained in:
z66
2025-10-27 15:43:35 +08:00
parent bfb905f91e
commit 80ab4019c8
5 changed files with 146 additions and 20 deletions
+2
View File
@@ -14,3 +14,5 @@
2025-09-15 14:14:19,894 - JCB_efficient_car_pickup.py - error_task_logger - ERROR - 接车宝日常派发执行出错:'NoneType' object has no attribute 'iterrows' 2025-09-15 14:14:19,894 - JCB_efficient_car_pickup.py - error_task_logger - ERROR - 接车宝日常派发执行出错:'NoneType' object has no attribute 'iterrows'
2025-09-15 14:17:26,819 - JCB_efficient_car_pickup.py - error_task_logger - ERROR - 接车宝日常派发执行出错:获取接车宝数据失败,返回None 2025-09-15 14:17:26,819 - JCB_efficient_car_pickup.py - error_task_logger - ERROR - 接车宝日常派发执行出错:获取接车宝数据失败,返回None
2025-09-26 11:03:13,587 - update_all_NGV_data_daily.py - error_task_logger - ERROR - NGV更新数据执行时发生异常: cannot reindex on an axis with duplicate labels 2025-09-26 11:03:13,587 - update_all_NGV_data_daily.py - error_task_logger - ERROR - NGV更新数据执行时发生异常: cannot reindex on an axis with duplicate labels
2025-10-27 11:09:20,885 - update_NGV_data.py - error_task_logger - ERROR - NGV过滤后数据保存异常: [Errno 22] Invalid argument: 'output\\2025-10-27 11:08:51NGV.csv'
2025-10-27 14:50:32,084 - non_standar_performance_to_BI.py - error_task_logger - ERROR - 非标业绩提报转BI发生错误'报备业绩归属人'
@@ -41,7 +41,6 @@ class NonStandardPerformanceToBI:
"开户/处理日期": "_widget_1753770875894", "开户/处理日期": "_widget_1753770875894",
"小六业绩金额": "_widget_1753770875898", "小六业绩金额": "_widget_1753770875898",
"区域业绩金额": "_widget_1753770875937", "区域业绩金额": "_widget_1753770875937",
"报备业绩归属人": "_widget_1753770875901",
"报备业绩归属区域经理": "_widget_1753770875903", "报备业绩归属区域经理": "_widget_1753770875903",
"报备业绩归属大区": "_widget_1753866196486", "报备业绩归属大区": "_widget_1753866196486",
"原业绩归属人": "_widget_1753856032683", "原业绩归属人": "_widget_1753856032683",
@@ -56,10 +55,15 @@ class NonStandardPerformanceToBI:
"新签提成比例-首年": "_widget_1753778922503", "新签提成比例-首年": "_widget_1753778922503",
"新签提成比例-非首年": "_widget_1753778922548", "新签提成比例-非首年": "_widget_1753778922548",
"新签阶段及提成比例": "_widget_1753778656359", "新签阶段及提成比例": "_widget_1753778656359",
"业绩动作":"_widget_1756708722933","提成动作":"_widget_1756708722932", "业绩动作":"_widget_1756708722933",
"提成动作":"_widget_1756708722932",
"新签阶段及提成比例.选择提成阶段": "_widget_1753778656359._widget_1753778656361", "新签阶段及提成比例.选择提成阶段": "_widget_1753778656359._widget_1753778656361",
"新签阶段及提成比例.新签阶段": "_widget_1753778656359._widget_1753948745962", "新签阶段及提成比例.新签阶段": "_widget_1753778656359._widget_1753948745962",
"新签阶段及提成比例.提成比例": "_widget_1753778656359._widget_1753778656362", "新签阶段及提成比例.提成比例": "_widget_1753778656359._widget_1753778656362",
"业绩类型":"_widget_1753770875966",
"报备业绩归属小六":"_widget_1753770875901",
"原业绩归属大区":"_widget_1755159216098",
"业绩分类":"_widget_1758706882564",
"提交人": "creator", "提交人": "creator",
"提交时间": "createTime", "提交时间": "createTime",
"更新时间": "updateTime" "更新时间": "updateTime"
@@ -119,7 +123,7 @@ class NonStandardPerformanceToBI:
df.columns = [reverse_mapping.get(col, col) for col in df.columns] df.columns = [reverse_mapping.get(col, col) for col in df.columns]
# 2.成员字段取值 # 2.成员字段取值
user_columns = ["报备业绩归属", "报备业绩归属区域经理", "原业绩归属人", "原业绩归属区域经理", "运营专家"] user_columns = ["报备业绩归属小六", "报备业绩归属区域经理", "原业绩归属人", "原业绩归属区域经理", "运营专家"]
for col in user_columns: for col in user_columns:
df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "") df[col] = df[col].map(lambda x: x.get("name", "") if isinstance(x, dict) else "")
+3 -1
View File
@@ -105,10 +105,12 @@ class UpdateNGVData:
# all_data = [self.row_to_dict(row, self.field_mapping) for index, row in data_NGV_j1.iterrows()] # 前两天的全部数据 # all_data = [self.row_to_dict(row, self.field_mapping) for index, row in data_NGV_j1.iterrows()] # 前两天的全部数据
# all_data = [self.row_to_dict(row, self.field_mapping) for index, row in data_NGV_j.iterrows()] # 前一天的全部数据 # all_data = [self.row_to_dict(row, self.field_mapping) for index, row in data_NGV_j.iterrows()] # 前一天的全部数据
all_data = [self.row_to_dict(row, self.field_mapping) for index, row in filtered_df.iterrows()] # 增量数据 all_data = [self.row_to_dict(row, self.field_mapping) for index, row in filtered_df.iterrows()] # 增量数据
try: try:
filtered_df.to_csv(output_dir + "\\" + f"{task_start_time}NGV.csv") filtered_df.to_csv(os.path.join(output_dir, f"{task_start_time}NGV.csv"))
except Exception as e: except Exception as e:
error_task_logger.error(f"NGV过滤后数据保存异常: {e}") error_task_logger.error(f"NGV过滤后数据保存异常: {e}")
pass
# #
data = {'api_key': Config.SaaS_Tasks_APP_ID, 'entry_id': Config.NGV_TASKS_ENTRY_ID, "data_list": all_data} data = {'api_key': Config.SaaS_Tasks_APP_ID, 'entry_id': Config.NGV_TASKS_ENTRY_ID, "data_list": all_data}
+121 -3
View File
@@ -112,7 +112,7 @@ CONCURRENT_WORKERS = 8
USE_BATCH_CREATE = True # True=批量创建(快),False=逐条创建(慢) USE_BATCH_CREATE = True # True=批量创建(快),False=逐条创建(慢)
# 批量创建大小(每批次创建的记录数) # 批量创建大小(每批次创建的记录数)
BATCH_CREATE_SIZE = 100 BATCH_CREATE_SIZE = 90
# ==================================================== # ====================================================
@@ -607,7 +607,8 @@ class UpdateAllNGVDataDaily:
update_data_list.append({ update_data_list.append({
'org_code': org_code, 'org_code': org_code,
'data_id': str(data_id), 'data_id': str(data_id),
'data_dict': data_dict.copy() 'data_dict': data_dict.copy(),
'row_data': row # 保存原始数据用于输出
}) })
else: else:
# 创建操作:必须包含门店编码字段 # 创建操作:必须包含门店编码字段
@@ -617,7 +618,8 @@ class UpdateAllNGVDataDaily:
create_data_list.append({ create_data_list.append({
'org_code': idx, 'org_code': idx,
'data_dict': data_dict 'data_dict': data_dict,
'row_data': row # 保存原始数据用于输出
}) })
logger.info(f" - 需要更新: {len(update_data_list)}") logger.info(f" - 需要更新: {len(update_data_list)}")
@@ -630,6 +632,8 @@ class UpdateAllNGVDataDaily:
update_count = self._concurrent_update(update_data_list) update_count = self._concurrent_update(update_data_list)
else: else:
update_count = self._single_update(update_data_list) update_count = self._single_update(update_data_list)
# 输出更新统计
self._save_update_stats(update_data_list)
# 执行创建 # 执行创建
create_count = 0 create_count = 0
@@ -638,6 +642,8 @@ class UpdateAllNGVDataDaily:
create_count = self._batch_create(create_data_list) create_count = self._batch_create(create_data_list)
else: else:
create_count = self._single_create(create_data_list) create_count = self._single_create(create_data_list)
# 输出新增数据
self._save_create_data(create_data_list)
logger.info(f" ✓ 同步完成: 更新 {update_count} 条, 创建 {create_count}") logger.info(f" ✓ 同步完成: 更新 {update_count} 条, 创建 {create_count}")
@@ -1052,6 +1058,118 @@ class UpdateAllNGVDataDaily:
logger.error(f" ✗ 加载缓存失败 ({cache_key}): {e}") logger.error(f" ✗ 加载缓存失败 ({cache_key}): {e}")
return pd.DataFrame() return pd.DataFrame()
def _save_create_data(self, create_data_list):
    """
    Dump the records that were queued for creation to a timestamped CSV file.

    Args:
        create_data_list: Sequence of items; each item is indexed with
            'org_code' and 'row_data', and 'row_data' must offer a
            dict-style ``.get(key, default)``.

    Returns:
        None. Nothing is written when the list is empty. Any exception raised
        while building or writing the file is logged through
        ``error_task_logger`` and not re-raised.
    """
    if len(create_data_list) == 0:
        return

    try:
        # Timestamp that becomes part of the output file name.
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        # Descriptive columns copied from each source row; a missing value
        # falls back to an empty string.
        detail_fields = (
            'org_name',
            'group_name',
            'org_type',
            'province_name',
            'city_name',
            'saas_version',
            'active_status_fmt',
        )

        rows = []
        for entry in create_data_list:
            source_row = entry['row_data']
            rows.append({
                'org_code': entry['org_code'],
                **{field: source_row.get(field, '') for field in detail_fields},
            })
        create_df = pd.DataFrame(rows)

        # os.path.join picks the path separator for the current platform.
        file_path = os.path.join(output_dir, f'新增门店_{stamp}.csv')
        create_df.to_csv(file_path, index=False, encoding='utf-8-sig')

        logger.info(f" ✓ 新增数据已保存: {file_path} ({len(create_df)} 条)")
    except Exception as e:
        error_task_logger.error(f"保存新增数据失败: {e}", exc_info=True)
def _save_update_stats(self, update_data_list):
    """
    Summarise the queued updates per org_code and write the summary to a CSV.

    Args:
        update_data_list: Sequence of items; each item is indexed with
            'org_code' and 'row_data', and 'row_data' must offer a
            dict-style ``.get(key, default)``.

    Returns:
        None. Nothing is written when the list is empty. Any exception raised
        while aggregating or writing the file is logged through
        ``error_task_logger`` and not re-raised.
    """
    if len(update_data_list) == 0:
        return

    try:
        # Timestamp that becomes part of the output file name.
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        detail_fields = (
            'org_name',
            'group_name',
            'org_type',
            'province_name',
            'city_name',
            'saas_version',
            'active_status_fmt',
        )

        # hits: how many queued items share each org_code.
        # details: descriptive fields taken from the FIRST item seen for a code.
        hits = {}
        details = {}
        for entry in update_data_list:
            code = entry['org_code']
            if code not in hits:
                source_row = entry['row_data']
                details[code] = {
                    field: source_row.get(field, '') for field in detail_fields
                }
            hits[code] = hits.get(code, 0) + 1

        # One summary row per org_code, flagged when the code occurs repeatedly.
        summary_rows = [
            {
                'org_code': code,
                **details[code],
                'update_count': occurrences,
                'note': '同一org_code有多个记录' if occurrences > 1 else '',
            }
            for code, occurrences in hits.items()
        ]
        update_df = pd.DataFrame(summary_rows).sort_values(
            'update_count', ascending=False
        )

        # os.path.join picks the path separator for the current platform.
        file_path = os.path.join(output_dir, f'更新统计_{stamp}.csv')
        update_df.to_csv(file_path, index=False, encoding='utf-8-sig')

        # Totals reported in the log.
        total_org_codes = len(hits)
        total_records = len(update_data_list)
        duplicate_org_codes = len(
            [occurrences for occurrences in hits.values() if occurrences > 1]
        )

        logger.info(f" ✓ 更新统计已保存: {file_path}")
        logger.info(f" - 更新的org_code数: {total_org_codes}")
        logger.info(f" - 更新的记录总数: {total_records}")
        if duplicate_org_codes > 0:
            logger.info(f" - 包含多条记录的org_code: {duplicate_org_codes}")
    except Exception as e:
        error_task_logger.error(f"保存更新统计失败: {e}", exc_info=True)
if __name__ == '__main__': if __name__ == '__main__':
updater = UpdateAllNGVDataDaily() updater = UpdateAllNGVDataDaily()
+13 -13
View File
@@ -257,8 +257,8 @@
{ {
"metadata": { "metadata": {
"ExecuteTime": { "ExecuteTime": {
"end_time": "2025-09-24T05:56:32.216878Z", "end_time": "2025-10-27T06:50:20.870562Z",
"start_time": "2025-09-24T05:56:31.974390Z" "start_time": "2025-10-27T06:50:20.631342Z"
} }
}, },
"cell_type": "code", "cell_type": "code",
@@ -267,17 +267,17 @@
"from mysql.connector import Error\n", "from mysql.connector import Error\n",
"\n", "\n",
"HS_DB_Config = {\n", "HS_DB_Config = {\n",
" 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n", " 'host': \"f6-public.rwlb.rds.aliyuncs.com\",\n",
" 'user': \"rw_operation_data_relay\",\n", " 'user': \"rw_operation_data_relay\",\n",
" 'password': \"m+q5Z4%IVuF9bf\",\n", " 'password': \"m+q5Z4%IVuF9bf\",\n",
" 'database': \"f6operation_data_relay\"\n", " 'database': \"f6operation_data_relay\"\n",
"} # 衡时数据库链接配置-mysql\n", " } # 衡时数据库链接配置-mysql\n",
"# table_name = \"new_dealer_service_order_to_bi\" # 替换为你的实际表名\n", "# table_name = \"new_dealer_service_order_to_bi\" # 替换为你的实际表名\n",
"\n", "\n",
"table_name = \"partner_settlement_to_BI\"\n", "table_name = \"non_standard_performance_to_BI\"\n",
"column_name = \"提交时间\"\n", "column_name = \"业绩分类\"\n",
"# new_column_type = \"VARCHAR(255)\" # 目标数据类型\n", "new_column_type = \"VARCHAR(255)\" # 目标数据类型\n",
"new_column_type = \"DATETIME\" # 目标数据类型\n", "# new_column_type = \"DATETIME\" # 目标数据类型\n",
"\n", "\n",
"try:\n", "try:\n",
" # 连接数据库\n", " # 连接数据库\n",
@@ -336,12 +336,12 @@
"name": "stdout", "name": "stdout",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"✅ 成功添加字段: `提交时间`\n", "✅ 成功添加字段: `业绩分类`\n",
"数据库连接已关闭\n" "数据库连接已关闭\n"
] ]
} }
], ],
"execution_count": 4 "execution_count": 5
}, },
{ {
"metadata": {}, "metadata": {},