saas1.6日志更新

This commit is contained in:
z66
2025-08-14 11:55:03 +08:00
parent d5e60e9014
commit 3bffc6946b
34 changed files with 2999 additions and 2907 deletions
+183 -180
View File
@@ -16,6 +16,13 @@ error_task_logger = configure_error_task_logger()
start_time = datetime.datetime.now()
api_instance = API()
common_module = CommonModule()
# 保存为CSV文件
output_dir = "output" # 设置输出目录
# 创建输出目录(如果不存在)
import os
os.makedirs(output_dir, exist_ok=True)
class UpdateAllNGVDataDaily:
@@ -26,225 +33,221 @@ class UpdateAllNGVDataDaily:
self.fields()
def main(self):
# 保存为CSV文件
output_dir = "output" # 设置输出目录
# 创建输出目录(如果不存在)
import os
os.makedirs(output_dir, exist_ok=True)
task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 获取NGV数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675bb02bd2d53c2034c665e4"}
NGV_data_list = api_instance.entry_data_list(payload).get("data", [])
jdy_NGV_data = pd.DataFrame(NGV_data_list)
try:
logger.info("开始执行任务:{}".format(task_start_time))
# 获取NGV数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675bb02bd2d53c2034c665e4"}
NGV_data_list = api_instance.entry_data_list(payload).get("data", [])
jdy_NGV_data = pd.DataFrame(NGV_data_list)
payload = {"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "6769204a1902c9341340a1bc",
}
staff_id = api_instance.entry_data_list(payload)
staff_id_list = staff_id.get("data") # api请求格式,将数据封装在data字典里
payload = {"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "6769204a1902c9341340a1bc",
}
staff_id = api_instance.entry_data_list(payload)
staff_id_list = staff_id.get("data") # api请求格式,将数据封装在data字典里
logger.info("已获取数据")
# for i in range(1,2):
data_NGV_j = common_module.get_ngv_details(days_back=1)
data_NGV_j.to_csv(os.path.join(output_dir, f"data_NGV_j.csv"), index=False)
data_NGV_j1 = common_module.get_ngv_details(days_back=2)
# for i in range(1,2):
data_NGV_j = common_module.get_ngv_details(days_back=1)
data_NGV_j.to_csv(os.path.join(output_dir, f"data_NGV_j.csv"), index=False)
data_NGV_j1 = common_module.get_ngv_details(days_back=2)
# 对 data_NGV 进行进一步的过滤,只保留 org_type 为 "一般" 的记录
data_NGV_j = data_NGV_j[data_NGV_j['org_type'] == '一般']
data_NGV_j1 = data_NGV_j1[data_NGV_j1['org_type'] == '一般']
# 对 data_NGV 进行进一步的过滤,只保留 org_type 为 "一般" 的记录
data_NGV_j = data_NGV_j[data_NGV_j['org_type'] == '一般']
data_NGV_j1 = data_NGV_j1[data_NGV_j1['org_type'] == '一般']
# 去除不需要的列
columns_to_remove = {'date_id', 'date_fmt', 'pt', 'etl_time'}
# 去除不需要的列
columns_to_remove = {'date_id', 'date_fmt', 'pt', 'etl_time'}
# 获取所有列名并计算要保留的列
columns_to_keep_df1 = list(set(data_NGV_j.columns) - columns_to_remove)
columns_to_keep_df2 = list(set(data_NGV_j1.columns) - columns_to_remove)
# 获取所有列名并计算要保留的列
columns_to_keep_df1 = list(set(data_NGV_j.columns) - columns_to_remove)
columns_to_keep_df2 = list(set(data_NGV_j1.columns) - columns_to_remove)
# 过滤DataFrame以去除指定列
df1_filtered = data_NGV_j[columns_to_keep_df1]
df2_filtered = data_NGV_j1[columns_to_keep_df2]
# 过滤DataFrame以去除指定列
df1_filtered = data_NGV_j[columns_to_keep_df1]
df2_filtered = data_NGV_j1[columns_to_keep_df2]
# 设置唯一标识列作为索引
df1_set_index = df1_filtered.set_index('id_own_org')
df2_set_index = df2_filtered.set_index('id_own_org')
# 设置唯一标识列作为索引
df1_set_index = df1_filtered.set_index('id_own_org')
df2_set_index = df2_filtered.set_index('id_own_org')
df1_set_index = df1_set_index.astype(str).replace(['nan', 'None'], '', ).fillna("")
df2_set_index = df2_set_index.astype(str).replace(['nan', 'None'], '', ).fillna("")
df1_set_index = df1_set_index.astype(str).replace(['nan', 'None'], '', ).fillna("")
df2_set_index = df2_set_index.astype(str).replace(['nan', 'None'], '', ).fillna("")
# 找到两个DataFrame共有的索引
common_index = df1_set_index.index.intersection(df2_set_index.index)
# 找到两个DataFrame共有的索引
common_index = df1_set_index.index.intersection(df2_set_index.index)
# 使用共同的索引来重新索引两个DataFrame
df1_common = df1_set_index.reindex(common_index).fillna('')
df2_common = df2_set_index.reindex(common_index).fillna('')
# 使用共同的索引来重新索引两个DataFrame
df1_common = df1_set_index.reindex(common_index).fillna('')
df2_common = df2_set_index.reindex(common_index).fillna('')
# 确保两个DataFrame有相同的列顺序
common_columns = df1_common.columns.intersection(df2_common.columns)
df1_common = df1_common[common_columns]
df2_common = df2_common[common_columns]
# 确保两个DataFrame有相同的列顺序
common_columns = df1_common.columns.intersection(df2_common.columns)
df1_common = df1_common[common_columns]
df2_common = df2_common[common_columns]
# 比较两个DataFrame的内容
comparison_column = 'match_status'
# 比较两个DataFrame的内容
comparison_column = 'match_status'
# 创建一个布尔Series,指示每一行是否完全相同
matches = (df1_common == df2_common).all(axis=1)
# 创建一个布尔Series,指示每一行是否完全相同
matches = (df1_common == df2_common).all(axis=1)
# 添加新列到第一个DataFrame,标记是否匹配
df1_common[comparison_column] = matches.map({True: '一致', False: '不一致'})
# df1_common.to_csv(os.path.join(output_dir, f"df1_common.csv"))
# 添加新列到第一个DataFrame,标记是否匹配
df1_common[comparison_column] = matches.map({True: '一致', False: '不一致'})
# df1_common.to_csv(os.path.join(output_dir, f"df1_common.csv"))
# 如果需要也可以添加到第二个DataFrame(这里假设只需要处理df1_common)
# df2_common[comparison_column] = matches.map({True: '一致', False: '不一致'})
# 如果需要也可以添加到第二个DataFrame(这里假设只需要处理df1_common)
# df2_common[comparison_column] = matches.map({True: '一致', False: '不一致'})
# 提取只在一个DataFrame中存在的索引对应的行
df1_only_index = df1_set_index.index.difference(df2_set_index.index)
df2_only_index = df2_set_index.index.difference(df1_set_index.index)
# 提取只在一个DataFrame中存在的索引对应的行
df1_only_index = df1_set_index.index.difference(df2_set_index.index)
df2_only_index = df2_set_index.index.difference(df1_set_index.index)
df1_only_rows = df1_set_index.loc[df1_only_index].copy()
df2_only_rows = df2_set_index.loc[df2_only_index].copy()
df1_only_rows = df1_set_index.loc[df1_only_index].copy()
df2_only_rows = df2_set_index.loc[df2_only_index].copy()
# 保存匹配结果
# df1_common.to_csv(os.path.join(output_dir, 'matched_results.csv'), index_label='id_own_org')
# 保存匹配结果
# df1_common.to_csv(os.path.join(output_dir, 'matched_results.csv'), index_label='id_own_org')
# 保存仅在df1中的行
# df1_only_rows.to_csv(os.path.join(output_dir, 'df1_only_rows.csv'), index_label='id_own_org')
# 保存仅在df1中的行
# df1_only_rows.to_csv(os.path.join(output_dir, 'df1_only_rows.csv'), index_label='id_own_org')
# 保存仅在df2中的行
# df2_only_rows.to_csv(os.path.join(output_dir, 'df2_only_rows.csv'), index_label='id_own_org')
# data_NGV_j.to_csv(os.path.join(output_dir, 'data_NGV_j.csv'), index_label='id_own_org')
# data_NGV_j1.to_csv(os.path.join(output_dir, 'data_NGV_j1.csv'), index_label='id_own_org')
# jdy_NGV_data.to_csv(os.path.join(output_dir, 'jdy_NGV_data.csv'), index_label='id_own_org')
# 保存仅在df2中的行
# df2_only_rows.to_csv(os.path.join(output_dir, 'df2_only_rows.csv'), index_label='id_own_org')
# data_NGV_j.to_csv(os.path.join(output_dir, 'data_NGV_j.csv'), index_label='id_own_org')
# data_NGV_j1.to_csv(os.path.join(output_dir, 'data_NGV_j1.csv'), index_label='id_own_org')
# jdy_NGV_data.to_csv(os.path.join(output_dir, 'jdy_NGV_data.csv'), index_label='id_own_org')
# print(f"\nCSV文件已保存到目录: {output_dir}")
# print(f"\nCSV文件已保存到目录: {output_dir}")
temp_jdy_NGV_data = jdy_NGV_data.copy()
temp_jdy_NGV_data = jdy_NGV_data.copy()
# temp_jdy_NGV_data.to_csv(os.path.join(output_dir, 'jdy_NGV_data.csv'), index=False)
temp_jdy_NGV_data.reset_index(inplace=True) # 如果 '门店id' 是索引,则先将其转换为普通列
# temp_jdy_NGV_data.to_csv(os.path.join(output_dir, 'jdy_NGV_data1.csv'), index=False)
if '_widget_1734062123069' not in temp_jdy_NGV_data.columns:
print("'门店id' 不存在")
temp_jdy_NGV_data.rename(columns={'_widget_1734062123069': 'id_own_org'}, inplace=True)
temp_jdy_NGV_data.set_index('id_own_org', inplace=True)
# temp_jdy_NGV_data.to_csv(os.path.join(output_dir, 'jdy_NGV_data.csv'), index=False)
temp_jdy_NGV_data.reset_index(inplace=True) # 如果 '门店id' 是索引,则先将其转换为普通列
# temp_jdy_NGV_data.to_csv(os.path.join(output_dir, 'jdy_NGV_data1.csv'), index=False)
if '_widget_1734062123069' not in temp_jdy_NGV_data.columns:
error_task_logger.error("'门店id' 不存在")
temp_jdy_NGV_data.rename(columns={'_widget_1734062123069': 'id_own_org'}, inplace=True)
temp_jdy_NGV_data.set_index('id_own_org', inplace=True)
# 如果简道云存在,NGV不存在则标记NGV已删除
# 找出在 temp_jdy_NGV_data 中存在,但在 df1_common 中不存在的索引
ids_in_jdy_not_in_df1 = temp_jdy_NGV_data.index[~temp_jdy_NGV_data.index.isin(df1_common.index)]
# 提取这些行,形成新的 DataFrame
only_in_temp_jdy = temp_jdy_NGV_data.loc[ids_in_jdy_not_in_df1]
# 对数据源已经去掉的门店进行标记
for index, only_row in only_in_temp_jdy.iterrows():
result = {}
if '_id' in only_in_temp_jdy.columns:
_id_value = str(only_row['_id']) if not pd.isna(only_row['_id']) else None
result["_id"] = _id_value
# 如果简道云存在,NGV不存在则标记NGV已删除
# 找出在 temp_jdy_NGV_data 中存在,但在 df1_common 中不存在的索引
ids_in_jdy_not_in_df1 = temp_jdy_NGV_data.index[~temp_jdy_NGV_data.index.isin(df1_common.index)]
# 提取这些行,形成新的 DataFrame
only_in_temp_jdy = temp_jdy_NGV_data.loc[ids_in_jdy_not_in_df1]
# 对数据源已经去掉的门店进行标记
for index, only_row in only_in_temp_jdy.iterrows():
result = {}
if '_id' in only_in_temp_jdy.columns:
_id_value = str(only_row['_id']) if not pd.isna(only_row['_id']) else None
result["_id"] = _id_value
if result["_id"]:
data = {
'api_key': Config.SaaS_Tasks_APP_ID,
'entry_id': Config.NGV_TASKS_ENTRY_ID,
"data_id": result["_id"],
"data": {"_widget_1754285499851": {"value": "已删除"}}
}
if result["_id"]:
data = {
'api_key': Config.SaaS_Tasks_APP_ID,
'entry_id': Config.NGV_TASKS_ENTRY_ID,
"data_id": result["_id"],
"data": {"_widget_1754285499851": {"value": "已删除"}}
}
api_instance.entry_data_update(data=data, max_retries=20)
api_instance.entry_data_update(data=data, max_retries=20)
# 简道云与ngv不一致的数据做关联
df1_common = df1_common.join(temp_jdy_NGV_data["_id"], how='left')
df1_common = df1_common[df1_common['match_status'] == '不一致']
# 简道云与ngv不一致的数据做关联
df1_common = df1_common.join(temp_jdy_NGV_data["_id"], how='left')
df1_common = df1_common[df1_common['match_status'] == '不一致']
# 日期字段转换为日期格式
time_columns = ['saas_create_time', 'expiry_time', 'install_create_time', "last_end_date",
"renew_date"]
new_filtered_df = df1_common.copy() # 复制df,以调整时间
for col in time_columns:
# 1. 转换为datetime类型(带错误处理)
# 使用.loc安全赋值
new_filtered_df[col] = pd.to_datetime(df1_common[col], errors='coerce', utc=False)
# 日期字段转换为日期格式
time_columns = ['saas_create_time', 'expiry_time', 'install_create_time', "last_end_date",
"renew_date"]
new_filtered_df = df1_common.copy() # 复制df,以调整时间
for col in time_columns:
# 1. 转换为datetime类型(带错误处理)
# 使用.loc安全赋值
new_filtered_df[col] = pd.to_datetime(df1_common[col], errors='coerce', utc=False)
# 2. 优化后的时区转换(高效向量化操作)
df1_common[col + '_date'] = (
new_filtered_df[col]
# 本地化为北京时间(东八区)
.dt.tz_localize('Asia/Shanghai', ambiguous='infer', nonexistent='NaT')
# 转换为UTC时区
.dt.tz_convert('UTC')
# 格式化为ISO8601字符串
.dt.strftime('%Y-%m-%dT%H:%M:%SZ')
)
# 2. 优化后的时区转换(高效向量化操作)
df1_common[col + '_date'] = (
new_filtered_df[col]
# 本地化为北京时间(东八区)
.dt.tz_localize('Asia/Shanghai', ambiguous='infer', nonexistent='NaT')
# 转换为UTC时区
.dt.tz_convert('UTC')
# 格式化为ISO8601字符串
.dt.strftime('%Y-%m-%dT%H:%M:%SZ')
)
logger.info("日期已转换为UTC格式")
# 人员字段转换为人员字段
staff_columns = ['area_manager', 'service_impl_principal', "service_salesmen", "technician"]
# 将员工列表转为DataFrame
# 三重循环临时方案(确保可写入)
for col in staff_columns:
staff_ids = []
for _, row in df1_common.iterrows():
matched = False
for staff in staff_id_list:
if str(staff['_widget_1734942794144']) == str(row[col]):
staff_ids.append(staff['_widget_1734942794145'])
matched = True
break
if not matched:
staff_ids.append(None)
df1_common[col + "_staff_id"] = staff_ids
# 人员字段转换为人员字段
staff_columns = ['area_manager', 'service_impl_principal', "service_salesmen", "technician"]
# 将员工列表转为DataFrame
# 三重循环临时方案(确保可写入)
for col in staff_columns:
staff_ids = []
for _, row in df1_common.iterrows():
matched = False
for staff in staff_id_list:
if str(staff['_widget_1734942794144']) == str(row[col]):
staff_ids.append(staff['_widget_1734942794145'])
matched = True
break
if not matched:
staff_ids.append(None)
df1_common[col + "_staff_id"] = staff_ids
logger.info("人员字段已替换")
# 并发请求
futures = []
all_data = []
# 并发请求
futures = []
all_data = []
logger.info(f"今日更新数据量为:{len(df1_common)}")
for idx, row in df1_common.iterrows():
result = {}
data_dict = {}
for idx, row in df1_common.iterrows():
result = {}
data_dict = {}
# 根据 field_mapping 进行字段替换
for col_name, widget_id in self.field_mapping.items():
if col_name in df1_common.columns:
value = row[col_name]
clean_value = None if pd.isna(value) else value
data_dict[widget_id] = {"value": clean_value}
# 根据 field_mapping 进行字段替换
for col_name, widget_id in self.field_mapping.items():
if col_name in df1_common.columns:
value = row[col_name]
clean_value = None if pd.isna(value) else value
data_dict[widget_id] = {"value": clean_value}
# 单独处理 _id 列,并将其转换为字符串
if '_id' in df1_common.columns:
_id_value = str(row['_id']) if not pd.isna(row['_id']) else None
result["_id"] = _id_value
# 单独处理 _id 列,并将其转换为字符串
if '_id' in df1_common.columns:
_id_value = str(row['_id']) if not pd.isna(row['_id']) else None
result["_id"] = _id_value
# 组装最终结果
if result["_id"]:
data = {
'api_key': Config.SaaS_Tasks_APP_ID,
'entry_id': Config.NGV_TASKS_ENTRY_ID,
"data_id": result["_id"],
"data": data_dict
}
# 组装最终结果
if result["_id"]:
data = {
'api_key': Config.SaaS_Tasks_APP_ID,
'entry_id': Config.NGV_TASKS_ENTRY_ID,
"data_id": result["_id"],
"data": data_dict
}
api_instance.entry_data_update(data=data, max_retries=20)
else:
# continue
data1 = {'api_key': Config.SaaS_Tasks_APP_ID, 'entry_id': Config.NGV_TASKS_ENTRY_ID,
"data": data_dict}
api_instance.data_batch_create(data=data1, max_retries=20)
api_instance.entry_data_update(data=data, max_retries=20)
else:
# continue
data1 = {'api_key': Config.SaaS_Tasks_APP_ID, 'entry_id': Config.NGV_TASKS_ENTRY_ID,
"data": data_dict}
res = api_instance.data_batch_create(data=data1, max_retries=20)
logger.info(f"补派数据:{res}")
# all_data.append(data_dict)
# all_data.append(data_dict)
# 收集所有结果
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
logger.info(f"所有请求结果:{result}")
except Exception as exc:
error_task_logger.error(f"请求发生异常: {exc}")
# 收集所有结果
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
print("请求结果:", result)
except Exception as exc:
print(f"请求发生异常: {exc}")
end_time = datetime.datetime.now()
# df11 = pd.DataFrame(all_data)
# df11.to_csv(f"all_data.csv")
time_diff = end_time - start_time
# 打印天数、秒数和微秒数
print(f"执行时间: {time_diff.days} 天, {time_diff.seconds} 秒, {time_diff.microseconds} 微秒")
common_module.send_task_status(task_start_time, "NGV更新数据")
common_module.send_task_status(task_start_time, "NGV更新数据")
logger.info("NGV更新数据任务已完成。")
except Exception as e:
error_task_logger.error(f"NGV更新数据执行时发生异常: {e}")
common_module.send_task_error(task_start_time, "NGV更新数据", str(e))
@staticmethod
def row_to_dict(row, field_mapping):