# Files
# saas/back_ground_module/update_all_NGV_data_daily.py
# T
#
# 1060 lines
# 44 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Daily NGV data update -- optimized version.

Improvements in this version:
1. Keeps the "batch-mark as not deleted, then batch-mark as deleted" logic.
2. Restructured code for better readability and maintainability.
3. Fixes records created without an _id being created without a store code.
4. Faster staff-field matching through a dict lookup.
5. Removed ineffective concurrency code.
   NOTE(review): an optional thread-pool update is still present, controlled by
   USE_CONCURRENT_UPDATE (see below); this item presumably refers to older code.
6. Local cache support to simplify development and debugging.

[Local cache usage]
While debugging, the local cache avoids re-fetching the data on every run:
1. First run (build the cache):
   - keep USE_LOCAL_CACHE = False
   - run the script; the data is saved automatically to output/cache/
2. Debugging the batch updates (use the cache):
   - change the code: USE_LOCAL_CACHE = True
   - run again; data is read from the local cache and the API calls are skipped
   - allows quick testing of the batch-update logic
3. Production run (cache disabled):
   - change the code: USE_LOCAL_CACHE = False
   - fetches the latest data and performs the update
Cache file location: output/cache/
- jdy_ngv_data.csv: Jiandaoyun NGV data
- staff_data.csv: staff data
- ngv_data_today.csv: NGV data for yesterday
- ngv_data_yesterday.csv: NGV data for the day before yesterday

[Deletion-status reset]
The first run must reset the deletion status of all records:
1. First run:
   - set RESET_ALL_DELETED_STATUS = True
   - run the script; every Jiandaoyun record is batch-marked "未删除" (not deleted)
   - then the stores that no longer exist are batch-marked "已删除" (deleted)
2. Daily runs:
   - set RESET_ALL_DELETED_STATUS = False
   - no global reset; only the stores missing from NGV are batch-marked "已删除"

[Concurrent updates]
Multi-threading speeds up the record updates:
1. Concurrent mode (recommended, fast):
   - set USE_CONCURRENT_UPDATE = True
   - set CONCURRENT_WORKERS = 10 (number of concurrent threads)
   - expected speed: 10 records/s or more (depends on network and API performance)
2. Serial mode (for debugging):
   - set USE_CONCURRENT_UPDATE = False
   - records are updated one at a time, slowly (about 3 records/s)
   - makes problems easier to pinpoint
3. Speed tuning:
   - raising CONCURRENT_WORKERS can increase speed (5-20 suggested)
   - too high a value may trigger API rate limiting; adjust to the actual situation
"""
import pandas as pd
import datetime
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# 添加父目录到Python路径,以便导入项目模块
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import Config
from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger
from tqdm import tqdm
# Loggers pre-configured by log_config: the task log and the error-task log.
logger, error_task_logger = configure_task_logger(), configure_error_task_logger()

# Shared service instances used by every step of the update.
api_instance, common_module = API(), CommonModule()

# Root output directory (created on import).
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

# ==================== Global switches ====================
# [1] Local cache: True reads every dataset from CACHE_DIR instead of the APIs.
USE_LOCAL_CACHE = False
# [2] Deletion-status reset: True batch-marks every record "未删除" first.
#     Only needed on the first run; set back to False afterwards.
RESET_ALL_DELETED_STATUS = False
# [3] Concurrent updates: True = thread pool (fast), False = one by one (slow).
USE_CONCURRENT_UPDATE = False
# Thread-pool size. 5-20 suggested; drop to 3-5 if the API rate-limits.
CONCURRENT_WORKERS = 8
# [4] Batch creation: True = batched (fast), False = one record per call (slow).
USE_BATCH_CREATE = True
# Number of records per batch-create request.
BATCH_CREATE_SIZE = 100
# =========================================================

# Local cache: one CSV per dataset, named after its cache key.
CACHE_DIR = os.path.join(output_dir, "cache")
os.makedirs(CACHE_DIR, exist_ok=True)
CACHE_FILES = {
    cache_key: os.path.join(CACHE_DIR, f"{cache_key}.csv")
    for cache_key in ('jdy_ngv_data', 'staff_data', 'ngv_data_today', 'ngv_data_yesterday')
}
class UpdateAllNGVDataDaily:
    """Daily synchronisation of NGV store data into the Jiandaoyun NGV form.

    ``main`` runs six steps: load the Jiandaoyun records and the staff map,
    load the two latest NGV snapshots, mark Jiandaoyun stores missing from NGV
    as "已删除", diff the two snapshots (plus stores that have no Jiandaoyun
    record yet), derive UTC dates / staff IDs, and finally update or create the
    records in Jiandaoyun.
    """

    # Widget ID of the store-code (门店编码) field in the Jiandaoyun form.
    ORG_CODE_WIDGET_ID = '_widget_1734062123071'
    # Widget ID of the deletion-status field; this class writes "未删除" / "已删除".
    DELETED_STATUS_WIDGET_ID = '_widget_1754285499851'
    # NGV columns ignored by the snapshot diff (presumably per-snapshot date /
    # partition / ETL bookkeeping -- TODO confirm). Because the sync rows are
    # built from the diff, these columns are never written to Jiandaoyun either.
    COMPARE_EXCLUDED_COLUMNS = frozenset({'date_id', 'date_fmt', 'pt', 'etl_time'})

    def __init__(self):
        """Build the column -> widget mapping and an empty org_code -> _id index."""
        self.field_mapping = {}
        self._init_field_mapping()
        # org_code -> every Jiandaoyun _id carrying it (duplicates included).
        # Filled by _compare_data_changes, read by _sync_to_jiandaoyun so that
        # all duplicate records receive the same update.
        self.org_code_to_all_ids = {}

    def main(self):
        """Run the full daily update (steps 1-6).

        Any exception is logged, reported through
        ``common_module.send_task_error`` and re-raised.
        """
        task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            logger.info("=" * 60)
            logger.info(f"开始执行NGV数据更新任务:{task_start_time}")
            logger.info("=" * 60)

            # Step 1: Jiandaoyun records + staff map.
            jdy_ngv_data, staff_id_map = self._load_base_data()
            # Step 2: the two most recent NGV snapshots.
            ngv_data_today, ngv_data_yesterday = self._load_ngv_source_data()
            # Step 3: flag stores that disappeared from NGV (writes to Jiandaoyun).
            self._handle_deleted_stores(jdy_ngv_data, ngv_data_today)
            # Step 4: rows that must be updated or created.
            changed_data = self._compare_data_changes(ngv_data_today, ngv_data_yesterday, jdy_ngv_data)
            # Step 5: derived date / staff-ID columns.
            prepared_data = self._prepare_data_for_sync(changed_data, staff_id_map)
            # Step 6: push everything to Jiandaoyun.
            self._sync_to_jiandaoyun(prepared_data)

            logger.info("=" * 60)
            logger.info("NGV更新数据任务已完成")
            logger.info("=" * 60)
        except Exception as e:
            error_task_logger.error(f"NGV更新数据执行时发生异常: {e}", exc_info=True)
            common_module.send_task_error(task_start_time, "NGV更新数据", str(e))
            raise

    def _load_base_data(self):
        """Step 1: load the existing Jiandaoyun NGV records and the staff map.

        Reads the local cache when USE_LOCAL_CACHE is on; otherwise calls the
        Jiandaoyun API and refreshes the cache.

        Returns:
            tuple: ``(jdy_ngv_data, staff_id_map)`` -- the Jiandaoyun NGV records
            as a DataFrame, and a dict from the staff widget
            ``_widget_1734942794144`` (stringified; presumably the staff name)
            to ``_widget_1734942794145`` (presumably the staff ID / member value).
        """
        logger.info("步骤1: 开始加载基础数据...")

        if USE_LOCAL_CACHE:
            logger.info(" [缓存模式] 从本地缓存读取数据...")
            jdy_ngv_data = self._load_from_cache('jdy_ngv_data')
            staff_df = self._load_from_cache('staff_data')
            # Rebuild the staff map from the cached staff table.
            staff_id_map = {}
            if (not staff_df.empty
                    and '_widget_1734942794144' in staff_df.columns
                    and '_widget_1734942794145' in staff_df.columns):
                staff_id_map = dict(zip(
                    staff_df['_widget_1734942794144'].astype(str),
                    staff_df['_widget_1734942794145']
                ))
            logger.info(f" - [缓存] 简道云NGV数据: {len(jdy_ngv_data)}")
            logger.info(f" - [缓存] 员工ID映射: {len(staff_id_map)} 个员工")
            return jdy_ngv_data, staff_id_map

        logger.info(" [API模式] 从简道云API获取数据...")

        # Existing NGV records in Jiandaoyun.
        # NOTE(review): these IDs are hard-coded, while all writes go to
        # Config.SaaS_Tasks_APP_ID / Config.NGV_TASKS_ENTRY_ID -- confirm both
        # point at the same form.
        payload = {
            "api_key": "675b900991ad2491c69389ca",
            "entry_id": "675bb02bd2d53c2034c665e4"
        }
        ngv_data_list = api_instance.entry_data_list(payload).get("data", [])
        jdy_ngv_data = pd.DataFrame(ngv_data_list)
        logger.info(f" - 已获取简道云NGV数据: {len(jdy_ngv_data)}")
        self._save_to_cache(jdy_ngv_data, 'jdy_ngv_data')

        # Staff records -> lookup dict for the staff-field conversion.
        payload = {
            "api_key": "6694d3c4fcb69ca9a111a6c4",
            "entry_id": "6769204a1902c9341340a1bc"
        }
        staff_data = api_instance.entry_data_list(payload)
        staff_list = staff_data.get("data", [])
        staff_df = pd.DataFrame(staff_list)
        self._save_to_cache(staff_df, 'staff_data')

        staff_id_map = {
            str(staff['_widget_1734942794144']): staff['_widget_1734942794145']
            for staff in staff_list
            if '_widget_1734942794144' in staff and '_widget_1734942794145' in staff
        }
        logger.info(f" - 已构建员工ID映射字典: {len(staff_id_map)} 个员工")
        return jdy_ngv_data, staff_id_map

    def _load_ngv_source_data(self):
        """Step 2: load the two most recent NGV snapshots.

        Uses ``common_module.get_ngv_details(days_back=1)`` and
        ``days_back=2`` (or the local cache) and keeps only rows whose
        ``org_type`` is '一般'.

        Returns:
            tuple: ``(ngv_data_1, ngv_data_2)`` -- yesterday's and the day
            before yesterday's snapshot.
        """
        logger.info("步骤2: 开始获取NGV源数据...")

        if USE_LOCAL_CACHE:
            logger.info(" [缓存模式] 从本地缓存读取NGV源数据...")
            ngv_data_1 = self._load_from_cache('ngv_data_today')
            ngv_data_2 = self._load_from_cache('ngv_data_yesterday')
            logger.info(f" - [缓存] 昨天数据: {len(ngv_data_1)}")
            logger.info(f" - [缓存] 前天数据: {len(ngv_data_2)}")
            return ngv_data_1, ngv_data_2

        logger.info(" [API模式] 从数据库获取NGV源数据...")
        ngv_data_1 = common_module.get_ngv_details(days_back=1)
        ngv_data_2 = common_module.get_ngv_details(days_back=2)

        # Only "一般" (regular) stores are synced.
        ngv_data_1 = ngv_data_1[ngv_data_1['org_type'] == '一般']
        ngv_data_2 = ngv_data_2[ngv_data_2['org_type'] == '一般']

        self._save_to_cache(ngv_data_1, 'ngv_data_today')
        self._save_to_cache(ngv_data_2, 'ngv_data_yesterday')

        logger.info(f" - 昨天数据: {len(ngv_data_1)}")
        logger.info(f" - 前天数据: {len(ngv_data_2)}")
        return ngv_data_1, ngv_data_2

    def _handle_deleted_stores(self, jdy_ngv_data, ngv_current_data):
        """Step 3: mark Jiandaoyun stores that no longer exist in NGV as "已删除".

        1. When RESET_ALL_DELETED_STATUS is on (first run), batch-mark every
           Jiandaoyun record as "未删除".
        2. Batch-mark every record whose store code is absent from
           ``ngv_current_data`` as "已删除" (all duplicate records included).

        This writes to Jiandaoyun. Step 2 is skipped when NGV yields no store
        codes at all: an empty snapshot (e.g. not produced yet upstream) would
        otherwise flag every single store as deleted.
        NOTE(review): a store that reappears in NGV keeps "已删除"; only the
        RESET_ALL_DELETED_STATUS run writes "未删除".

        Args:
            jdy_ngv_data: Jiandaoyun records (store-code widget and ``_id`` used).
            ngv_current_data: latest NGV snapshot (``org_code`` used).
        """
        logger.info("步骤3: 开始处理已删除的门店...")

        temp_jdy_data = jdy_ngv_data.copy()
        temp_jdy_data.reset_index(drop=True, inplace=True)

        if self.ORG_CODE_WIDGET_ID not in temp_jdy_data.columns:
            error_task_logger.error("'门店编码' 不存在")
            return
        # Checked up front: the duplicate report below also reads _id.
        if '_id' not in temp_jdy_data.columns:
            error_task_logger.error(" ✗ 错误: 简道云数据中没有_id列,无法标记删除状态")
            return

        temp_jdy_data['org_code'] = temp_jdy_data[self.ORG_CODE_WIDGET_ID]

        # Records without a store code cannot be matched; ignore them.
        null_org_codes = temp_jdy_data['org_code'].isna().sum()
        if null_org_codes > 0:
            logger.warning(f" ⚠ 警告: 简道云数据中有 {null_org_codes} 条记录的门店编码为空,将被忽略")
            temp_jdy_data = temp_jdy_data[temp_jdy_data['org_code'].notna()]

        # Report duplicated store codes (data-quality issue). They are NOT
        # de-duplicated: every duplicate record must receive the status.
        duplicate_org_codes = temp_jdy_data[temp_jdy_data['org_code'].duplicated(keep=False)]
        if len(duplicate_org_codes) > 0:
            unique_duplicate_org_codes = duplicate_org_codes['org_code'].nunique()
            logger.warning(
                f" ⚠ 数据质量问题:发现 {unique_duplicate_org_codes} 个org_code重复,共 {len(duplicate_org_codes)} 条记录")
            logger.warning(f" ⚠ 建议:请在简道云中清理重复数据,每个org_code只保留一条记录")
            if output_dir:
                duplicate_org_codes[['org_code', '_id']].to_csv(
                    os.path.join(output_dir, '重复org_code记录.csv'),
                    index=False,
                    encoding='utf-8-sig'
                )
                logger.warning(f" - 重复详情已保存到: 重复org_code记录.csv")

        logger.info(f" - 简道云原始数据共 {len(temp_jdy_data)} 条记录(包含重复)")
        logger.info(f" - NGV源数据: {len(ngv_current_data)}")

        # 3.1 First run only: reset every record to "未删除".
        if RESET_ALL_DELETED_STATUS:
            logger.info(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
            logger.info(" 【首次运行模式】批量重置所有数据删除状态")
            logger.info(" ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
            all_data_ids = temp_jdy_data['_id'].dropna().tolist()
            if all_data_ids:
                logger.info(f" - 准备批量标记所有数据为'未删除': {len(all_data_ids)}")
                self._batch_set_deleted_status(all_data_ids, "未删除")
                logger.info(f" ✓ 已批量标记所有数据为'未删除': {len(all_data_ids)}")
                logger.info(f" 💡 提示:首次运行完成后,请将 RESET_ALL_DELETED_STATUS 设置为 False")
            else:
                logger.warning(" ⚠ 警告: 没有有效的_id可以标记")

        # 3.2 Store codes present in Jiandaoyun but absent from NGV.
        if 'org_code' in ngv_current_data.columns:
            ngv_org_codes = set(ngv_current_data['org_code'].dropna().unique())
        else:
            ngv_org_codes = set()
        if not ngv_org_codes:
            # Safety guard: with no NGV codes every store would look deleted.
            error_task_logger.error(
                " ✗ 错误: NGV源数据中没有有效的org_code,跳过已删除标记,避免将全部门店误标记为已删除")
            return

        jdy_org_codes_unique = set(temp_jdy_data['org_code'].dropna().unique())
        missing_org_codes = jdy_org_codes_unique - ngv_org_codes
        if len(missing_org_codes) == 0:
            logger.info(" - 无需处理的已删除门店(简道云和NGV数据完全一致)")
            return

        logger.info(f" - 发现 {len(missing_org_codes)} 个门店在简道云存在但NGV中不存在")
        # Select every matching record, duplicates included.
        only_in_jdy = temp_jdy_data[temp_jdy_data['org_code'].isin(missing_org_codes)]
        logger.info(f" - 需要标记删除的org_code数: {len(missing_org_codes)}")
        logger.info(f" - 需要标记删除的记录数: {len(only_in_jdy)} 条(包含重复记录)")

        # 3.3 Batch-mark them "已删除".
        delete_data_ids = only_in_jdy['_id'].dropna().tolist()
        if not delete_data_ids:
            logger.warning(" ⚠ 警告: 没有有效的_id可以标记为已删除")
            return

        logger.info(f" - 准备批量标记为'已删除': {len(delete_data_ids)}")
        self._batch_set_deleted_status(delete_data_ids, "已删除")
        logger.info(f" ✓ 已批量标记为'已删除': {len(delete_data_ids)}")

    def _batch_set_deleted_status(self, data_ids, status):
        """Write ``status`` into the deletion-status field of ``data_ids`` in one batch call."""
        batch_data = {
            'api_key': Config.SaaS_Tasks_APP_ID,
            'entry_id': Config.NGV_TASKS_ENTRY_ID,
            "data_ids": data_ids,
            "data": {self.DELETED_STATUS_WIDGET_ID: {"value": status}},
        }
        api_instance.entry_data_banch_update(data=batch_data, max_retries=20)

    def _normalize_ngv_snapshot(self, ngv_df, label):
        """Prepare one NGV snapshot for the diff.

        Drops COMPARE_EXCLUDED_COLUMNS, rows without an ``org_code`` and
        repeated org_codes (first row kept), indexes by ``org_code`` and
        stringifies every value, blanking 'nan' / 'None' to '' so missing
        values compare equal. Duplicates must go because ``reindex`` raises
        on an index with duplicate labels.

        Args:
            ngv_df: NGV snapshot containing an ``org_code`` column.
            label: snapshot name used in the duplicate warning.

        Returns:
            DataFrame indexed by org_code whose values are all ``str``.
        """
        kept_columns = [col for col in ngv_df.columns if col not in self.COMPARE_EXCLUDED_COLUMNS]
        snapshot = ngv_df.loc[ngv_df['org_code'].notna(), kept_columns]

        is_duplicate = snapshot['org_code'].duplicated(keep='first')
        if is_duplicate.any():
            logger.warning(f" ⚠ NGV{label}数据中有 {int(is_duplicate.sum())} 条重复的org_code记录,仅保留第一条")
            snapshot = snapshot[~is_duplicate]

        return (
            snapshot.set_index('org_code')
            .astype(str)
            .replace(['nan', 'None'], '')
            .fillna('')
        )

    def _index_jdy_records(self, jdy_ngv_data):
        """Index the Jiandaoyun records by store code.

        Rows whose store code or ``_id`` is missing are ignored.

        Returns:
            tuple: ``(org_code_to_all_ids, newest_id_by_org_code)`` -- a dict
            org_code -> list of every ``_id`` with that code (row order kept,
            duplicates included), and a Series org_code -> one ``_id`` (the
            latest ``_createTime`` when that column exists) used for joining.
            Both are empty when ``jdy_ngv_data`` has no rows.

        Raises:
            ValueError: if ``jdy_ngv_data`` has rows but lacks the store-code
                widget or ``_id`` column; otherwise every NGV store would look
                absent from Jiandaoyun and be re-created as a duplicate.
        """
        if len(jdy_ngv_data) == 0:
            return {}, pd.Series(dtype=object, name='_id')

        missing_columns = [col for col in (self.ORG_CODE_WIDGET_ID, '_id')
                           if col not in jdy_ngv_data.columns]
        if missing_columns:
            raise ValueError(f"简道云NGV数据缺少必要的列: {missing_columns}")

        records = jdy_ngv_data.rename(columns={self.ORG_CODE_WIDGET_ID: 'org_code'})
        records = records[records['org_code'].notna() & records['_id'].notna()]

        org_code_to_all_ids = {}
        for org_code, data_id in zip(records['org_code'], records['_id']):
            org_code_to_all_ids.setdefault(org_code, []).append(data_id)

        # One record per code for the join (newest first; stable for ties).
        if '_createTime' in records.columns:
            records = records.sort_values('_createTime', ascending=False, kind='stable')
        newest_id_by_org_code = (
            records.drop_duplicates(subset=['org_code'], keep='first')
            .set_index('org_code')['_id']
        )
        return org_code_to_all_ids, newest_id_by_org_code

    def _compare_data_changes(self, ngv_today, ngv_yesterday, jdy_ngv_data):
        """Step 4: find the NGV stores that must be written to Jiandaoyun.

        Returns a DataFrame indexed by org_code containing:
          * stores present in both snapshots whose stringified values differ
            (``match_status == '不一致'``), with the Jiandaoyun ``_id`` joined
            in when the store already exists there (NaN otherwise);
          * stores of the latest snapshot without any Jiandaoyun record
            (``match_status == '查缺补漏-需新建'``, ``_id`` None).

        Side effect: sets ``self.org_code_to_all_ids``.

        Raises:
            ValueError: propagated from ``_index_jdy_records``.
        """
        logger.info("步骤4: 开始对比数据变化...")

        df1_indexed = self._normalize_ngv_snapshot(ngv_today, "昨天")
        df2_indexed = self._normalize_ngv_snapshot(ngv_yesterday, "前天")

        # Compare only the stores and columns present in both snapshots.
        common_index = df1_indexed.index.intersection(df2_indexed.index)
        common_columns = df1_indexed.columns.intersection(df2_indexed.columns)
        df1_common = df1_indexed.reindex(index=common_index, columns=common_columns).fillna('')
        df2_common = df2_indexed.reindex(index=common_index, columns=common_columns).fillna('')

        matches = (df1_common == df2_common).all(axis=1)
        df1_common['match_status'] = matches.map({True: '一致', False: '不一致'})

        consistent_count = (df1_common['match_status'] == '一致').sum()
        inconsistent_count = (df1_common['match_status'] == '不一致').sum()
        logger.info(f" 📊 共同门店数据对比结果:")
        logger.info(f" - 共同门店总数: {len(df1_common)}")
        logger.info(f" - 数据一致: {consistent_count}")
        logger.info(f" - 数据不一致(需要更新): {inconsistent_count}")

        changed_data = df1_common[df1_common['match_status'] == '不一致'].copy()

        # Attach the Jiandaoyun _id and remember every _id per code so that
        # duplicate records are updated together.
        org_code_to_all_ids, newest_id_by_org_code = self._index_jdy_records(jdy_ngv_data)
        multi_id_count = sum(1 for ids in org_code_to_all_ids.values() if len(ids) > 1)
        if multi_id_count > 0:
            logger.info(f" - 发现 {multi_id_count} 个org_code有多个_id,更新时将同步所有记录")
        if len(newest_id_by_org_code) > 0:
            changed_data = changed_data.join(newest_id_by_org_code, how='left')
        else:
            changed_data['_id'] = None
        self.org_code_to_all_ids = org_code_to_all_ids

        # Gap filling: stores in the latest NGV snapshot with no Jiandaoyun record.
        logger.info("")
        logger.info(f" 📊 查缺补漏分析(NGV有但简道云没有):")
        ngv_org_codes = set(df1_indexed.index)
        jdy_org_codes = set(newest_id_by_org_code.index)
        missing_in_jdy = ngv_org_codes - jdy_org_codes
        logger.info(f" - NGV昨天的门店总数: {len(ngv_org_codes)}")
        logger.info(f" - 简道云的门店总数: {len(jdy_org_codes)}")
        logger.info(f" - NGV有但简道云没有(需补充): {len(missing_in_jdy)}")

        if missing_in_jdy:
            # Keep NGV row order so the result is deterministic.
            missing_stores = df1_indexed[df1_indexed.index.isin(missing_in_jdy)].copy()
            missing_stores['match_status'] = '查缺补漏-需新建'
            missing_stores['_id'] = None

            # Codes already in changed_data (changed but not in Jiandaoyun) are
            # created from that row instead; don't create them twice.
            overlap_org_codes = missing_in_jdy & set(changed_data.index)
            if overlap_org_codes:
                logger.warning(f" ⚠ 警告:发现 {len(overlap_org_codes)} 个org_code同时在'不一致''查缺补漏'")
                logger.warning(f" 这些记录可能是数据问题,已从查缺补漏中排除")
                missing_stores = missing_stores[~missing_stores.index.isin(overlap_org_codes)]
                missing_in_jdy = missing_in_jdy - overlap_org_codes

            if len(missing_stores) > 0:
                logger.info(f" - 实际需要补充新建: {len(missing_stores)}")
                changed_data = pd.concat([changed_data, missing_stores])
            else:
                logger.info(f" ✓ 排除重复后无需补充")
        else:
            logger.info(f" ✓ 无需补充,简道云数据完整")

        logger.info("")
        logger.info(f" 📊 汇总统计:")
        logger.info(f" - 需要处理的数据总数: {len(changed_data)}")
        logger.info(f" ├─ 数据不一致(需更新): {inconsistent_count}")
        logger.info(f" └─ 查缺补漏(需新建): {len(missing_in_jdy)}")
        return changed_data

    def _prepare_data_for_sync(self, data_df, staff_id_map):
        """Step 5: add the derived columns the Jiandaoyun form expects.

        For each time column present, adds ``<col>_date``: the value parsed as
        Asia/Shanghai local time (already tz-aware values are only converted)
        and rendered as a UTC ``%Y-%m-%dT%H:%M:%SZ`` string, NaN when
        unparsable. For each staff column present, adds ``<col>_staff_id``
        looked up in ``staff_id_map`` (None when unknown).

        Returns:
            A new DataFrame; ``data_df`` is not modified.
        """
        logger.info("步骤5: 开始数据转换...")
        if len(data_df) == 0:
            logger.info(" - 无需转换的数据")
            return data_df

        prepared_df = data_df.copy()

        # 5.1 Dates -> UTC strings.
        time_columns = ['saas_create_time', 'expiry_time', 'install_create_time',
                        'last_end_date', 'renew_date']
        for col in time_columns:
            if col not in prepared_df.columns:
                continue
            parsed = pd.to_datetime(prepared_df[col], errors='coerce', utc=False)
            # tz_localize raises on tz-aware values; only localize naive ones.
            if parsed.dt.tz is None:
                parsed = parsed.dt.tz_localize('Asia/Shanghai', ambiguous='infer', nonexistent='NaT')
            prepared_df[col + '_date'] = parsed.dt.tz_convert('UTC').dt.strftime('%Y-%m-%dT%H:%M:%SZ')
        logger.info(" - 日期字段已转换为UTC格式")

        # 5.2 Staff fields -> staff IDs via dict lookup.
        staff_columns = ['area_manager', 'service_impl_principal',
                         'service_salesmen', 'technician']
        for col in staff_columns:
            if col not in prepared_df.columns:
                continue
            prepared_df[col + '_staff_id'] = prepared_df[col].astype(str).map(staff_id_map.get)
        logger.info(" - 人员字段已转换为员工ID")

        return prepared_df

    def _sync_to_jiandaoyun(self, data_df):
        """Step 6: update or create the records in Jiandaoyun.

        Rows with a non-null ``_id`` are updates, pushed to every ``_id`` of
        that org_code recorded in ``self.org_code_to_all_ids``; the store-code
        widget is never rewritten. Rows without ``_id`` are creates and always
        carry the store code (taken from the index, which holds org_code in
        the frames built by _compare_data_changes).
        """
        logger.info("步骤6: 开始同步数据到简道云...")
        if len(data_df) == 0:
            logger.info(" - 无需同步的数据")
            return

        update_data_list = []
        create_data_list = []
        logger.info(f" - 准备同步数据...")
        has_id_column = '_id' in data_df.columns

        for org_code, row in data_df.iterrows():
            data_dict = self._build_data_dict(row)
            if has_id_column and pd.notna(row.get('_id')):
                # Update: the store code must not be modified.
                data_dict.pop(self.ORG_CODE_WIDGET_ID, None)
                all_data_ids = self.org_code_to_all_ids.get(org_code, [str(row['_id'])])
                for data_id in all_data_ids:
                    update_data_list.append({
                        'org_code': org_code,
                        'data_id': str(data_id),
                        'data_dict': data_dict.copy()
                    })
            else:
                # Create: the store code is mandatory.
                data_dict.setdefault(self.ORG_CODE_WIDGET_ID, {"value": org_code})
                create_data_list.append({
                    'org_code': org_code,
                    'data_dict': data_dict
                })

        logger.info(f" - 需要更新: {len(update_data_list)}")
        logger.info(f" - 需要创建: {len(create_data_list)}")

        update_count = 0
        if len(update_data_list) > 0:
            if USE_CONCURRENT_UPDATE:
                update_count = self._concurrent_update(update_data_list)
            else:
                update_count = self._single_update(update_data_list)

        create_count = 0
        if len(create_data_list) > 0:
            if USE_BATCH_CREATE:
                create_count = self._batch_create(create_data_list)
            else:
                create_count = self._single_create(create_data_list)

        logger.info(f" ✓ 同步完成: 更新 {update_count} 条, 创建 {create_count}")

    def _update_one(self, item):
        """Update one record; return True on success, log and return False on failure."""
        try:
            update_data = {
                'api_key': Config.SaaS_Tasks_APP_ID,
                'entry_id': Config.NGV_TASKS_ENTRY_ID,
                'data_id': item['data_id'],
                'data': item['data_dict']
            }
            api_instance.entry_data_update(data=update_data, max_retries=20)
            return True
        except Exception as e:
            error_task_logger.error(f"更新失败 (org_code={item['org_code']}): {e}")
            return False

    def _concurrent_update(self, update_data_list):
        """Update records on a pool of CONCURRENT_WORKERS threads.

        Per-record failures are logged and counted, not raised.

        Returns:
            int: number of successful updates.
        """
        logger.info(f" - 使用并发更新模式 (并发数: {CONCURRENT_WORKERS})")
        total_count = len(update_data_list)
        success_count = 0
        failed_count = 0
        start_time = time.time()

        with ThreadPoolExecutor(max_workers=CONCURRENT_WORKERS) as executor:
            futures = [executor.submit(self._update_one, item) for item in update_data_list]
            with tqdm(total=total_count, desc="并发更新") as pbar:
                for future in as_completed(futures):
                    if future.result():
                        success_count += 1
                    else:
                        failed_count += 1
                    pbar.update(1)

        elapsed_time = time.time() - start_time
        speed = total_count / elapsed_time if elapsed_time > 0 else 0
        logger.info(f" - 更新完成: 成功 {success_count} 条, 失败 {failed_count}")
        logger.info(f" - 耗时: {elapsed_time:.1f}秒, 速度: {speed:.1f}条/秒")
        return success_count

    def _single_update(self, update_data_list):
        """Update records one at a time (slow, easier to debug).

        Returns:
            int: number of successful updates.
        """
        logger.info(f" - 使用串行更新模式(逐条)")
        success_count = 0
        failed_count = 0
        total_count = len(update_data_list)
        start_time = time.time()

        for item in tqdm(update_data_list, desc="串行更新"):
            if self._update_one(item):
                success_count += 1
            else:
                failed_count += 1

        elapsed_time = time.time() - start_time
        speed = total_count / elapsed_time if elapsed_time > 0 else 0
        logger.info(f" - 更新完成: 成功 {success_count} 条, 失败 {failed_count}")
        logger.info(f" - 耗时: {elapsed_time:.1f}秒, 速度: {speed:.1f}条/秒")
        return success_count

    def _create_records(self, data_dicts):
        """Create ``data_dicts`` with one ``data_batch_create`` call.

        ``data`` is always a list -- the shape the batch path relies on -- also
        when a single record is created.
        """
        create_data = {
            'api_key': Config.SaaS_Tasks_APP_ID,
            'entry_id': Config.NGV_TASKS_ENTRY_ID,
            'data': data_dicts
        }
        api_instance.data_batch_create(data=create_data, max_retries=20)

    def _batch_create(self, create_data_list):
        """Create records in batches of BATCH_CREATE_SIZE.

        When a batch request fails, its records are retried one at a time so
        one bad record does not lose the whole batch.
        NOTE(review): if a failed batch was partially committed server-side,
        the per-record retry could create duplicates -- confirm API semantics.

        Returns:
            int: number of records reported as created.
        """
        logger.info(f" - 使用批量创建模式 (批次大小: {BATCH_CREATE_SIZE})")
        total_count = len(create_data_list)
        success_count = 0

        for i in range(0, total_count, BATCH_CREATE_SIZE):
            batch = create_data_list[i:i + BATCH_CREATE_SIZE]
            try:
                self._create_records([item['data_dict'] for item in batch])
                success_count += len(batch)
                # Progress every 5 batches and after the last one.
                if (i + BATCH_CREATE_SIZE) % (BATCH_CREATE_SIZE * 5) == 0 or (i + BATCH_CREATE_SIZE) >= total_count:
                    logger.info(f" - 已创建 {min(i + BATCH_CREATE_SIZE, total_count)}/{total_count}")
            except Exception as e:
                error_task_logger.error(f"批量创建失败 (批次 {i // BATCH_CREATE_SIZE + 1}): {e}")
                logger.warning(f" - 批量创建失败,尝试逐条创建该批次...")
                for item in batch:
                    try:
                        self._create_records([item['data_dict']])
                        success_count += 1
                    except Exception as e2:
                        error_task_logger.error(f"逐条创建失败 (org_code={item['org_code']}): {e2}")
        return success_count

    def _single_create(self, create_data_list):
        """Create records one at a time (slow, easier to debug).

        Returns:
            int: number of records reported as created.
        """
        logger.info(f" - 使用逐条创建模式")
        success_count = 0
        for item in tqdm(create_data_list, desc="逐条创建"):
            try:
                self._create_records([item['data_dict']])
                success_count += 1
            except Exception as e:
                error_task_logger.error(f"创建失败 (org_code={item['org_code']}): {e}")
        return success_count

    def _build_data_dict(self, row):
        """Build a Jiandaoyun ``{widget_id: {"value": ...}}`` payload from ``row``.

        Only columns listed in ``self.field_mapping`` and present in
        ``row.index`` are emitted. Scalar missing values (NaN / None / NaT)
        become None. Non-scalar values (e.g. lists) are passed through
        unchanged: ``pd.isna`` on them returns an array, whose truth value
        is ambiguous.
        """
        data_dict = {}
        for col_name, widget_id in self.field_mapping.items():
            if col_name not in row.index:
                continue
            value = row[col_name]
            if pd.api.types.is_scalar(value) and pd.isna(value):
                value = None
            data_dict[widget_id] = {"value": value}
        return data_dict

    def _init_field_mapping(self):
        """Initialise the NGV column -> Jiandaoyun widget-ID mapping.

        NOTE(review): in the sync frames org_code is the index, so the
        'org_code' entry is not emitted by _build_data_dict (creates set the
        store code explicitly). 'date_fmt_date' is never produced by
        _prepare_data_for_sync.
        """
        self.field_mapping = {
            # Basic information
            'date_id': '_widget_1734062123065',
            'date_fmt': '_widget_1734062123066',
            'id_own_group': '_widget_1734062123067',
            'group_name': '_widget_1734062123068',
            'id_own_org': '_widget_1734062123069',
            'org_name': '_widget_1734062123070',
            'org_code': '_widget_1734062123071',
            'group_grade': '_widget_1734062123072',
            'org_type': '_widget_1734062123073',
            'org_status': '_widget_1734062123074',
            # SaaS
            'saas_version': '_widget_1734062123075',
            'is_wechat': '_widget_1734062123076',
            'is_mini_app': '_widget_1734062123077',
            'is_wx_shop': '_widget_1734062123078',
            'is_camera_service': '_widget_1734062123079',
            'is_maintenance_service': '_widget_1734062123080',
            'saas_create_time': '_widget_1734062123081',
            'expiry_time': '_widget_1734062123082',
            'saas_use_days': '_widget_1734062123083',
            'saas_use_year': '_widget_1734062123084',
            'is_main_org': '_widget_1734062123085',
            # License information
            'license_code': '_widget_1734062123086',
            'license_name': '_widget_1734062123087',
            'org_crm_id': '_widget_1734062123088',
            # Geography
            'province_id': '_widget_1734062123089',
            'province_name': '_widget_1734062123090',
            'city_id': '_widget_1734062123091',
            'city_name': '_widget_1734062123092',
            'area_id': '_widget_1734062123093',
            'area_name': '_widget_1734062123094',
            'region_name': '_widget_1734062123095',
            'region_short_name': '_widget_1734062123096',
            'branch_name': '_widget_1734062123097',
            # Carzone
            'carzone_store_id': '_widget_1734062123098',
            'carzone_store_name': '_widget_1734062123099',
            'customer_carzone_id': '_widget_1734062123100',
            # Staff
            'salesmen': '_widget_1734062123101',
            'area_manager': '_widget_1734062123102',
            'service_salesmen': '_widget_1734062123103',
            'impl_principal': '_widget_1734062123104',
            'service_impl_principal': '_widget_1734062123105',
            # User statistics
            'active_user_count': '_widget_1734062123106',
            'active_user_type': '_widget_1734062123107',
            'limit_user_count': '_widget_1734062123108',
            'limit_user_type': '_widget_1734062123109',
            # NGV flags
            'is_n': '_widget_1734062123110',
            'is_g': '_widget_1734062123111',
            'is_v': '_widget_1734062123112',
            'is_visited': '_widget_1734062123113',
            'is_active': '_widget_1734062123114',
            'active_status_fmt': '_widget_1734062123115',
            # Bill statistics
            'bill_count_last_30_day': '_widget_1734062123116',
            'bill_day_count_last_30_day': '_widget_1734062123117',
            'bill_day_count_this_month': '_widget_1734062123118',
            'bill_count_last_7_day': '_widget_1734062123119',
            'bill_day_count_last_7_day': '_widget_1734062123120',
            'pv_count': '_widget_1734062123121',
            'uv_count': '_widget_1734062123122',
            # Daily bill counts (days 1-31)
            'bill_count_1d': '_widget_1734062123123',
            'bill_count_2d': '_widget_1734062123124',
            'bill_count_3d': '_widget_1734062123125',
            'bill_count_4d': '_widget_1734062123126',
            'bill_count_5d': '_widget_1734062123127',
            'bill_count_6d': '_widget_1734062123128',
            'bill_count_7d': '_widget_1734062123129',
            'bill_count_8d': '_widget_1734062123130',
            'bill_count_9d': '_widget_1734062123131',
            'bill_count_10d': '_widget_1734062123132',
            'bill_count_11d': '_widget_1734062123133',
            'bill_count_12d': '_widget_1734062123134',
            'bill_count_13d': '_widget_1734062123135',
            'bill_count_14d': '_widget_1734062123136',
            'bill_count_15d': '_widget_1734062123137',
            'bill_count_16d': '_widget_1734062123138',
            'bill_count_17d': '_widget_1734062123139',
            'bill_count_18d': '_widget_1734062123140',
            'bill_count_19d': '_widget_1734062123141',
            'bill_count_20d': '_widget_1734062123142',
            'bill_count_21d': '_widget_1734062123143',
            'bill_count_22d': '_widget_1734062123144',
            'bill_count_23d': '_widget_1734062123145',
            'bill_count_24d': '_widget_1734062123146',
            'bill_count_25d': '_widget_1734062123147',
            'bill_count_26d': '_widget_1734062123148',
            'bill_count_27d': '_widget_1734062123149',
            'bill_count_28d': '_widget_1734062123150',
            'bill_count_29d': '_widget_1734062123151',
            'bill_count_30d': '_widget_1734062123152',
            'bill_count_31d': '_widget_1734062123153',
            # ETL time
            'etl_time': '_widget_1734062123154',
            # Business-type statistics
            'maintain_bill_count_last_30_day': '_widget_1734062123155',
            'washing_bill_count_last_30_day': '_widget_1734062123156',
            'maintain_bill_day_count_last_30_day': '_widget_1734062123157',
            'washing_bill_day_count_last_30_day': '_widget_1734062123158',
            'retail_bill_count_last_30_day': '_widget_1734062123159',
            'retail_bill_day_count_last_30_day': '_widget_1734062123160',
            'purchase_bill_count_last_30_day': '_widget_1734062123161',
            'purchase_bill_day_count_last_30_day': '_widget_1734062123162',
            'card_bill_count_last_30_day': '_widget_1734062123163',
            'card_bill_day_count_last_30_day': '_widget_1734062123164',
            'gd_sales_bill_count_last_30_day': '_widget_1734062123165',
            'gd_sales_bill_day_count_last_30_day': '_widget_1734062123166',
            # G flag
            'g_change_flag': '_widget_1734062123167',
            'saas_package': '_widget_1734062123168',
            'manage_model': '_widget_1734062123169',
            # Contact information
            'contacts': '_widget_1734062123170',
            'contact_number': '_widget_1734062123171',
            'contact_mobile': '_widget_1734062123172',
            # G monthly statistics
            'g_month_count': '_widget_1734062123173',
            'g_month_percentage': '_widget_1734062123174',
            # Installation service
            'is_install_service': '_widget_1734062123175',
            'install_create_time': '_widget_1734062123176',
            'last_end_date': '_widget_1734062123177',
            'renew_date': '_widget_1734062123178',
            # Chain information
            'is_chain_owner': '_widget_1734062123179',
            'group_org_count': '_widget_1734062123180',
            # Warnings
            'recent_bill_warning_days': '_widget_1734062123181',
            'g_change_flag_d': '_widget_1734062123182',
            'g_lost_warning_days': '_widget_1734062123183',
            # SaaS edition
            'saas_edition_fmt': '_widget_1734062123184',
            # Monthly G flags (months 1-6)
            'g_flag_1m': '_widget_1734062123185',
            'g_flag_2m': '_widget_1734062123186',
            'g_flag_3m': '_widget_1734062123187',
            'g_flag_4m': '_widget_1734062123188',
            'g_flag_5m': '_widget_1734062123189',
            'g_flag_6m': '_widget_1734062123190',
            'g_flag_day_count': '_widget_1734062123191',
            # Other flags
            'add_org_flag': '_widget_1734062123192',
            'pt': '_widget_1734062123193',
            # Store attributes
            'org_size': '_widget_1734062123194',
            'qualification_type_fmt': '_widget_1734062123195',
            'business_scope_fmt': '_widget_1734062123196',
            'store_type_fmt': '_widget_1734062123197',
            'area': '_widget_1734062123198',
            'station_number': '_widget_1734062123199',
            'header_type_fmt': '_widget_1734062123200',
            'org_stage': '_widget_1734062123201',
            # Current-month statistics
            'g_count_this_month': '_widget_1734062123202',
            'saas_customer_type': '_widget_1734062123203',
            'technician': '_widget_1734062123204',
            'tmall_maintain_service_status_desc': '_widget_1734062123205',
            # Date fields (UTC strings from _prepare_data_for_sync)
            'date_fmt_date': '_widget_1749000071375',
            'saas_create_time_date': '_widget_1749000071377',
            'expiry_time_date': '_widget_1749000071382',
            'install_create_time_date': '_widget_1749000071384',
            'last_end_date_date': '_widget_1749000071389',
            'renew_date_date': '_widget_1749000071391',
            # Staff-ID fields (from _prepare_data_for_sync)
            'area_manager_staff_id': '_widget_1748496855779',
            'service_salesmen_staff_id': '_widget_1748496855778',
            'service_impl_principal_staff_id': '_widget_1748496855780',
            'technician_staff_id': '_widget_1751877712235',
        }

    def _save_to_cache(self, df, cache_key):
        """Write ``df`` to the CSV registered under ``cache_key`` in CACHE_FILES.

        Unknown keys and write errors are logged as warnings, not raised.
        """
        if cache_key not in CACHE_FILES:
            logger.warning(f" ⚠ 未知的缓存键: {cache_key}")
            return
        cache_file = CACHE_FILES[cache_key]
        try:
            df.to_csv(cache_file, index=False, encoding='utf-8-sig')
            logger.info(f" ✓ 已保存到缓存: {cache_file} ({len(df)} 条记录)")
        except Exception as e:
            logger.warning(f" ⚠ 保存缓存失败 ({cache_key}): {e}")

    def _load_from_cache(self, cache_key):
        """Read the CSV registered under ``cache_key`` in CACHE_FILES.

        Returns:
            The cached DataFrame, or an empty DataFrame when the key is
            unknown, the file does not exist or cannot be read (logged).
        """
        if cache_key not in CACHE_FILES:
            logger.warning(f" ⚠ 未知的缓存键: {cache_key}")
            return pd.DataFrame()
        cache_file = CACHE_FILES[cache_key]
        if not os.path.exists(cache_file):
            logger.warning(f" ⚠ 缓存文件不存在: {cache_file}")
            logger.warning(f" 请先关闭USE_LOCAL_CACHE运行一次以生成缓存文件")
            return pd.DataFrame()
        try:
            df = pd.read_csv(cache_file, encoding='utf-8-sig')
            logger.info(f" ✓ 已从缓存加载: {cache_file} ({len(df)} 条记录)")
            return df
        except Exception as e:
            logger.error(f" ✗ 加载缓存失败 ({cache_key}): {e}")
            return pd.DataFrame()
if __name__ == '__main__':
    # Script entry point: run one full daily update.
    UpdateAllNGVDataDaily().main()