# -*- coding: utf-8 -*-
"""Daily job that pushes day-over-day NGV store changes to the JianDaoYun (JDY) form."""
import concurrent.futures
import datetime
import os

import pandas as pd

from config import Config
from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger

# Regular task logger (already configured by log_config).
logger = configure_task_logger()
# Logger dedicated to task errors (already configured by log_config).
error_task_logger = configure_error_task_logger()
start_time = datetime.datetime.now()
api_instance = API()
common_module = CommonModule()

# Directory that receives CSV snapshots; created at import time if missing.
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)


class UpdateAllNGVDataDaily:
    """Daily update of NGV data.

    Compares the two most recent NGV snapshots, and for every store whose
    row differs between them sends the new values to the JDY form. JDY
    records whose store id is not part of the compared set are flagged
    as deleted.
    """

    # Widget id of the JDY column holding the store id (same id as
    # ``field_mapping['id_own_org']``).
    _STORE_ID_WIDGET = '_widget_1734062123069'
    # Widget that receives the "已删除" marker.
    _DELETED_FLAG_WIDGET = '_widget_1754285499851'
    # Staff form: the first widget is compared against the NGV staff column
    # value, the second is the value written into ``<column>_staff_id``.
    _STAFF_MATCH_WIDGET = '_widget_1734942794144'
    _STAFF_VALUE_WIDGET = '_widget_1734942794145'

    _IGNORED_COLUMNS = ('date_id', 'date_fmt', 'pt', 'etl_time')
    _TIME_COLUMNS = ('saas_create_time', 'expiry_time', 'install_create_time',
                     'last_end_date', 'renew_date')
    _STAFF_COLUMNS = ('area_manager', 'service_impl_principal',
                      'service_salesmen', 'technician')

    def __init__(self):
        self.field_mapping = {}
        self.fields()

    def main(self):
        """Run the daily synchronisation.

        Steps:
          1. Fetch the JDY form records and the staff records.
          2. Fetch NGV details for ``days_back=1`` and ``days_back=2`` and
             compare them row by row on ``id_own_org``.
          3. Flag JDY records that are absent from the compared set as "已删除".
          4. For rows that changed, add UTC date columns and staff-id columns,
             then update the JDY record (or create one when no ``_id`` exists).

        Every exception is logged and reported through
        ``common_module.send_task_error``; nothing is re-raised.
        """
        task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            logger.info("开始执行任务:{}".format(task_start_time))

            # NOTE(review): the api_key / entry_id values below are hard-coded
            # in source; consider moving them next to the other ids in Config.
            payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675bb02bd2d53c2034c665e4"}
            jdy_records = api_instance.entry_data_list(payload).get("data", [])
            jdy_frame = pd.DataFrame(jdy_records)

            payload = {"api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id": "6769204a1902c9341340a1bc"}
            # The API wraps the rows in a "data" key.
            staff_records = api_instance.entry_data_list(payload).get("data")
            logger.info("已获取数据")

            latest = common_module.get_ngv_details(days_back=1)
            # Snapshot of the unfiltered latest pull, kept for inspection.
            latest.to_csv(os.path.join(output_dir, "data_NGV_j.csv"), index=False)
            previous = common_module.get_ngv_details(days_back=2)

            compared = self._compare_snapshots(
                self._prepare_snapshot(latest),
                self._prepare_snapshot(previous),
            )

            jdy_by_store = self._index_jdy_by_store(jdy_frame)
            self._mark_deleted(jdy_by_store, compared.index)

            # Attach the JDY record id, then keep only rows that changed.
            # ``.copy()`` so the new columns below are written to an
            # independent frame rather than a slice.
            changed = compared.join(jdy_by_store["_id"], how='left')
            changed = changed[changed['match_status'] == '不一致'].copy()

            self._add_utc_date_columns(changed)
            logger.info("日期已转换为UTC格式")

            self._add_staff_id_columns(changed, staff_records)
            logger.info("人员字段已替换")

            logger.info(f"今日更新数据量为:{len(changed)}条")
            self._push_rows(changed)

            common_module.send_task_status(task_start_time, "NGV更新数据")
            logger.info("NGV更新数据任务已完成。")
        except Exception as e:
            error_task_logger.error(f"NGV更新数据执行时发生异常: {e}")
            common_module.send_task_error(task_start_time, "NGV更新数据", str(e))

    def _prepare_snapshot(self, frame):
        """Return *frame* normalised for comparison.

        Keeps only rows with ``org_type == '一般'``, drops the bookkeeping
        columns, indexes by ``id_own_org`` and converts every value to a
        string with missing values rendered as ``''``.
        """
        frame = frame[frame['org_type'] == '一般']
        frame = frame.drop(columns=list(self._IGNORED_COLUMNS), errors='ignore')
        frame = frame.set_index('id_own_org')
        return frame.astype(str).replace(['nan', 'None'], '').fillna("")

    @staticmethod
    def _compare_snapshots(latest, previous):
        """Return the rows of *latest* shared with *previous*, plus ``match_status``.

        Only store ids and columns present in both frames are compared.
        ``match_status`` is '一致' when the whole row is identical and
        '不一致' otherwise.
        """
        shared_ids = latest.index.intersection(previous.index)
        latest_common = latest.reindex(shared_ids).fillna('')
        previous_common = previous.reindex(shared_ids).fillna('')

        # Same columns in the same order on both sides, so ``==`` is aligned.
        shared_columns = latest_common.columns.intersection(previous_common.columns)
        latest_common = latest_common[shared_columns].copy()
        previous_common = previous_common[shared_columns]

        identical = (latest_common == previous_common).all(axis=1)
        latest_common['match_status'] = identical.map({True: '一致', False: '不一致'})
        return latest_common

    def _index_jdy_by_store(self, jdy_frame):
        """Return the JDY records indexed by store id (``id_own_org``).

        Raises:
            ValueError: if the store-id widget column or the ``_id`` column is
                missing. Both are required further down, so failing here gives
                a clear message before any record is written.
        """
        frame = jdy_frame.reset_index()
        if self._STORE_ID_WIDGET not in frame.columns:
            error_task_logger.error("列 '门店id' 不存在")
            raise ValueError("列 '门店id' 不存在")
        if '_id' not in frame.columns:
            error_task_logger.error("列 '_id' 不存在")
            raise ValueError("列 '_id' 不存在")
        frame = frame.rename(columns={self._STORE_ID_WIDGET: 'id_own_org'})
        return frame.set_index('id_own_org')

    def _mark_deleted(self, jdy_by_store, known_store_ids):
        """Flag JDY records whose store id is not in *known_store_ids* as "已删除".

        NOTE(review): *known_store_ids* is the set of stores present in BOTH
        compared snapshots (after the ``org_type`` filter), so a store that only
        appears in the latest snapshot is flagged too - confirm this is intended.
        """
        # Boolean mask instead of ``.loc[labels]``: label lookup repeats rows
        # when store ids are duplicated.
        orphaned = jdy_by_store[~jdy_by_store.index.isin(known_store_ids)]
        for raw_id in orphaned['_id']:
            record_id = None if pd.isna(raw_id) else str(raw_id)
            if not record_id:
                continue
            data = {
                'api_key': Config.SaaS_Tasks_APP_ID,
                'entry_id': Config.NGV_TASKS_ENTRY_ID,
                "data_id": record_id,
                "data": {self._DELETED_FLAG_WIDGET: {"value": "已删除"}}
            }
            api_instance.entry_data_update(data=data, max_retries=20)

    def _add_utc_date_columns(self, frame):
        """Add a ``<col>_date`` ISO-8601 UTC string column for each time column.

        Values are parsed as naive timestamps, localised to Asia/Shanghai and
        converted to UTC. Unparseable values end up missing. *frame* is
        modified in place.
        """
        for col in self._TIME_COLUMNS:
            parsed = pd.to_datetime(frame[col], errors='coerce', utc=False)
            frame[col + '_date'] = (
                parsed
                .dt.tz_localize('Asia/Shanghai', ambiguous='infer', nonexistent='NaT')
                .dt.tz_convert('UTC')
                .dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            )

    def _add_staff_id_columns(self, frame, staff_records):
        """Add a ``<col>_staff_id`` column for each staff column of *frame*.

        Each cell is matched (as a string) against the staff records; the first
        matching record wins and unmatched cells get ``None``. *frame* is
        modified in place.

        Raises:
            ValueError: if there are rows to resolve but no staff records
                were returned.
        """
        if staff_records is None:
            if not frame.empty:
                raise ValueError("人员数据为空,无法匹配人员字段")
            staff_records = []

        # One pass over the staff list; ``setdefault`` keeps the first match,
        # which is what a front-to-back scan of the list would return.
        lookup = {}
        for staff in staff_records:
            lookup.setdefault(str(staff.get(self._STAFF_MATCH_WIDGET)),
                              staff.get(self._STAFF_VALUE_WIDGET))

        for col in self._STAFF_COLUMNS:
            frame[col + "_staff_id"] = [lookup.get(str(value)) for value in frame[col]]

    def _push_rows(self, frame):
        """Send every row of *frame* to JDY: update when ``_id`` is set, else create."""
        for _, row in frame.iterrows():
            data_dict = self.row_to_dict(row, self.field_mapping)
            record_id = None if pd.isna(row['_id']) else str(row['_id'])
            if record_id:
                data = {
                    'api_key': Config.SaaS_Tasks_APP_ID,
                    'entry_id': Config.NGV_TASKS_ENTRY_ID,
                    "data_id": record_id,
                    "data": data_dict
                }
                api_instance.entry_data_update(data=data, max_retries=20)
            else:
                # NOTE(review): ``id_own_org`` is the frame index, not a column,
                # so the created record carries no store id - confirm whether
                # it should be added to the payload.
                data1 = {'api_key': Config.SaaS_Tasks_APP_ID,
                         'entry_id': Config.NGV_TASKS_ENTRY_ID,
                         "data": data_dict}
                res = api_instance.data_batch_create(data=data1, max_retries=20)
                logger.info(f"补派数据:{res}")

    @staticmethod
    def row_to_dict(row, field_mapping):
        """Convert one row into the ``{widget_id: {"value": ...}}`` payload format.

        Only keys of *field_mapping* that are present in *row* are emitted;
        missing values (per ``pd.isna``) become ``None``.
        """
        result = {}
        for col_name, widget_id in field_mapping.items():
            if col_name in row:
                value = row[col_name]
                clean_value = None if pd.isna(value) else value
                result[widget_id] = {"value": clean_value}
        return result

    def fields(self):
        """Populate ``self.field_mapping`` (NGV column name -> JDY widget id)."""
        self.field_mapping = dict(
            date_id='_widget_1734062123065',
            date_fmt='_widget_1734062123066',
            id_own_group='_widget_1734062123067',
            group_name='_widget_1734062123068',
            id_own_org='_widget_1734062123069',
            org_name='_widget_1734062123070',
            org_code='_widget_1734062123071',
            group_grade='_widget_1734062123072',
            org_type='_widget_1734062123073',
            org_status='_widget_1734062123074',
            saas_version='_widget_1734062123075',
            is_wechat='_widget_1734062123076',
            is_mini_app='_widget_1734062123077',
            is_wx_shop='_widget_1734062123078',
            is_camera_service='_widget_1734062123079',
            is_maintenance_service='_widget_1734062123080',
            saas_create_time='_widget_1734062123081',
            expiry_time='_widget_1734062123082',
            saas_use_days='_widget_1734062123083',
            saas_use_year='_widget_1734062123084',
            is_main_org='_widget_1734062123085',
            license_code='_widget_1734062123086',
            license_name='_widget_1734062123087',
            org_crm_id='_widget_1734062123088',
            province_id='_widget_1734062123089',
            province_name='_widget_1734062123090',
            city_id='_widget_1734062123091',
            city_name='_widget_1734062123092',
            area_id='_widget_1734062123093',
            area_name='_widget_1734062123094',
            region_name='_widget_1734062123095',
            region_short_name='_widget_1734062123096',
            branch_name='_widget_1734062123097',
            carzone_store_id='_widget_1734062123098',
            carzone_store_name='_widget_1734062123099',
            customer_carzone_id='_widget_1734062123100',
            salesmen='_widget_1734062123101',
            area_manager='_widget_1734062123102',
            service_salesmen='_widget_1734062123103',
            impl_principal='_widget_1734062123104',
            service_impl_principal='_widget_1734062123105',
            active_user_count='_widget_1734062123106',
            active_user_type='_widget_1734062123107',
            limit_user_count='_widget_1734062123108',
            limit_user_type='_widget_1734062123109',
            is_n='_widget_1734062123110',
            is_g='_widget_1734062123111',
            is_v='_widget_1734062123112',
            is_visited='_widget_1734062123113',
            is_active='_widget_1734062123114',
            active_status_fmt='_widget_1734062123115',
            bill_count_last_30_day='_widget_1734062123116',
            bill_day_count_last_30_day='_widget_1734062123117',
            bill_day_count_this_month='_widget_1734062123118',
            bill_count_last_7_day='_widget_1734062123119',
            bill_day_count_last_7_day='_widget_1734062123120',
            pv_count='_widget_1734062123121',
            uv_count='_widget_1734062123122',
            bill_count_1d='_widget_1734062123123',
            bill_count_2d='_widget_1734062123124',
            bill_count_3d='_widget_1734062123125',
            bill_count_4d='_widget_1734062123126',
            bill_count_5d='_widget_1734062123127',
            bill_count_6d='_widget_1734062123128',
            bill_count_7d='_widget_1734062123129',
            bill_count_8d='_widget_1734062123130',
            bill_count_9d='_widget_1734062123131',
            bill_count_10d='_widget_1734062123132',
            bill_count_11d='_widget_1734062123133',
            bill_count_12d='_widget_1734062123134',
            bill_count_13d='_widget_1734062123135',
            bill_count_14d='_widget_1734062123136',
            bill_count_15d='_widget_1734062123137',
            bill_count_16d='_widget_1734062123138',
            bill_count_17d='_widget_1734062123139',
            bill_count_18d='_widget_1734062123140',
            bill_count_19d='_widget_1734062123141',
            bill_count_20d='_widget_1734062123142',
            bill_count_21d='_widget_1734062123143',
            bill_count_22d='_widget_1734062123144',
            bill_count_23d='_widget_1734062123145',
            bill_count_24d='_widget_1734062123146',
            bill_count_25d='_widget_1734062123147',
            bill_count_26d='_widget_1734062123148',
            bill_count_27d='_widget_1734062123149',
            bill_count_28d='_widget_1734062123150',
            bill_count_29d='_widget_1734062123151',
            bill_count_30d='_widget_1734062123152',
            bill_count_31d='_widget_1734062123153',
            etl_time='_widget_1734062123154',
            maintain_bill_count_last_30_day='_widget_1734062123155',
            washing_bill_count_last_30_day='_widget_1734062123156',
            maintain_bill_day_count_last_30_day='_widget_1734062123157',
            washing_bill_day_count_last_30_day='_widget_1734062123158',
            retail_bill_count_last_30_day='_widget_1734062123159',
            retail_bill_day_count_last_30_day='_widget_1734062123160',
            purchase_bill_count_last_30_day='_widget_1734062123161',
            purchase_bill_day_count_last_30_day='_widget_1734062123162',
            card_bill_count_last_30_day='_widget_1734062123163',
            card_bill_day_count_last_30_day='_widget_1734062123164',
            gd_sales_bill_count_last_30_day='_widget_1734062123165',
            gd_sales_bill_day_count_last_30_day='_widget_1734062123166',
            g_change_flag='_widget_1734062123167',
            saas_package='_widget_1734062123168',
            manage_model='_widget_1734062123169',
            contacts='_widget_1734062123170',
            contact_number='_widget_1734062123171',
            contact_mobile='_widget_1734062123172',
            g_month_count='_widget_1734062123173',
            g_month_percentage='_widget_1734062123174',
            is_install_service='_widget_1734062123175',
            install_create_time='_widget_1734062123176',
            last_end_date='_widget_1734062123177',
            renew_date='_widget_1734062123178',
            is_chain_owner='_widget_1734062123179',
            group_org_count='_widget_1734062123180',
            recent_bill_warning_days='_widget_1734062123181',
            g_change_flag_d='_widget_1734062123182',
            g_lost_warning_days='_widget_1734062123183',
            saas_edition_fmt='_widget_1734062123184',
            g_flag_1m='_widget_1734062123185',
            g_flag_2m='_widget_1734062123186',
            g_flag_3m='_widget_1734062123187',
            g_flag_4m='_widget_1734062123188',
            g_flag_5m='_widget_1734062123189',
            g_flag_6m='_widget_1734062123190',
            g_flag_day_count='_widget_1734062123191',
            add_org_flag='_widget_1734062123192',
            pt='_widget_1734062123193',
            org_size='_widget_1734062123194',
            qualification_type_fmt='_widget_1734062123195',
            business_scope_fmt='_widget_1734062123196',
            store_type_fmt='_widget_1734062123197',
            area='_widget_1734062123198',
            station_number='_widget_1734062123199',
            header_type_fmt='_widget_1734062123200',
            org_stage='_widget_1734062123201',
            g_count_this_month='_widget_1734062123202',
            saas_customer_type='_widget_1734062123203',
            technician='_widget_1734062123204',
            tmall_maintain_service_status_desc='_widget_1734062123205',
            date_fmt_date='_widget_1749000071375',
            area_manager_staff_id='_widget_1748496855779',
            service_impl_principal_staff_id="_widget_1748496855780",
            service_salesmen_staff_id="_widget_1748496855778",
            technician_staff_id="_widget_1751877712235",
            saas_create_time_date="_widget_1749000071377",
            expiry_time_date="_widget_1749000071382",
            install_create_time_date="_widget_1749000071384",
            last_end_date_date="_widget_1749000071389",
            renew_date_date="_widget_1749000071391",
        )


if __name__ == '__main__':
    start = UpdateAllNGVDataDaily()
    start.main()