# -*- coding: utf-8 -*-
"""Daily synchronisation of NGV store data into a Jiandaoyun (简道云) form.

Pipeline (see ``UpdateAllNGVDataDaily.main``):
1. load the current Jiandaoyun records and a staff-name -> staff-id map;
2. load the NGV snapshots for ``days_back=1`` and ``days_back=2``;
3. flag Jiandaoyun stores that no longer exist in NGV as "已删除";
4. diff the two NGV snapshots and find NGV stores missing from Jiandaoyun;
5. convert date fields to UTC strings and staff names to staff ids;
6. push updates (and, when ``ENABLE_CREATE`` is True, creations) to Jiandaoyun.
"""
import pandas as pd
import datetime
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import numpy as np

# Add the parent directory to the Python path so project modules can be imported
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from config import Config
from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger
from tqdm import tqdm

# Loggers configured by the project's log_config module
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()

# Shared service instances
api_instance = API()
common_module = CommonModule()

# Output directory
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

# ==================== Global switches ====================
# [1. Local cache]
# Debug switch: True reads every dataset from the local CSV cache,
# False fetches from the APIs (and refreshes the cache).
USE_LOCAL_CACHE = False

# [2. Deletion-status reset]
# First-run switch: when True, every Jiandaoyun record is first batch-flagged
# as "未删除" (only needed on the very first run).
RESET_ALL_DELETED_STATUS = False  # True on the first run, False afterwards

# [3. Concurrent updates]
# Whether updates run in a thread pool (fast) or one by one (slow).
USE_CONCURRENT_UPDATE = True  # True = concurrent (fast), False = serial (slow)

# Number of worker threads used for concurrent updates.
# Suggested range 5-20: too many may hit API rate limits, too few is slow.
# If rate limiting is severe, lower it to 3-5.
CONCURRENT_WORKERS = 4

# [4. Batch creation]
# Whether new records are created in batches (fast) or one by one (slow).
USE_BATCH_CREATE = True  # True = batch create (fast), False = one by one (slow)

# Number of records sent per batch-create request.
BATCH_CREATE_SIZE = 90

# [5. Creation of missing stores]
# Creating stores that exist in NGV but not in Jiandaoyun is currently disabled:
# candidates are only dumped to create_data.csv for review. Set to True to
# actually create them (batch vs single is chosen by USE_BATCH_CREATE).
ENABLE_CREATE = False
# ====================================================

# Local cache file paths
CACHE_DIR = os.path.join(output_dir, "cache")
os.makedirs(CACHE_DIR, exist_ok=True)

CACHE_FILES = {
    'jdy_ngv_data': os.path.join(CACHE_DIR, 'jdy_ngv_data.csv'),
    'staff_data': os.path.join(CACHE_DIR, 'staff_data.csv'),
    'ngv_data_today': os.path.join(CACHE_DIR, 'ngv_data_today.csv'),
    'ngv_data_yesterday': os.path.join(CACHE_DIR, 'ngv_data_yesterday.csv'),
}


class UpdateAllNGVDataDaily:
    """Daily update of NGV store data into Jiandaoyun."""

    # Widget id of the store-code ("门店编码") field in the Jiandaoyun form
    ORG_CODE_WIDGET_ID = '_widget_1734062123071'

    def __init__(self):
        """Build the column -> widget mapping and an empty org_code -> _id map."""
        self.field_mapping = {}
        self._init_field_mapping()
        # org_code -> every Jiandaoyun _id carrying it (so duplicate records are all updated)
        self.org_code_to_all_ids = {}

    def main(self):
        """Run the whole pipeline.

        Success is reported via ``common_module.send_task_status``. Any exception
        is logged, reported via ``common_module.send_task_error`` and re-raised.
        """
        task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            logger.info("=" * 60)
            logger.info(f"开始执行NGV数据更新任务:{task_start_time}")
            logger.info("=" * 60)

            # Step 1: base data (Jiandaoyun records + staff id map)
            jdy_ngv_data, staff_id_map = self._load_base_data()

            # Step 2: NGV source snapshots
            ngv_data_today, ngv_data_yesterday = self._load_ngv_source_data(task_start_time)

            # Step 3: flag stores that disappeared from NGV
            self._handle_deleted_stores(jdy_ngv_data, ngv_data_today)

            # Step 4: find changed / missing stores
            changed_data = self._compare_data_changes(ngv_data_today, ngv_data_yesterday, jdy_ngv_data)

            # Step 5: convert values to the Jiandaoyun formats
            prepared_data = self._prepare_data_for_sync(changed_data, staff_id_map)

            # Step 6: push to Jiandaoyun
            self._sync_to_jiandaoyun(prepared_data)

            logger.info("=" * 60)
            logger.info("NGV更新数据任务已完成")
            common_module.send_task_status(task_start_time, "NGV更新数据")
            logger.info("=" * 60)

        except Exception as e:
            error_task_logger.error(f"NGV更新数据执行时发生异常: {e}", exc_info=True)
            common_module.send_task_error(task_start_time, "NGV更新数据", str(e))
            raise

    def _compose_key_values(self, org_name, group_name, org_code, id_own_group, id_own_org):
        """Join five identifying fields into one stable '||'-separated key.

        Missing values (anything ``pd.isna`` accepts) become '' and every part
        is converted to ``str`` and stripped of surrounding whitespace.
        """
        def nv(x):
            return '' if pd.isna(x) else str(x).strip()

        parts = [nv(org_name), nv(group_name), nv(org_code), nv(id_own_group), nv(id_own_org)]
        return '||'.join(parts)

    def _compose_key_df_ngv(self, df):
        """Add a ``composite_key`` column to NGV data (keyed by field names).

        Missing key columns are created as '' first. Mutates and returns *df*.
        """
        cols = ['org_name', 'group_name', 'org_code', 'id_own_group', 'id_own_org']
        for c in cols:
            if c not in df.columns:
                df[c] = ''
        df['composite_key'] = [
            self._compose_key_values(r['org_name'], r['group_name'], r['org_code'],
                                     r['id_own_group'], r['id_own_org'])
            for _, r in df.iterrows()
        ]
        return df

    def _compose_key_df_jdy(self, df):
        """Return a copy of Jiandaoyun data with widget columns renamed and a ``composite_key`` column."""
        # Widget id -> field name of the five key fields
        col_map = {
            '_widget_1734062123070': 'org_name',
            '_widget_1734062123068': 'group_name',
            '_widget_1734062123071': 'org_code',
            '_widget_1734062123067': 'id_own_group',
            '_widget_1734062123069': 'id_own_org',
        }
        tmp = df.copy()
        for wid in col_map.keys():
            if wid not in tmp.columns:
                tmp[wid] = ''
        tmp_renamed = tmp.rename(columns=col_map)
        tmp_renamed['composite_key'] = [
            self._compose_key_values(r.get('org_name', ''), r.get('group_name', ''), r.get('org_code', ''),
                                     r.get('id_own_group', ''), r.get('id_own_org', ''))
            for _, r in tmp_renamed.iterrows()
        ]
        return tmp_renamed

    def _load_base_data(self):
        """
        Step 1: load the base data.

        Returns:
            (Jiandaoyun NGV records as a DataFrame, dict staff name -> staff id)
        """
        logger.info("步骤1: 开始加载基础数据...")

        if USE_LOCAL_CACHE:
            logger.info(" [缓存模式] 从本地缓存读取数据...")
            jdy_ngv_data = self._load_from_cache('jdy_ngv_data')
            staff_df = self._load_from_cache('staff_data')

            # Rebuild the name -> id map from the cached staff table
            staff_id_map = {}
            if not staff_df.empty and '_widget_1734942794144' in staff_df.columns and '_widget_1734942794145' in staff_df.columns:
                staff_id_map = dict(zip(
                    staff_df['_widget_1734942794144'].astype(str),
                    staff_df['_widget_1734942794145']
                ))

            logger.info(f" - [缓存] 简道云NGV数据: {len(jdy_ngv_data)} 条")
            logger.info(f" - [缓存] 员工ID映射: {len(staff_id_map)} 个员工")
            return jdy_ngv_data, staff_id_map

        logger.info(" [API模式] 从简道云API获取数据...")

        # NOTE(review): API keys / entry ids are hard-coded here while the rest of
        # the file reads them from Config — consider moving them to Config.
        payload = {
            "api_key": "675b900991ad2491c69389ca",
            "entry_id": "675bb02bd2d53c2034c665e4"
        }
        ngv_data_list = api_instance.entry_data_list(payload).get("data", [])
        jdy_ngv_data = pd.DataFrame(ngv_data_list)
        logger.info(f" - 已获取简道云NGV数据: {len(jdy_ngv_data)} 条")

        self._save_to_cache(jdy_ngv_data, 'jdy_ngv_data')

        # Staff records, used to translate staff names into staff ids
        payload = {
            "api_key": "6694d3c4fcb69ca9a111a6c4",
            "entry_id": "6769204a1902c9341340a1bc"
        }
        staff_data = api_instance.entry_data_list(payload)
        staff_list = staff_data.get("data", [])
        staff_df = pd.DataFrame(staff_list)

        self._save_to_cache(staff_df, 'staff_data')

        # Staff name (as str) -> staff id; records lacking either widget are skipped
        staff_id_map = {
            str(staff['_widget_1734942794144']): staff['_widget_1734942794145']
            for staff in staff_list
            if '_widget_1734942794144' in staff and '_widget_1734942794145' in staff
        }
        logger.info(f" - 已构建员工ID映射字典: {len(staff_id_map)} 个员工")

        return jdy_ngv_data, staff_id_map

    def _load_ngv_source_data(self, task_start_time):
        """
        Step 2: load the NGV source snapshots.

        ``task_start_time`` is currently unused.

        Returns:
            (data for days_back=1, data for days_back=2), both restricted to
            rows whose ``org_type`` is '一般'.
        """
        logger.info("步骤2: 开始获取NGV源数据...")

        if USE_LOCAL_CACHE:
            logger.info(" [缓存模式] 从本地缓存读取NGV源数据...")
            ngv_data_1 = self._load_from_cache('ngv_data_today')
            ngv_data_2 = self._load_from_cache('ngv_data_yesterday')
            logger.info(f" - [缓存] 昨天数据: {len(ngv_data_1)} 条")
            logger.info(f" - [缓存] 前天数据: {len(ngv_data_2)} 条")
            return ngv_data_1, ngv_data_2

        logger.info(" [API模式] 从数据库获取NGV源数据...")

        ngv_data_1 = common_module.get_ngv_details(days_back=1)
        ngv_data_2 = common_module.get_ngv_details(days_back=2)

        # Unfiltered snapshots of each day's data, written to the working directory
        ngv_data_1.to_csv("ngv_data_today.csv", index=False)
        ngv_data_2.to_csv("ngv_data_yesterday.csv", index=False)

        # Keep only stores whose org_type is "一般"
        ngv_data_1 = ngv_data_1[ngv_data_1['org_type'] == '一般']
        ngv_data_2 = ngv_data_2[ngv_data_2['org_type'] == '一般']

        self._save_to_cache(ngv_data_1, 'ngv_data_today')
        self._save_to_cache(ngv_data_2, 'ngv_data_yesterday')

        logger.info(f" - 昨天数据: {len(ngv_data_1)} 条")
        logger.info(f" - 前天数据: {len(ngv_data_2)} 条")

        return ngv_data_1, ngv_data_2

    def _handle_deleted_stores(self, jdy_ngv_data, ngv_current_data):
        """
        Step 3: flag Jiandaoyun stores that no longer exist in NGV.

        1. First run only (RESET_ALL_DELETED_STATUS=True): batch-flag every
           record as "未删除".
        2. Batch-flag every record whose org_code is absent from NGV as "已删除".
           All duplicate records sharing such an org_code are flagged.

        Important: this method writes to Jiandaoyun. Step 2 is skipped when the
        NGV snapshot is empty or has no ``org_code`` column. Otherwise an
        upstream outage would make every store look deleted.
        """
        logger.info("步骤3: 开始处理已删除的门店...")

        temp_jdy_data = jdy_ngv_data.copy()
        temp_jdy_data.reset_index(drop=True, inplace=True)

        if self.ORG_CODE_WIDGET_ID not in temp_jdy_data.columns:
            error_task_logger.error("列 '门店编码' 不存在")
            return

        temp_jdy_data['org_code'] = temp_jdy_data[self.ORG_CODE_WIDGET_ID]

        # Records without a store code cannot be matched and are ignored
        null_org_codes = temp_jdy_data['org_code'].isna().sum()
        if null_org_codes > 0:
            logger.warning(f" ⚠ 警告: 简道云数据中有 {null_org_codes} 条记录的门店编码为空,将被忽略")
            temp_jdy_data = temp_jdy_data[temp_jdy_data['org_code'].notna()]

        # _id is required both for the duplicate report and for flagging records
        if '_id' not in temp_jdy_data.columns:
            error_task_logger.error(" ✗ 错误: 简道云数据中没有_id列,无法标记删除状态")
            return

        # Report duplicated org_codes (a data-quality problem in Jiandaoyun)
        duplicate_org_codes = temp_jdy_data[temp_jdy_data['org_code'].duplicated(keep=False)]
        if len(duplicate_org_codes) > 0:
            unique_duplicate_org_codes = duplicate_org_codes['org_code'].nunique()
            logger.warning(
                f" ⚠ 数据质量问题:发现 {unique_duplicate_org_codes} 个org_code重复,共 {len(duplicate_org_codes)} 条记录")
            logger.warning(" ⚠ 建议:请在简道云中清理重复数据,每个org_code只保留一条记录")

            if output_dir:
                duplicate_org_codes[['org_code', '_id']].to_csv(
                    os.path.join(output_dir, '重复org_code记录.csv'),
                    index=False, encoding='utf-8-sig'
                )
                logger.warning(" - 重复详情已保存到: 重复org_code记录.csv")

        # Duplicates are deliberately kept: every duplicate record must be flagged
        logger.info(f" - 简道云原始数据共 {len(temp_jdy_data)} 条记录(包含重复)")
        logger.info(f" - NGV源数据: {len(ngv_current_data)} 条")

        # 3.1 [first run] batch-flag every record as "未删除"
        if RESET_ALL_DELETED_STATUS:
            logger.info(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
            logger.info(" 【首次运行模式】批量重置所有数据删除状态")
            logger.info(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")

            all_data_ids = temp_jdy_data['_id'].dropna().tolist()
            if all_data_ids:
                logger.info(f" - 准备批量标记所有数据为'未删除': {len(all_data_ids)} 条")
                batch_data = {
                    'api_key': Config.SaaS_Tasks_APP_ID,
                    'entry_id': Config.NGV_TASKS_ENTRY_ID,
                    "data_ids": all_data_ids,
                    "data": {"_widget_1754285499851": {"value": "未删除"}}
                }
                api_instance.entry_data_banch_update(data=batch_data, max_retries=20)
                logger.info(f" ✓ 已批量标记所有数据为'未删除': {len(all_data_ids)} 条")
                logger.info(" 💡 提示:首次运行完成后,请将 RESET_ALL_DELETED_STATUS 设置为 False")
            else:
                logger.warning(" ⚠ 警告: 没有有效的_id可以标记")

        # Safety guard: with no NGV org_codes every Jiandaoyun store would be
        # considered missing and flagged "已删除".
        if 'org_code' not in ngv_current_data.columns or ngv_current_data['org_code'].dropna().empty:
            error_task_logger.error(" ✗ 错误: NGV源数据为空或缺少org_code列,跳过已删除门店标记(避免误标记全部门店)")
            return

        # 3.2 stores present in Jiandaoyun but absent from NGV
        ngv_org_codes = set(ngv_current_data['org_code'].dropna().unique())
        jdy_org_codes_unique = set(temp_jdy_data['org_code'].dropna().unique())

        missing_org_codes = jdy_org_codes_unique - ngv_org_codes

        if len(missing_org_codes) == 0:
            logger.info(" - 无需处理的已删除门店(简道云和NGV数据完全一致)")
            return

        logger.info(f" - 发现 {len(missing_org_codes)} 个门店在简道云存在但NGV中不存在")

        # Select every matching record, duplicates included
        only_in_jdy = temp_jdy_data[temp_jdy_data['org_code'].isin(missing_org_codes)].copy()

        logger.info(f" - 需要标记删除的org_code数: {len(missing_org_codes)}")
        logger.info(f" - 需要标记删除的记录数: {len(only_in_jdy)} 条(包含重复记录)")

        # 3.3 batch-flag them as "已删除"
        delete_data_ids = only_in_jdy['_id'].dropna().tolist()
        if not delete_data_ids:
            logger.warning(" ⚠ 警告: 没有有效的_id可以标记为已删除")
            return

        logger.info(f" - 准备批量标记为'已删除': {len(delete_data_ids)} 条")
        batch_data = {
            'api_key': Config.SaaS_Tasks_APP_ID,
            'entry_id': Config.NGV_TASKS_ENTRY_ID,
            "data_ids": delete_data_ids,
            "data": {"_widget_1754285499851": {"value": "已删除"}}
        }
        api_instance.entry_data_banch_update(data=batch_data, max_retries=20)
        logger.info(f" ✓ 已批量标记为'已删除': {len(delete_data_ids)} 条")

    def _compare_data_changes(self, ngv_today, ngv_yesterday, jdy_ngv_data):
        """
        Step 4: find the records that need to be synchronised.

        Two kinds of rows are returned, both indexed by org_code:
        - stores present in both NGV snapshots whose values differ
          (match_status '不一致'), joined with their Jiandaoyun ``_id``;
        - stores in today's NGV data that are not in Jiandaoyun
          (match_status '查缺补漏-需新建', ``_id`` None).

        Side effect: stores the org_code -> all-_ids map in
        ``self.org_code_to_all_ids`` for use by step 6.
        """
        logger.info("步骤4: 开始对比数据变化...")

        # Columns that legitimately change every day and must not count as a change
        columns_to_remove = {'date_id', 'date_fmt', 'pt', 'etl_time', 'id_own_org'}

        df1_filtered = ngv_today[[col for col in ngv_today.columns if col not in columns_to_remove]]
        df2_filtered = ngv_yesterday[[col for col in ngv_yesterday.columns if col not in columns_to_remove]]

        df1_indexed = df1_filtered.set_index('org_code')
        df2_indexed = df2_filtered.set_index('org_code')

        # Normalise every value to str and treat 'nan'/'None' as empty
        df1_indexed = df1_indexed.astype(str).replace(['nan', 'None'], '').fillna('')
        df2_indexed = df2_indexed.astype(str).replace(['nan', 'None'], '').fillna('')

        # Restrict both snapshots to shared stores and shared columns
        common_index = df1_indexed.index.intersection(df2_indexed.index)
        df1_common = df1_indexed.reindex(common_index).fillna('')
        df2_common = df2_indexed.reindex(common_index).fillna('')

        common_columns = df1_common.columns.intersection(df2_common.columns)
        # .copy(): match_status is added below and must not be written into a view
        df1_common = df1_common[common_columns].copy()
        df2_common = df2_common[common_columns]

        matches = (df1_common == df2_common).all(axis=1)
        df1_common['match_status'] = matches.map({True: '一致', False: '不一致'})

        consistent_count = (df1_common['match_status'] == '一致').sum()
        inconsistent_count = (df1_common['match_status'] == '不一致').sum()

        logger.info(" 📊 共同门店数据对比结果:")
        logger.info(f" - 共同门店总数: {len(df1_common)}")
        logger.info(f" - 数据一致: {consistent_count} 条")
        logger.info(f" - 数据不一致(需要更新): {inconsistent_count} 条")

        changed_data = df1_common[df1_common['match_status'] == '不一致'].copy()

        # Attach the Jiandaoyun _id (matched on org_code)
        temp_jdy = jdy_ngv_data.copy()
        temp_jdy.reset_index(drop=True, inplace=True)

        org_code_to_all_ids = {}
        if self.ORG_CODE_WIDGET_ID in temp_jdy.columns and '_id' in temp_jdy.columns:
            temp_jdy.rename(columns={self.ORG_CODE_WIDGET_ID: 'org_code'}, inplace=True)

            # org_code -> every _id, so duplicate records can all be updated
            for org_code, data_id in zip(temp_jdy['org_code'], temp_jdy['_id']):
                if pd.notna(org_code) and pd.notna(data_id):
                    org_code_to_all_ids.setdefault(org_code, []).append(data_id)

            multi_id_count = sum(1 for ids in org_code_to_all_ids.values() if len(ids) > 1)
            if multi_id_count > 0:
                logger.info(f" - 发现 {multi_id_count} 个org_code有多个_id,更新时将同步所有记录")

            # Keep one (the newest, when _createTime exists) record per org_code for the join
            if '_createTime' in temp_jdy.columns:
                temp_jdy = temp_jdy.sort_values('_createTime', ascending=False)
            temp_jdy = temp_jdy.drop_duplicates(subset=['org_code'], keep='first')
            temp_jdy.set_index('org_code', inplace=True)
            changed_data = changed_data.join(temp_jdy['_id'], how='left')
            jdy_org_codes = set(temp_jdy.index.unique()) if len(temp_jdy) > 0 else set()
        else:
            # No usable Jiandaoyun data (e.g. an empty response): nothing to join
            logger.warning(" ⚠ 警告: 简道云数据缺少门店编码或_id列,无法关联_id")
            changed_data['_id'] = None
            jdy_org_codes = set()

        self.org_code_to_all_ids = org_code_to_all_ids

        # Gap filling: stores in today's NGV data that are missing from Jiandaoyun
        logger.info("")
        logger.info(" 📊 查缺补漏分析(NGV有但简道云没有):")

        ngv_org_codes = set(df1_indexed.index.unique())
        missing_in_jdy = ngv_org_codes - jdy_org_codes

        logger.info(f" - NGV昨天的门店总数: {len(ngv_org_codes)}")
        logger.info(f" - 简道云的门店总数: {len(jdy_org_codes)}")
        logger.info(f" - NGV有但简道云没有(需补充): {len(missing_in_jdy)} 条")

        if len(missing_in_jdy) > 0:
            missing_stores = df1_indexed.loc[list(missing_in_jdy)].copy()
            missing_stores['match_status'] = '查缺补漏-需新建'
            missing_stores['_id'] = None  # not in Jiandaoyun, so no _id

            # Stores already listed as "changed" must not be added twice
            if len(changed_data) > 0:
                existing_org_codes = set(changed_data.index)
                overlap_org_codes = missing_in_jdy & existing_org_codes

                if len(overlap_org_codes) > 0:
                    logger.warning(f" ⚠ 警告:发现 {len(overlap_org_codes)} 个org_code同时在'不一致'和'查缺补漏'中")
                    logger.warning(" 这些记录可能是数据问题,已从查缺补漏中排除")
                    missing_stores = missing_stores[~missing_stores.index.isin(overlap_org_codes)]
                    missing_in_jdy = missing_in_jdy - overlap_org_codes

            if len(missing_stores) > 0:
                logger.info(f" - 实际需要补充新建: {len(missing_stores)} 条")
                changed_data = pd.concat([changed_data, missing_stores])
            else:
                logger.info(" ✓ 排除重复后无需补充")
        else:
            logger.info(" ✓ 无需补充,简道云数据完整")

        logger.info("")
        logger.info(" 📊 汇总统计:")
        logger.info(f" - 需要处理的数据总数: {len(changed_data)} 条")
        logger.info(f" ├─ 数据不一致(需更新): {inconsistent_count} 条")
        logger.info(f" └─ 查缺补漏(需新建): {len(missing_in_jdy)} 条")

        return changed_data

    def _prepare_data_for_sync(self, data_df, staff_id_map):
        """
        Step 5: convert values to the formats the Jiandaoyun form expects.

        - each date column present gets a ``<col>_date`` column: the value is
          read as Asia/Shanghai local time and rendered as a UTC
          '%Y-%m-%dT%H:%M:%SZ' string;
        - each staff column present gets a ``<col>_staff_id`` column looked up
          in *staff_id_map* (None when unknown);
        - ``g_month_percentage``, when present, becomes a 3-decimal string ('' if
          not numeric).

        Returns a new DataFrame; *data_df* is not modified.
        """
        logger.info("步骤5: 开始数据转换...")

        if len(data_df) == 0:
            logger.info(" - 无需转换的数据")
            return data_df

        prepared_df = data_df.copy()

        # 5.1 dates -> UTC strings
        time_columns = ['saas_create_time', 'expiry_time', 'install_create_time', 'last_end_date', 'renew_date']
        for col in time_columns:
            if col not in prepared_df.columns:
                continue
            temp_series = pd.to_datetime(prepared_df[col], errors='coerce', utc=False)
            prepared_df[col + '_date'] = (
                temp_series
                .dt.tz_localize('Asia/Shanghai', ambiguous='infer', nonexistent='NaT')
                .dt.tz_convert('UTC')
                .dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            )
        logger.info(" - 日期字段已转换为UTC格式")

        # 5.2 staff names -> staff ids
        staff_columns = ['area_manager', 'service_impl_principal', 'service_salesmen', 'technician']
        for col in staff_columns:
            if col not in prepared_df.columns:
                continue
            prepared_df[col + '_staff_id'] = prepared_df[col].astype(str).map(
                lambda x: staff_id_map.get(x, None)
            )
        logger.info(" - 人员字段已转换为员工ID")

        # 5.3 G conversion rate with 3 decimals (guarded like the columns above)
        if 'g_month_percentage' in prepared_df.columns:
            prepared_df['g_month_percentage'] = (
                pd.to_numeric(prepared_df['g_month_percentage'], errors='coerce')
                .round(3)
                .apply(lambda x: f"{x:.3f}" if pd.notna(x) else '')
            )
            logger.info(" - G转化率已保留3位小数")

        return prepared_df

    @staticmethod
    def _is_blank(value):
        """Return True for None, '' and scalar NaN-like values (NaN, NaT, pd.NA)."""
        if value is None:
            return True
        if isinstance(value, str):
            return value == ''
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    def _sync_to_jiandaoyun(self, data_df):
        """
        Step 6: push the prepared rows to Jiandaoyun.

        Rows with an ``_id`` become updates of every record sharing that
        org_code (the store-code field itself is never sent on update). Rows
        without one become creation candidates, unless every field other than
        the store code is blank. Candidates are always dumped to
        create_data.csv. They are only created when ENABLE_CREATE is True.
        """
        logger.info("步骤6: 开始同步数据到简道云...")

        if len(data_df) == 0:
            logger.info(" - 无需同步的数据")
            return

        update_data_list = []
        create_data_list = []

        logger.info(" - 准备同步数据...")
        for idx, row in data_df.iterrows():
            data_dict = self._build_data_dict(row)

            has_id = '_id' in data_df.columns and pd.notna(row.get('_id'))

            if has_id:
                # The store code must not be modified on update
                data_dict.pop(self.ORG_CODE_WIDGET_ID, None)

                # Update every record with this org_code (duplicates included)
                org_code = idx
                all_data_ids = self.org_code_to_all_ids.get(org_code, [str(row['_id'])])

                for data_id in all_data_ids:
                    update_data_list.append({
                        'org_code': org_code,
                        'data_id': str(data_id),
                        'data_dict': data_dict.copy(),
                        'row_data': row  # original row, kept for the statistics file
                    })
            else:
                # Only create records that carry something besides the store code
                all_empty = all(
                    self._is_blank(v.get('value'))
                    for k, v in data_dict.items()
                    if k != self.ORG_CODE_WIDGET_ID
                )

                if all_empty:
                    logger.info(f" - 跳过内容全空的新建记录 org_code: {idx}")
                    continue

                create_data_list.append({
                    'org_code': idx,
                    'data_dict': data_dict,
                    'row_data': row  # original row, kept for the output file
                })

        logger.info(f" - 需要更新: {len(update_data_list)} 条")
        logger.info(f" - 需要创建: {len(create_data_list)} 条")

        update_count = 0
        if len(update_data_list) > 0:
            if USE_CONCURRENT_UPDATE:
                update_count = self._concurrent_update(update_data_list)
            else:
                update_count = self._single_update(update_data_list)
            self._save_update_stats(update_data_list)

        # Creation candidates are always dumped for review
        create_count = 0
        create_df = pd.DataFrame(create_data_list)
        create_df.to_csv("create_data.csv", index=False)

        if ENABLE_CREATE and len(create_data_list) > 0:
            if USE_BATCH_CREATE:
                create_count = self._batch_create(create_data_list)
            else:
                create_count = self._single_create(create_data_list)
            self._save_create_data(create_data_list)

        logger.info(f" ✓ 同步完成: 更新 {update_count} 条, 创建 {create_count} 条")

    def _concurrent_update(self, update_data_list):
        """
        Update records concurrently with a thread pool of CONCURRENT_WORKERS threads.

        Per-record failures are logged and counted, not raised.
        Returns the number of successful updates.
        """
        logger.info(f" - 使用并发更新模式 (并发数: {CONCURRENT_WORKERS})")

        total_count = len(update_data_list)
        success_count = 0
        failed_count = 0
        start_time = time.time()

        def update_single_record(item):
            """Send one update; return a result dict instead of raising."""
            try:
                update_data = {
                    'api_key': Config.SaaS_Tasks_APP_ID,
                    'entry_id': Config.NGV_TASKS_ENTRY_ID,
                    'data_id': item['data_id'],
                    'data': item['data_dict']
                }
                api_instance.entry_data_update(data=update_data, max_retries=20)
                return {'success': True, 'org_code': item['org_code']}
            except Exception as e:
                error_task_logger.error(f"更新失败 (org_code={item['org_code']}): {e}")
                return {'success': False, 'org_code': item['org_code'], 'error': str(e)}

        with ThreadPoolExecutor(max_workers=CONCURRENT_WORKERS) as executor:
            future_to_item = {executor.submit(update_single_record, item): item for item in update_data_list}

            with tqdm(total=total_count, desc="并发更新") as pbar:
                for future in as_completed(future_to_item):
                    result = future.result()
                    if result['success']:
                        success_count += 1
                    else:
                        failed_count += 1
                    pbar.update(1)

        elapsed_time = time.time() - start_time
        speed = total_count / elapsed_time if elapsed_time > 0 else 0

        logger.info(f" - 更新完成: 成功 {success_count} 条, 失败 {failed_count} 条")
        logger.info(f" - 耗时: {elapsed_time:.1f}秒, 速度: {speed:.1f}条/秒")

        return success_count

    def _single_update(self, update_data_list):
        """Update records one by one (slow, but failures are easy to trace).

        Returns the number of successful updates.
        """
        logger.info(" - 使用串行更新模式(逐条)")

        success_count = 0
        failed_count = 0
        total_count = len(update_data_list)
        start_time = time.time()

        for item in tqdm(update_data_list, desc="串行更新"):
            try:
                update_data = {
                    'api_key': Config.SaaS_Tasks_APP_ID,
                    'entry_id': Config.NGV_TASKS_ENTRY_ID,
                    'data_id': item['data_id'],
                    'data': item['data_dict']
                }
                api_instance.entry_data_update(data=update_data, max_retries=20)
                success_count += 1
            except Exception as e:
                failed_count += 1
                error_task_logger.error(f"更新失败 (org_code={item['org_code']}): {e}")

        elapsed_time = time.time() - start_time
        speed = total_count / elapsed_time if elapsed_time > 0 else 0

        logger.info(f" - 更新完成: 成功 {success_count} 条, 失败 {failed_count} 条")
        logger.info(f" - 耗时: {elapsed_time:.1f}秒, 速度: {speed:.1f}条/秒")

        return success_count

    def _batch_create(self, create_data_list):
        """Create records in batches of BATCH_CREATE_SIZE.

        When a batch request fails, that batch is retried record by record.
        Returns the number of records reported as created.
        """
        logger.info(f" - 使用批量创建模式 (批次大小: {BATCH_CREATE_SIZE})")

        total_count = len(create_data_list)
        success_count = 0

        for i in range(0, total_count, BATCH_CREATE_SIZE):
            batch = create_data_list[i:i + BATCH_CREATE_SIZE]

            try:
                batch_create_data = {
                    'api_key': Config.SaaS_Tasks_APP_ID,
                    'entry_id': Config.NGV_TASKS_ENTRY_ID,
                    'data': [item['data_dict'] for item in batch]
                }
                api_instance.data_batch_create(data=batch_create_data, max_retries=20)
                success_count += len(batch)

                # Progress every 5 batches and after the last one
                if (i + BATCH_CREATE_SIZE) % (BATCH_CREATE_SIZE * 5) == 0 or (i + BATCH_CREATE_SIZE) >= total_count:
                    logger.info(f" - 已创建 {min(i + BATCH_CREATE_SIZE, total_count)}/{total_count} 条")

            except Exception as e:
                error_task_logger.error(f"批量创建失败 (批次 {i // BATCH_CREATE_SIZE + 1}): {e}")
                logger.warning(" - 批量创建失败,尝试逐条创建该批次...")
                for item in batch:
                    try:
                        single_create_data = {
                            'api_key': Config.SaaS_Tasks_APP_ID,
                            'entry_id': Config.NGV_TASKS_ENTRY_ID,
                            'data': item['data_dict']
                        }
                        api_instance.data_batch_create(data=single_create_data, max_retries=20)
                        success_count += 1
                    except Exception as e2:
                        error_task_logger.error(f"逐条创建失败 (org_code={item['org_code']}): {e2}")

        return success_count

    def _single_create(self, create_data_list):
        """Create records one by one (slow, but failures are easy to trace).

        Returns the number of successful creations.
        """
        logger.info(" - 使用逐条创建模式")
        success_count = 0

        for item in tqdm(create_data_list, desc="逐条创建"):
            try:
                create_data = {
                    'api_key': Config.SaaS_Tasks_APP_ID,
                    'entry_id': Config.NGV_TASKS_ENTRY_ID,
                    'data': item['data_dict'],
                    'is_start_trigger': 'true',
                }
                api_instance.data_batch_create(data=create_data, max_retries=20)
                success_count += 1
            except Exception as e:
                error_task_logger.error(f"创建失败 (org_code={item['org_code']}): {e}")

        return success_count

    def _build_data_dict(self, row):
        """Build ``{widget_id: {"value": ...}}`` for every mapped column present in *row*.

        Missing values (``pd.isna``) are sent as None.
        """
        data_dict = {}
        for col_name, widget_id in self.field_mapping.items():
            if col_name in row.index:
                value = row[col_name]
                clean_value = None if pd.isna(value) else value
                data_dict[widget_id] = {"value": clean_value}
        return data_dict

    def _init_field_mapping(self):
        """Initialise the NGV column name -> Jiandaoyun widget id mapping."""
        self.field_mapping = {
            # Basic information
            'date_id': '_widget_1734062123065',
            'date_fmt': '_widget_1734062123066',
            'id_own_group': '_widget_1734062123067',
            'group_name': '_widget_1734062123068',
            'id_own_org': '_widget_1734062123069',
            'org_name': '_widget_1734062123070',
            'org_code': '_widget_1734062123071',
            'group_grade': '_widget_1734062123072',
            'org_type': '_widget_1734062123073',
            'org_status': '_widget_1734062123074',

            # SaaS
            'saas_version': '_widget_1734062123075',
            'is_wechat': '_widget_1734062123076',
            'is_mini_app': '_widget_1734062123077',
            'is_wx_shop': '_widget_1734062123078',
            'is_camera_service': '_widget_1734062123079',
            'is_maintenance_service': '_widget_1734062123080',
            'saas_create_time': '_widget_1734062123081',
            'expiry_time': '_widget_1734062123082',
            'saas_use_days': '_widget_1734062123083',
            'saas_use_year': '_widget_1734062123084',
            'is_main_org': '_widget_1734062123085',

            # Licence information
            'license_code': '_widget_1734062123086',
            'license_name': '_widget_1734062123087',
            'org_crm_id': '_widget_1734062123088',

            # Geography
            'province_id': '_widget_1734062123089',
            'province_name': '_widget_1734062123090',
            'city_id': '_widget_1734062123091',
            'city_name': '_widget_1734062123092',
            'area_id': '_widget_1734062123093',
            'area_name': '_widget_1734062123094',
            'region_name': '_widget_1734062123095',
            'region_short_name': '_widget_1734062123096',
            'branch_name': '_widget_1734062123097',

            # Carzone
            'carzone_store_id': '_widget_1734062123098',
            'carzone_store_name': '_widget_1734062123099',
            'customer_carzone_id': '_widget_1734062123100',

            # Staff
            'salesmen': '_widget_1734062123101',
            'area_manager': '_widget_1734062123102',
            'service_salesmen': '_widget_1734062123103',
            'impl_principal': '_widget_1734062123104',
            'service_impl_principal': '_widget_1734062123105',

            # User statistics
            'active_user_count': '_widget_1734062123106',
            'active_user_type': '_widget_1734062123107',
            'limit_user_count': '_widget_1734062123108',
            'limit_user_type': '_widget_1734062123109',

            # NGV flags
            'is_n': '_widget_1734062123110',
            'is_g': '_widget_1734062123111',
            'is_v': '_widget_1734062123112',
            'is_visited': '_widget_1734062123113',
            'is_active': '_widget_1734062123114',
            'active_status_fmt': '_widget_1734062123115',

            # Bill statistics
            'bill_count_last_30_day': '_widget_1734062123116',
            'bill_day_count_last_30_day': '_widget_1734062123117',
            'bill_day_count_this_month': '_widget_1734062123118',
            'bill_count_last_7_day': '_widget_1734062123119',
            'bill_day_count_last_7_day': '_widget_1734062123120',
            'pv_count': '_widget_1734062123121',
            'uv_count': '_widget_1734062123122',

            # Daily bill counts (days 1-31)
            'bill_count_1d': '_widget_1734062123123',
            'bill_count_2d': '_widget_1734062123124',
            'bill_count_3d': '_widget_1734062123125',
            'bill_count_4d': '_widget_1734062123126',
            'bill_count_5d': '_widget_1734062123127',
            'bill_count_6d': '_widget_1734062123128',
            'bill_count_7d': '_widget_1734062123129',
            'bill_count_8d': '_widget_1734062123130',
            'bill_count_9d': '_widget_1734062123131',
            'bill_count_10d': '_widget_1734062123132',
            'bill_count_11d': '_widget_1734062123133',
            'bill_count_12d': '_widget_1734062123134',
            'bill_count_13d': '_widget_1734062123135',
            'bill_count_14d': '_widget_1734062123136',
            'bill_count_15d': '_widget_1734062123137',
            'bill_count_16d': '_widget_1734062123138',
            'bill_count_17d': '_widget_1734062123139',
            'bill_count_18d': '_widget_1734062123140',
            'bill_count_19d': '_widget_1734062123141',
            'bill_count_20d': '_widget_1734062123142',
            'bill_count_21d': '_widget_1734062123143',
            'bill_count_22d': '_widget_1734062123144',
            'bill_count_23d': '_widget_1734062123145',
            'bill_count_24d': '_widget_1734062123146',
            'bill_count_25d': '_widget_1734062123147',
            'bill_count_26d': '_widget_1734062123148',
            'bill_count_27d': '_widget_1734062123149',
            'bill_count_28d': '_widget_1734062123150',
            'bill_count_29d': '_widget_1734062123151',
            'bill_count_30d': '_widget_1734062123152',
            'bill_count_31d': '_widget_1734062123153',

            # ETL time
            'etl_time': '_widget_1734062123154',

            # Statistics per business type
            'maintain_bill_count_last_30_day': '_widget_1734062123155',
            'washing_bill_count_last_30_day': '_widget_1734062123156',
            'maintain_bill_day_count_last_30_day': '_widget_1734062123157',
            'washing_bill_day_count_last_30_day': '_widget_1734062123158',
            'retail_bill_count_last_30_day': '_widget_1734062123159',
            'retail_bill_day_count_last_30_day': '_widget_1734062123160',
            'purchase_bill_count_last_30_day': '_widget_1734062123161',
            'purchase_bill_day_count_last_30_day': '_widget_1734062123162',
            'card_bill_count_last_30_day': '_widget_1734062123163',
            'card_bill_day_count_last_30_day': '_widget_1734062123164',
            'gd_sales_bill_count_last_30_day': '_widget_1734062123165',
            'gd_sales_bill_day_count_last_30_day': '_widget_1734062123166',

            # G-flag related
            'g_change_flag': '_widget_1734062123167',
            'saas_package': '_widget_1734062123168',
            'manage_model': '_widget_1734062123169',

            # Contacts
            'contacts': '_widget_1734062123170',
            'contact_number': '_widget_1734062123171',
            'contact_mobile': '_widget_1734062123172',

            # G monthly statistics
            'g_month_count': '_widget_1734062123173',
            'g_month_percentage': '_widget_1734062123174',

            # Installation service
            'is_install_service': '_widget_1734062123175',
            'install_create_time': '_widget_1734062123176',
            'last_end_date': '_widget_1734062123177',
            'renew_date': '_widget_1734062123178',

            # Chain information
            'is_chain_owner': '_widget_1734062123179',
            'group_org_count': '_widget_1734062123180',

            # Warnings
            'recent_bill_warning_days': '_widget_1734062123181',
            'g_change_flag_d': '_widget_1734062123182',
            'g_lost_warning_days': '_widget_1734062123183',

            # SaaS edition
            'saas_edition_fmt': '_widget_1734062123184',

            # Monthly G flags (months 1-6)
            'g_flag_1m': '_widget_1734062123185',
            'g_flag_2m': '_widget_1734062123186',
            'g_flag_3m': '_widget_1734062123187',
            'g_flag_4m': '_widget_1734062123188',
            'g_flag_5m': '_widget_1734062123189',
            'g_flag_6m': '_widget_1734062123190',
            'g_flag_day_count': '_widget_1734062123191',

            # Other flags
            'add_org_flag': '_widget_1734062123192',
            'pt': '_widget_1734062123193',

            # Store attributes
            'org_size': '_widget_1734062123194',
            'qualification_type_fmt': '_widget_1734062123195',
            'business_scope_fmt': '_widget_1734062123196',
            'store_type_fmt': '_widget_1734062123197',
            'area': '_widget_1734062123198',
            'station_number': '_widget_1734062123199',
            'header_type_fmt': '_widget_1734062123200',
            'org_stage': '_widget_1734062123201',

            # Current-month statistics
            'g_count_this_month': '_widget_1734062123202',
            'saas_customer_type': '_widget_1734062123203',
            'technician': '_widget_1734062123204',
            'tmall_maintain_service_status_desc': '_widget_1734062123205',

            # Date fields (UTC strings produced in step 5)
            'date_fmt_date': '_widget_1749000071375',
            'saas_create_time_date': '_widget_1749000071377',
            'expiry_time_date': '_widget_1749000071382',
            'install_create_time_date': '_widget_1749000071384',
            'last_end_date_date': '_widget_1749000071389',
            'renew_date_date': '_widget_1749000071391',

            # Staff-id fields (produced in step 5)
            'area_manager_staff_id': '_widget_1748496855779',
            'service_salesmen_staff_id': '_widget_1748496855778',
            'service_impl_principal_staff_id': '_widget_1748496855780',
            'technician_staff_id': '_widget_1751877712235',
        }

    def _save_to_cache(self, df, cache_key):
        """
        Save a DataFrame to the local CSV cache.

        Args:
            df: DataFrame to save.
            cache_key: key into CACHE_FILES.

        Unknown keys and write errors are logged as warnings, not raised.
        """
        if cache_key not in CACHE_FILES:
            logger.warning(f" ⚠ 未知的缓存键: {cache_key}")
            return

        cache_file = CACHE_FILES[cache_key]
        try:
            df.to_csv(cache_file, index=False, encoding='utf-8-sig')
            logger.info(f" ✓ 已保存到缓存: {cache_file} ({len(df)} 条记录)")
        except Exception as e:
            logger.warning(f" ⚠ 保存缓存失败 ({cache_key}): {e}")

    def _load_from_cache(self, cache_key):
        """
        Load a DataFrame from the local CSV cache.

        Args:
            cache_key: key into CACHE_FILES.

        Returns:
            The cached DataFrame, or an empty DataFrame when the key is unknown,
            the file does not exist, or reading fails.
        """
        if cache_key not in CACHE_FILES:
            logger.warning(f" ⚠ 未知的缓存键: {cache_key}")
            return pd.DataFrame()

        cache_file = CACHE_FILES[cache_key]
        if not os.path.exists(cache_file):
            logger.warning(f" ⚠ 缓存文件不存在: {cache_file}")
            logger.warning(" 请先关闭USE_LOCAL_CACHE运行一次以生成缓存文件")
            return pd.DataFrame()

        try:
            df = pd.read_csv(cache_file, encoding='utf-8-sig')
            logger.info(f" ✓ 已从缓存加载: {cache_file} ({len(df)} 条记录)")
            return df
        except Exception as e:
            logger.error(f" ✗ 加载缓存失败 ({cache_key}): {e}")
            return pd.DataFrame()

    def _save_create_data(self, create_data_list):
        """
        Write a summary of the created stores to output/新增门店_<timestamp>.csv.

        Args:
            create_data_list: items built by ``_sync_to_jiandaoyun``.

        Errors are logged, not raised.
        """
        if len(create_data_list) == 0:
            return

        try:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

            create_records = []
            for item in create_data_list:
                row_data = item['row_data']
                create_records.append({
                    'org_code': item['org_code'],
                    'org_name': row_data.get('org_name', ''),
                    'group_name': row_data.get('group_name', ''),
                    'org_type': row_data.get('org_type', ''),
                    'province_name': row_data.get('province_name', ''),
                    'city_name': row_data.get('city_name', ''),
                    'saas_version': row_data.get('saas_version', ''),
                    'active_status_fmt': row_data.get('active_status_fmt', ''),
                })

            create_df = pd.DataFrame(create_records)

            file_path = os.path.join(output_dir, f'新增门店_{timestamp}.csv')
            create_df.to_csv(file_path, index=False, encoding='utf-8-sig')

            logger.info(f" ✓ 新增数据已保存: {file_path} ({len(create_df)} 条)")

        except Exception as e:
            error_task_logger.error(f"保存新增数据失败: {e}", exc_info=True)

    def _save_update_stats(self, update_data_list):
        """
        Write per-org_code update counts to output/更新统计_<timestamp>.csv.

        Args:
            update_data_list: items built by ``_sync_to_jiandaoyun``. One item
                per Jiandaoyun record, so an org_code with duplicate records
                appears several times.

        Errors are logged, not raised.
        """
        if len(update_data_list) == 0:
            return

        try:
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

            # Count items per org_code; descriptive info comes from the first item seen
            org_code_counts = {}
            org_code_info = {}
            for item in update_data_list:
                org_code = item['org_code']
                if org_code not in org_code_counts:
                    org_code_counts[org_code] = 0
                    row_data = item['row_data']
                    org_code_info[org_code] = {
                        'org_name': row_data.get('org_name', ''),
                        'group_name': row_data.get('group_name', ''),
                        'org_type': row_data.get('org_type', ''),
                        'province_name': row_data.get('province_name', ''),
                        'city_name': row_data.get('city_name', ''),
                        'saas_version': row_data.get('saas_version', ''),
                        'active_status_fmt': row_data.get('active_status_fmt', ''),
                    }
                org_code_counts[org_code] += 1

            update_stats = []
            for org_code, count in org_code_counts.items():
                info = org_code_info[org_code]
                update_stats.append({
                    'org_code': org_code,
                    'org_name': info['org_name'],
                    'group_name': info['group_name'],
                    'org_type': info['org_type'],
                    'province_name': info['province_name'],
                    'city_name': info['city_name'],
                    'saas_version': info['saas_version'],
                    'active_status_fmt': info['active_status_fmt'],
                    'update_count': count,
                    'note': '同一org_code有多个记录' if count > 1 else ''
                })

            update_df = pd.DataFrame(update_stats)
            update_df = update_df.sort_values('update_count', ascending=False)

            file_path = os.path.join(output_dir, f'更新统计_{timestamp}.csv')
            update_df.to_csv(file_path, index=False, encoding='utf-8-sig')

            total_org_codes = len(org_code_counts)
            total_records = len(update_data_list)
            duplicate_org_codes = sum(1 for count in org_code_counts.values() if count > 1)

            logger.info(f" ✓ 更新统计已保存: {file_path}")
            logger.info(f" - 更新的org_code数: {total_org_codes}")
            logger.info(f" - 更新的记录总数: {total_records}")
            if duplicate_org_codes > 0:
                logger.info(f" - 包含多条记录的org_code: {duplicate_org_codes}")

        except Exception as e:
            error_task_logger.error(f"保存更新统计失败: {e}", exc_info=True)


if __name__ == '__main__':
    updater = UpdateAllNGVDataDaily()
    updater.main()