# -*- coding: utf-8 -*-
# Commit 8e57195033: 更新续约代表数据一致性
# 1184 lines · 50 KiB · Python
|
||
|
||
import pandas as pd
|
||
import datetime
|
||
import os
|
||
import sys
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
import time
|
||
import numpy as np
|
||
|
||
# 添加父目录到Python路径,以便导入项目模块
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
from config import Config
|
||
from api import API
|
||
from back_ground_module import CommonModule
|
||
from log_config import configure_task_logger, configure_error_task_logger
|
||
from tqdm import tqdm
|
||
|
||
# Loggers configured in log_config: one for task progress, one for errors.
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()

# Shared service clients used by every step of the task.
api_instance = API()
common_module = CommonModule()

# Directory (relative to the working directory) for generated files.
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

# ==================== Global switches ====================

# 1. Local cache: True reads every dataset from the CSV files in CACHE_DIR
#    (development/debugging); False fetches from the API and refreshes the cache.
USE_LOCAL_CACHE = False

# 2. Deletion-status reset: True batch-sets every record to "未删除" before
#    deletion marking. Only needed on the first run; keep False afterwards.
RESET_ALL_DELETED_STATUS = False

# 3. Concurrent updates: True = thread pool (fast), False = one by one (slow).
USE_CONCURRENT_UPDATE = True
# Worker threads for concurrent updates. Too many may trigger API rate
# limiting; lower it (e.g. to 3-5) if throttling is frequent.
CONCURRENT_WORKERS = 4

# 4. Batch creation: True = batches (fast), False = one record at a time (slow).
USE_BATCH_CREATE = True
# Number of records sent per batch-create call.
BATCH_CREATE_SIZE = 90

# =========================================================

# Local cache: one CSV file per dataset, stored under output/cache.
CACHE_DIR = os.path.join(output_dir, "cache")
os.makedirs(CACHE_DIR, exist_ok=True)

CACHE_FILES = {
    dataset: os.path.join(CACHE_DIR, f"{dataset}.csv")
    for dataset in ('jdy_ngv_data', 'staff_data', 'ngv_data_today', 'ngv_data_yesterday')
}
|
||
|
||
|
||
class UpdateAllNGVDataDaily:
|
||
"""NGV数据每日更新"""
|
||
|
||
# 门店编码字段的Widget ID
|
||
ORG_CODE_WIDGET_ID = '_widget_1734062123071'
|
||
|
||
def __init__(self):
|
||
"""初始化"""
|
||
self.field_mapping = {}
|
||
self._init_field_mapping()
|
||
# org_code到所有_id的映射(用于更新所有重复记录)
|
||
self.org_code_to_all_ids = {}
|
||
|
||
def main(self):
    """Run the daily NGV -> Jiandaoyun synchronisation end to end.

    Steps:
        1. Load the existing Jiandaoyun records and the staff lookup.
        2. Fetch the two most recent NGV snapshots.
        3. Flag stores that disappeared from NGV (writes to Jiandaoyun).
        4. Diff the snapshots and attach Jiandaoyun ``_id`` values.
        5. Convert dates and staff fields.
        6. Push the result to Jiandaoyun.

    On success the completion is reported via ``common_module.send_task_status``.
    Any exception is logged, reported via ``common_module.send_task_error``
    and re-raised.
    """
    started_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    banner = "=" * 60
    try:
        logger.info(banner)
        logger.info(f"开始执行NGV数据更新任务:{started_at}")
        logger.info(banner)

        jdy_records, staff_lookup = self._load_base_data()                 # step 1
        latest, previous = self._load_ngv_source_data(started_at)          # step 2
        self._handle_deleted_stores(jdy_records, latest)                   # step 3
        diff = self._compare_data_changes(latest, previous, jdy_records)   # step 4
        payload = self._prepare_data_for_sync(diff, staff_lookup)          # step 5
        self._sync_to_jiandaoyun(payload)                                  # step 6

        logger.info(banner)
        logger.info("NGV更新数据任务已完成")
        common_module.send_task_status(started_at, "NGV更新数据")
        logger.info(banner)
    except Exception as exc:
        error_task_logger.error(f"NGV更新数据执行时发生异常: {exc}", exc_info=True)
        common_module.send_task_error(started_at, "NGV更新数据", str(exc))
        raise
|
||
|
||
def _compose_key_values(self, org_name, group_name, org_code, id_own_group, id_own_org):
|
||
"""将五个字段组合为稳定索引键,空值用空字符串,占位并去除首尾空格。"""
|
||
def nv(x):
|
||
return '' if pd.isna(x) else str(x).strip()
|
||
parts = [nv(org_name), nv(group_name), nv(org_code), nv(id_own_group), nv(id_own_org)]
|
||
return '||'.join(parts)
|
||
|
||
def _compose_key_df_ngv(self, df):
|
||
"""为NGV数据增加composite_key列(基于字段名)。"""
|
||
cols = ['org_name', 'group_name', 'org_code', 'id_own_group', 'id_own_org']
|
||
for c in cols:
|
||
if c not in df.columns:
|
||
df[c] = ''
|
||
df['composite_key'] = [
|
||
self._compose_key_values(r['org_name'], r['group_name'], r['org_code'], r['id_own_group'], r['id_own_org'])
|
||
for _, r in df.iterrows()
|
||
]
|
||
return df
|
||
|
||
def _compose_key_df_jdy(self, df):
|
||
"""为简道云数据增加composite_key列(基于widget列名)。"""
|
||
# 对应字段widget id
|
||
col_map = {
|
||
'_widget_1734062123070': 'org_name',
|
||
'_widget_1734062123068': 'group_name',
|
||
'_widget_1734062123071': 'org_code',
|
||
'_widget_1734062123067': 'id_own_group',
|
||
'_widget_1734062123069': 'id_own_org',
|
||
}
|
||
tmp = df.copy()
|
||
for wid in col_map.keys():
|
||
if wid not in tmp.columns:
|
||
tmp[wid] = ''
|
||
tmp_renamed = tmp.rename(columns=col_map)
|
||
tmp_renamed['composite_key'] = [
|
||
self._compose_key_values(r.get('org_name', ''), r.get('group_name', ''), r.get('org_code', ''), r.get('id_own_group', ''), r.get('id_own_org', ''))
|
||
for _, r in tmp_renamed.iterrows()
|
||
]
|
||
return tmp_renamed
|
||
|
||
def _load_base_data(self):
    """Step 1: load the existing Jiandaoyun NGV records and the staff lookup.

    Returns:
        tuple: ``(jdy_ngv_data, staff_id_map)``.
            ``jdy_ngv_data`` is a DataFrame of the Jiandaoyun NGV entry.
            ``staff_id_map`` maps ``str(_widget_1734942794144)`` (the staff
            name) to ``_widget_1734942794145`` (the staff id) for every staff
            record that has both widgets.

    With ``USE_LOCAL_CACHE`` on, both datasets are read from the CSV cache.
    Otherwise they are fetched through ``api_instance.entry_data_list`` and
    written to the cache.
    """
    name_widget = '_widget_1734942794144'
    id_widget = '_widget_1734942794145'

    logger.info("步骤1: 开始加载基础数据...")

    if USE_LOCAL_CACHE:
        logger.info(" [缓存模式] 从本地缓存读取数据...")
        jdy_ngv_data = self._load_from_cache('jdy_ngv_data')
        staff_df = self._load_from_cache('staff_data')

        # Rebuild the name -> id lookup from the cached staff table.
        staff_usable = (not staff_df.empty
                        and name_widget in staff_df.columns
                        and id_widget in staff_df.columns)
        staff_id_map = (dict(zip(staff_df[name_widget].astype(str), staff_df[id_widget]))
                        if staff_usable else {})

        logger.info(f" - [缓存] 简道云NGV数据: {len(jdy_ngv_data)} 条")
        logger.info(f" - [缓存] 员工ID映射: {len(staff_id_map)} 个员工")
        return jdy_ngv_data, staff_id_map

    logger.info(" [API模式] 从简道云API获取数据...")

    # NOTE(review): these credentials are hard-coded, while the rest of the
    # file reads keys from Config. Consider moving them into Config.
    ngv_records = api_instance.entry_data_list({
        "api_key": "675b900991ad2491c69389ca",
        "entry_id": "675bb02bd2d53c2034c665e4",
    }).get("data", [])
    jdy_ngv_data = pd.DataFrame(ngv_records)
    logger.info(f" - 已获取简道云NGV数据: {len(jdy_ngv_data)} 条")
    self._save_to_cache(jdy_ngv_data, 'jdy_ngv_data')

    staff_records = api_instance.entry_data_list({
        "api_key": "6694d3c4fcb69ca9a111a6c4",
        "entry_id": "6769204a1902c9341340a1bc",
    }).get("data", [])
    self._save_to_cache(pd.DataFrame(staff_records), 'staff_data')

    # Dict lookup keeps the per-row staff conversion in step 5 cheap.
    staff_id_map = {}
    for record in staff_records:
        if name_widget in record and id_widget in record:
            staff_id_map[str(record[name_widget])] = record[id_widget]
    logger.info(f" - 已构建员工ID映射字典: {len(staff_id_map)} 个员工")

    return jdy_ngv_data, staff_id_map
|
||
|
||
def _load_ngv_source_data(self, task_start_time):
    """Step 2: fetch the two most recent NGV snapshots.

    Args:
        task_start_time: run start timestamp. It is not used here; the
            parameter is kept for the caller's interface.

    Returns:
        tuple: ``(ngv_data_1, ngv_data_2)``, the snapshots returned by
        ``common_module.get_ngv_details(days_back=1)`` ("yesterday") and
        ``(days_back=2)`` ("the day before"). Both are filtered to
        ``org_type == '一般'``.

    In cache mode both snapshots come from CACHE_FILES; they were saved
    after filtering. In API mode the raw, unfiltered snapshots are also
    written to ngv_data_today.csv / ngv_data_yesterday.csv in the working
    directory, and the filtered ones are saved to the cache.
    """
    logger.info("步骤2: 开始获取NGV源数据...")

    if USE_LOCAL_CACHE:
        logger.info(" [缓存模式] 从本地缓存读取NGV源数据...")
        ngv_data_1 = self._load_from_cache('ngv_data_today')
        ngv_data_2 = self._load_from_cache('ngv_data_yesterday')

        logger.info(f" - [缓存] 昨天数据: {len(ngv_data_1)} 条")
        logger.info(f" - [缓存] 前天数据: {len(ngv_data_2)} 条")
        return ngv_data_1, ngv_data_2

    logger.info(" [API模式] 从数据库获取NGV源数据...")

    ngv_data_1 = common_module.get_ngv_details(days_back=1)
    ngv_data_2 = common_module.get_ngv_details(days_back=2)

    # Keep a raw copy of each day's snapshot (before filtering) in the working directory.
    ngv_data_1.to_csv("ngv_data_today.csv", index=False)
    ngv_data_2.to_csv("ngv_data_yesterday.csv", index=False)

    # Only stores whose org_type is '一般' are synchronised.
    ngv_data_1 = ngv_data_1[ngv_data_1['org_type'] == '一般']
    ngv_data_2 = ngv_data_2[ngv_data_2['org_type'] == '一般']

    self._save_to_cache(ngv_data_1, 'ngv_data_today')
    self._save_to_cache(ngv_data_2, 'ngv_data_yesterday')

    logger.info(f" - 昨天数据: {len(ngv_data_1)} 条")
    logger.info(f" - 前天数据: {len(ngv_data_2)} 条")

    return ngv_data_1, ngv_data_2
|
||
|
||
def _handle_deleted_stores(self, jdy_ngv_data, ngv_current_data):
    """Step 3: flag Jiandaoyun records whose store no longer exists in NGV.

    1. With ``RESET_ALL_DELETED_STATUS`` on (first run only), every record is
       batch-set to "未删除" first.
    2. Every record whose org_code is absent from ``ngv_current_data`` is
       batch-set to "已删除". Duplicate records are included, so every copy
       of a vanished store is flagged.

    Writes to Jiandaoyun via ``api_instance.entry_data_banch_update``.

    Returns early, without writing, in these cases:
        - the org-code widget column or ``_id`` is missing from the
          Jiandaoyun data;
        - the NGV snapshot has no ``org_code`` column;
        - the NGV snapshot has no org codes at all. An empty source would
          otherwise flag every store as deleted.

    NOTE(review): a record flagged "已删除" is never set back to "未删除"
    when its store reappears in NGV, except through RESET_ALL_DELETED_STATUS.
    NOTE(review): org codes are compared by value. If NGV and Jiandaoyun
    return them with different types (e.g. int vs str), every store would
    look missing. Confirm both sides use the same type.
    """
    logger.info("步骤3: 开始处理已删除的门店...")

    temp_jdy_data = jdy_ngv_data.copy()
    temp_jdy_data.reset_index(drop=True, inplace=True)

    if self.ORG_CODE_WIDGET_ID not in temp_jdy_data.columns:
        error_task_logger.error("列 '门店编码' 不存在")
        return

    # Checked up front: the duplicate report below already reads `_id`.
    if '_id' not in temp_jdy_data.columns:
        error_task_logger.error(" ✗ 错误: 简道云数据中没有_id列,无法标记删除状态")
        return

    temp_jdy_data['org_code'] = temp_jdy_data[self.ORG_CODE_WIDGET_ID]

    # Records without an org code cannot be matched against NGV.
    null_org_codes = temp_jdy_data['org_code'].isna().sum()
    if null_org_codes > 0:
        logger.warning(f" ⚠ 警告: 简道云数据中有 {null_org_codes} 条记录的门店编码为空,将被忽略")
        temp_jdy_data = temp_jdy_data[temp_jdy_data['org_code'].notna()]

    self._log_duplicate_jdy_org_codes(temp_jdy_data)

    # Duplicates are kept on purpose: every copy of a vanished store is flagged.
    logger.info(f" - 简道云原始数据共 {len(temp_jdy_data)} 条记录(包含重复)")
    logger.info(f" - NGV源数据: {len(ngv_current_data)} 条")

    # 3.1 First run only: reset every record to "未删除".
    if RESET_ALL_DELETED_STATUS:
        rule = " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
        logger.info(rule)
        logger.info(" 【首次运行模式】批量重置所有数据删除状态")
        logger.info(rule)

        all_data_ids = temp_jdy_data['_id'].dropna().tolist()
        if all_data_ids:
            logger.info(f" - 准备批量标记所有数据为'未删除': {len(all_data_ids)} 条")
            self._set_deleted_status_batch(all_data_ids, "未删除")
            logger.info(f" ✓ 已批量标记所有数据为'未删除': {len(all_data_ids)} 条")
            logger.info(f" 💡 提示:首次运行完成后,请将 RESET_ALL_DELETED_STATUS 设置为 False")
        else:
            logger.warning(" ⚠ 警告: 没有有效的_id可以标记")

    # 3.2 Find the org codes that exist in Jiandaoyun but not in NGV.
    if 'org_code' not in ngv_current_data.columns:
        error_task_logger.error(" ✗ 错误: NGV源数据缺少org_code列,跳过已删除标记")
        return
    ngv_org_codes = set(ngv_current_data['org_code'].dropna().unique())
    if not ngv_org_codes:
        # Safety net: an empty snapshot almost certainly means a failed fetch,
        # not that every store was closed.
        error_task_logger.error(" ✗ 错误: NGV源数据中没有任何门店编码,跳过已删除标记,避免误将全部门店标记为已删除")
        return

    jdy_org_codes_unique = set(temp_jdy_data['org_code'].dropna().unique())
    missing_org_codes = jdy_org_codes_unique - ngv_org_codes

    if len(missing_org_codes) == 0:
        logger.info(" - 无需处理的已删除门店(简道云和NGV数据完全一致)")
        return

    logger.info(f" - 发现 {len(missing_org_codes)} 个门店在简道云存在但NGV中不存在")

    # Select every matching record, duplicates included.
    only_in_jdy = temp_jdy_data[temp_jdy_data['org_code'].isin(missing_org_codes)].copy()

    logger.info(f" - 需要标记删除的org_code数: {len(missing_org_codes)}")
    logger.info(f" - 需要标记删除的记录数: {len(only_in_jdy)} 条(包含重复记录)")

    # 3.3 Batch-flag them as "已删除".
    delete_data_ids = only_in_jdy['_id'].dropna().tolist()
    if not delete_data_ids:
        logger.warning(" ⚠ 警告: 没有有效的_id可以标记为已删除")
        return

    logger.info(f" - 准备批量标记为'已删除': {len(delete_data_ids)} 条")
    self._set_deleted_status_batch(delete_data_ids, "已删除")
    logger.info(f" ✓ 已批量标记为'已删除': {len(delete_data_ids)} 条")


def _log_duplicate_jdy_org_codes(self, jdy_records):
    """Warn about org codes that occur more than once in Jiandaoyun.

    The affected (org_code, _id) pairs are exported to
    output_dir/重复org_code记录.csv for review. ``jdy_records`` must
    contain ``org_code`` and ``_id`` columns.
    """
    duplicate_org_codes = jdy_records[jdy_records['org_code'].duplicated(keep=False)]
    if len(duplicate_org_codes) == 0:
        return

    unique_duplicate_org_codes = duplicate_org_codes['org_code'].nunique()
    logger.warning(
        f" ⚠ 数据质量问题:发现 {unique_duplicate_org_codes} 个org_code重复,共 {len(duplicate_org_codes)} 条记录")
    logger.warning(f" ⚠ 建议:请在简道云中清理重复数据,每个org_code只保留一条记录")

    if output_dir:
        duplicate_org_codes[['org_code', '_id']].to_csv(
            os.path.join(output_dir, '重复org_code记录.csv'),
            index=False,
            encoding='utf-8-sig',
        )
        logger.warning(f" - 重复详情已保存到: 重复org_code记录.csv")


def _set_deleted_status_batch(self, data_ids, status):
    """Batch-write ``status`` into the deletion-status widget of ``data_ids``."""
    batch_data = {
        'api_key': Config.SaaS_Tasks_APP_ID,
        'entry_id': Config.NGV_TASKS_ENTRY_ID,
        "data_ids": data_ids,
        "data": {"_widget_1754285499851": {"value": status}},
    }
    api_instance.entry_data_banch_update(data=batch_data, max_retries=20)
|
||
|
||
def _compare_data_changes(self, ngv_today, ngv_yesterday, jdy_ngv_data):
    """Step 4: find the NGV stores that must be pushed to Jiandaoyun.

    The returned DataFrame is indexed by org_code and holds two kinds of rows.

    Update rows are stores present in both snapshots whose compared columns
    differ (``match_status == '不一致'``). They are left-joined with one
    Jiandaoyun ``_id`` per org_code: the newest by ``_createTime`` when that
    column exists, otherwise the first one.

    Creation rows are stores present in ``ngv_today`` but absent from
    Jiandaoyun (``match_status == '查缺补漏-需新建'``, ``_id`` None). Rows
    already listed as changed are excluded from this group.

    Compared values are converted to ``str``, with 'nan'/'None' blanked.

    Side effect: ``self.org_code_to_all_ids`` is rebuilt (org_code -> every
    Jiandaoyun ``_id``) so the sync step can update duplicate records too.
    """
    logger.info("步骤4: 开始对比数据变化...")

    # Columns excluded from the comparison.
    columns_to_remove = {'date_id', 'date_fmt', 'pt', 'etl_time', 'id_own_org'}

    df1_filtered = ngv_today[[col for col in ngv_today.columns if col not in columns_to_remove]]
    df2_filtered = ngv_yesterday[[col for col in ngv_yesterday.columns if col not in columns_to_remove]]

    df1_indexed = df1_filtered.set_index('org_code')
    df2_indexed = df2_filtered.set_index('org_code')

    # Normalise to strings so NaN / None compare equal to ''.
    df1_indexed = df1_indexed.astype(str).replace(['nan', 'None'], '').fillna('')
    df2_indexed = df2_indexed.astype(str).replace(['nan', 'None'], '').fillna('')

    # Only stores and columns present in both snapshots are compared.
    common_index = df1_indexed.index.intersection(df2_indexed.index)
    df1_common = df1_indexed.reindex(common_index).fillna('')
    df2_common = df2_indexed.reindex(common_index).fillna('')

    common_columns = df1_common.columns.intersection(df2_common.columns)
    # Explicit copy: a column is added to df1_common below.
    df1_common = df1_common[common_columns].copy()
    df2_common = df2_common[common_columns]

    matches = (df1_common == df2_common).all(axis=1)
    df1_common['match_status'] = matches.map({True: '一致', False: '不一致'})

    consistent_count = (df1_common['match_status'] == '一致').sum()
    inconsistent_count = (df1_common['match_status'] == '不一致').sum()

    logger.info(f" 📊 共同门店数据对比结果:")
    logger.info(f" - 共同门店总数: {len(df1_common)}")
    logger.info(f" - 数据一致: {consistent_count} 条")
    logger.info(f" - 数据不一致(需要更新): {inconsistent_count} 条")

    changed_data = df1_common[df1_common['match_status'] == '不一致'].copy()

    # Attach the Jiandaoyun _id by org_code.
    temp_jdy = jdy_ngv_data.copy()
    temp_jdy.reset_index(drop=True, inplace=True)

    org_code_to_all_ids = {}
    jdy_org_codes = set()

    if self.ORG_CODE_WIDGET_ID in temp_jdy.columns:
        temp_jdy.rename(columns={self.ORG_CODE_WIDGET_ID: 'org_code'}, inplace=True)

        # Every _id per org_code, so all duplicate records get updated.
        for org_code, data_id in zip(temp_jdy['org_code'], temp_jdy['_id']):
            if pd.notna(org_code) and pd.notna(data_id):
                org_code_to_all_ids.setdefault(org_code, []).append(data_id)

        multi_id_count = sum(1 for ids in org_code_to_all_ids.values() if len(ids) > 1)
        if multi_id_count > 0:
            logger.info(f" - 发现 {multi_id_count} 个org_code有多个_id,更新时将同步所有记录")

        # One record per org_code for the join, so duplicates don't multiply rows.
        if '_createTime' in temp_jdy.columns:
            temp_jdy = temp_jdy.sort_values('_createTime', ascending=False)
        temp_jdy = temp_jdy.drop_duplicates(subset=['org_code'], keep='first')
        temp_jdy.set_index('org_code', inplace=True)
        changed_data = changed_data.join(temp_jdy['_id'], how='left')

        jdy_org_codes = set(temp_jdy.index.unique())
    elif len(temp_jdy) > 0:
        # Without the org-code column the positional index is meaningless,
        # so no Jiandaoyun org codes are known.
        error_task_logger.error(
            f" ✗ 错误: 简道云数据中没有门店编码列({self.ORG_CODE_WIDGET_ID}),无法关联_id")

    self.org_code_to_all_ids = org_code_to_all_ids

    # Gap filling: stores present in NGV but absent from Jiandaoyun.
    logger.info("")
    logger.info(f" 📊 查缺补漏分析(NGV有但简道云没有):")

    ngv_org_codes = set(df1_indexed.index.unique())
    missing_in_jdy = ngv_org_codes - jdy_org_codes

    logger.info(f" - NGV昨天的门店总数: {len(ngv_org_codes)}")
    logger.info(f" - 简道云的门店总数: {len(jdy_org_codes)}")
    logger.info(f" - NGV有但简道云没有(需补充): {len(missing_in_jdy)} 条")

    if len(missing_in_jdy) > 0:
        missing_stores = df1_indexed.loc[list(missing_in_jdy)].copy()
        missing_stores['match_status'] = '查缺补漏-需新建'
        missing_stores['_id'] = None  # not in Jiandaoyun yet

        # Stores already listed as changed are not added a second time.
        if len(changed_data) > 0:
            overlap_org_codes = missing_in_jdy & set(changed_data.index)
            if len(overlap_org_codes) > 0:
                logger.warning(f" ⚠ 警告:发现 {len(overlap_org_codes)} 个org_code同时在'不一致'和'查缺补漏'中")
                logger.warning(f" 这些记录可能是数据问题,已从查缺补漏中排除")
                missing_stores = missing_stores[~missing_stores.index.isin(overlap_org_codes)]
                missing_in_jdy = missing_in_jdy - overlap_org_codes

        if len(missing_stores) > 0:
            logger.info(f" - 实际需要补充新建: {len(missing_stores)} 条")
            changed_data = pd.concat([changed_data, missing_stores])
        else:
            logger.info(f" ✓ 排除重复后无需补充")
    else:
        logger.info(f" ✓ 无需补充,简道云数据完整")

    logger.info("")
    logger.info(f" 📊 汇总统计:")
    logger.info(f" - 需要处理的数据总数: {len(changed_data)} 条")
    logger.info(f" ├─ 数据不一致(需更新): {inconsistent_count} 条")
    logger.info(f" └─ 查缺补漏(需新建): {len(missing_in_jdy)} 条")

    return changed_data
|
||
|
||
def _prepare_data_for_sync(self, data_df, staff_id_map):
    """Step 5: derive the extra columns that the Jiandaoyun payload needs.

    - Each date column present gets a ``<col>_date`` companion. The value is
      parsed as Asia/Shanghai wall-clock time and rendered as a UTC
      ``%Y-%m-%dT%H:%M:%SZ`` string; unparseable values become NaN.
    - Each staff column present gets a ``<col>_staff_id`` companion, looked
      up in ``staff_id_map`` by the value's ``str`` form (None if unknown).
    - ``g_month_percentage``, when present, is formatted with three decimals
      ('' for non-numeric values).

    Args:
        data_df: output of ``_compare_data_changes``.
        staff_id_map: staff name -> staff id lookup from step 1.

    Returns:
        A new DataFrame. ``data_df`` itself is returned when it is empty.
    """
    logger.info("步骤5: 开始数据转换...")

    if len(data_df) == 0:
        logger.info(" - 无需转换的数据")
        return data_df

    prepared_df = data_df.copy()

    # 5.1 Dates -> UTC ISO-8601 strings.
    time_columns = ['saas_create_time', 'expiry_time', 'install_create_time',
                    'last_end_date', 'renew_date']
    for col in time_columns:
        if col not in prepared_df.columns:
            continue
        parsed = pd.to_datetime(prepared_df[col], errors='coerce', utc=False)
        prepared_df[col + '_date'] = (
            parsed
            .dt.tz_localize('Asia/Shanghai', ambiguous='infer', nonexistent='NaT')
            .dt.tz_convert('UTC')
            .dt.strftime('%Y-%m-%dT%H:%M:%SZ')
        )
    logger.info(" - 日期字段已转换为UTC格式")

    # 5.2 Staff names -> staff ids via dictionary lookup.
    staff_columns = ['area_manager', 'service_impl_principal',
                     'service_salesmen', 'technician']
    for col in staff_columns:
        if col not in prepared_df.columns:
            continue
        prepared_df[col + '_staff_id'] = prepared_df[col].astype(str).map(staff_id_map.get)
    logger.info(" - 人员字段已转换为员工ID")

    # 5.3 G conversion rate with three decimals. Guarded like the other
    # optional columns; the unguarded access raised KeyError when absent.
    if 'g_month_percentage' in prepared_df.columns:
        prepared_df['g_month_percentage'] = (
            pd.to_numeric(prepared_df['g_month_percentage'], errors='coerce')
            .round(3)
            .apply(lambda x: f"{x:.3f}" if pd.notna(x) else '')
        )
        logger.info(" - G转化率已保留3位小数")

    return prepared_df
|
||
|
||
def _sync_to_jiandaoyun(self, data_df, enable_create=False):
    """Step 6: push the prepared rows to Jiandaoyun.

    Rows with a Jiandaoyun ``_id`` become updates. The org-code widget is
    removed because it must not be modified. Each update is fanned out to
    every ``_id`` recorded for that org_code in ``self.org_code_to_all_ids``.

    Rows without an ``_id`` become creation candidates. A candidate is
    skipped when every field other than the org code is blank (None, ''
    or NaN).

    Creation candidates are always dumped to ``create_data.csv`` in the
    working directory. They are only sent to Jiandaoyun when
    ``enable_create`` is True, which is off by default. This matches the
    previous behaviour, where the creation code was commented out.

    Args:
        data_df: output of ``_prepare_data_for_sync``, indexed by org_code.
        enable_create: opt-in switch for creating the missing records.
    """
    logger.info("步骤6: 开始同步数据到简道云...")

    if len(data_df) == 0:
        logger.info(" - 无需同步的数据")
        return

    def is_blank(value):
        # `value in (..., float('nan'))` can never match NaN (NaN != NaN).
        if value is None:
            return True
        if isinstance(value, str):
            return value == ''
        return isinstance(value, float) and value != value

    update_data_list = []
    create_data_list = []
    has_id_column = '_id' in data_df.columns  # loop-invariant

    logger.info(f" - 准备同步数据...")
    for org_code, row in data_df.iterrows():
        data_dict = self._build_data_dict(row)

        if has_id_column and pd.notna(row.get('_id')):
            # Update: the org-code widget must not be modified.
            data_dict.pop(self.ORG_CODE_WIDGET_ID, None)
            # Fan out to every Jiandaoyun record sharing this org_code.
            for data_id in self.org_code_to_all_ids.get(org_code, [str(row['_id'])]):
                update_data_list.append({
                    'org_code': org_code,
                    'data_id': str(data_id),
                    'data_dict': data_dict.copy(),
                    'row_data': row,  # original row, kept for the stats output
                })
            continue

        # Create: skip rows that carry nothing besides the org code.
        if all(is_blank(field.get('value'))
               for widget_id, field in data_dict.items()
               if widget_id != self.ORG_CODE_WIDGET_ID):
            logger.info(f" - 跳过内容全空的新建记录 org_code: {org_code}")
            continue

        create_data_list.append({
            'org_code': org_code,
            'data_dict': data_dict,
            'row_data': row,  # original row, kept for the output file
        })

    logger.info(f" - 需要更新: {len(update_data_list)} 条")
    logger.info(f" - 需要创建: {len(create_data_list)} 条")

    update_count = 0
    if len(update_data_list) > 0:
        if USE_CONCURRENT_UPDATE:
            update_count = self._concurrent_update(update_data_list)
        else:
            update_count = self._single_update(update_data_list)
        self._save_update_stats(update_data_list)

    create_count = 0
    pd.DataFrame(create_data_list).to_csv("create_data.csv", index=False)
    if enable_create and len(create_data_list) > 0:
        if USE_BATCH_CREATE:
            create_count = self._batch_create(create_data_list)
        else:
            create_count = self._single_create(create_data_list)
        self._save_create_data(create_data_list)

    logger.info(f" ✓ 同步完成: 更新 {update_count} 条, 创建 {create_count} 条")
|
||
|
||
def _concurrent_update(self, update_data_list):
    """Update Jiandaoyun records in parallel with a thread pool.

    Each item (keys ``org_code``, ``data_id``, ``data_dict``) becomes one
    ``api_instance.entry_data_update`` call. At most ``CONCURRENT_WORKERS``
    calls run at once. A failing call is logged to ``error_task_logger``
    and counted as failed instead of being raised, so one bad record never
    aborts the batch.

    Args:
        update_data_list: update items built by ``_sync_to_jiandaoyun``.

    Returns:
        int: number of records updated without an exception.
    """
    logger.info(f" - 使用并发更新模式 (并发数: {CONCURRENT_WORKERS})")

    total_count = len(update_data_list)
    started = time.time()

    def push_one(entry):
        # Report the outcome instead of raising so the pool keeps going.
        try:
            api_instance.entry_data_update(
                data={
                    'api_key': Config.SaaS_Tasks_APP_ID,
                    'entry_id': Config.NGV_TASKS_ENTRY_ID,
                    'data_id': entry['data_id'],
                    'data': entry['data_dict'],
                },
                max_retries=20,
            )
            return {'success': True, 'org_code': entry['org_code']}
        except Exception as err:
            error_task_logger.error(f"更新失败 (org_code={entry['org_code']}): {err}")
            return {'success': False, 'org_code': entry['org_code'], 'error': str(err)}

    succeeded = failed = 0
    with ThreadPoolExecutor(max_workers=CONCURRENT_WORKERS) as pool:
        futures = [pool.submit(push_one, entry) for entry in update_data_list]
        with tqdm(total=total_count, desc="并发更新") as progress:
            for finished in as_completed(futures):
                if finished.result()['success']:
                    succeeded += 1
                else:
                    failed += 1
                progress.update(1)

    elapsed = time.time() - started
    speed = total_count / elapsed if elapsed > 0 else 0

    logger.info(f" - 更新完成: 成功 {succeeded} 条, 失败 {failed} 条")
    logger.info(f" - 耗时: {elapsed:.1f}秒, 速度: {speed:.1f}条/秒")

    return succeeded
|
||
|
||
def _single_update(self, update_data_list):
    """Update Jiandaoyun records one at a time (slow, but failures are easy to trace).

    Failed calls are logged to ``error_task_logger`` and skipped.

    Returns:
        int: number of records updated without an exception.
    """
    logger.info(f" - 使用串行更新模式(逐条)")

    succeeded = failed = 0
    total_count = len(update_data_list)
    started = time.time()

    for entry in tqdm(update_data_list, desc="串行更新"):
        try:
            api_instance.entry_data_update(
                data={
                    'api_key': Config.SaaS_Tasks_APP_ID,
                    'entry_id': Config.NGV_TASKS_ENTRY_ID,
                    'data_id': entry['data_id'],
                    'data': entry['data_dict'],
                },
                max_retries=20,
            )
        except Exception as err:
            failed += 1
            error_task_logger.error(f"更新失败 (org_code={entry['org_code']}): {err}")
        else:
            succeeded += 1

    elapsed = time.time() - started
    speed = total_count / elapsed if elapsed > 0 else 0

    logger.info(f" - 更新完成: 成功 {succeeded} 条, 失败 {failed} 条")
    logger.info(f" - 耗时: {elapsed:.1f}秒, 速度: {speed:.1f}条/秒")

    return succeeded
|
||
|
||
def _batch_create(self, create_data_list):
    """Create records in chunks of ``BATCH_CREATE_SIZE``.

    When a whole chunk fails, its records are retried one by one with the
    same API call. Progress is logged every five chunks and after the last
    one, for chunks that succeed as a whole.

    Returns:
        int: number of records whose create call did not raise.
    """
    logger.info(f" - 使用批量创建模式 (批次大小: {BATCH_CREATE_SIZE})")

    total_count = len(create_data_list)
    created = 0

    for start in range(0, total_count, BATCH_CREATE_SIZE):
        chunk = create_data_list[start:start + BATCH_CREATE_SIZE]
        end = start + BATCH_CREATE_SIZE

        try:
            api_instance.data_batch_create(
                data={
                    'api_key': Config.SaaS_Tasks_APP_ID,
                    'entry_id': Config.NGV_TASKS_ENTRY_ID,
                    'data': [entry['data_dict'] for entry in chunk],
                },
                max_retries=20,
            )
            created += len(chunk)

            if end % (BATCH_CREATE_SIZE * 5) == 0 or end >= total_count:
                logger.info(f" - 已创建 {min(end, total_count)}/{total_count} 条")
        except Exception as e:
            error_task_logger.error(f"批量创建失败 (批次 {start // BATCH_CREATE_SIZE + 1}): {e}")

            # Fall back to creating this chunk record by record.
            logger.warning(f" - 批量创建失败,尝试逐条创建该批次...")
            for entry in chunk:
                try:
                    api_instance.data_batch_create(
                        data={
                            'api_key': Config.SaaS_Tasks_APP_ID,
                            'entry_id': Config.NGV_TASKS_ENTRY_ID,
                            'data': entry['data_dict'],
                        },
                        max_retries=20,
                    )
                    created += 1
                except Exception as e2:
                    error_task_logger.error(f"逐条创建失败 (org_code={entry['org_code']}): {e2}")

    return created
|
||
|
||
def _single_create(self, create_data_list):
    """Create records with one API call each (slow, but failures are easy to trace).

    Every call sends ``is_start_trigger='true'``. Failures are logged to
    ``error_task_logger`` and skipped.

    Returns:
        int: number of records whose create call did not raise.
    """
    logger.info(f" - 使用逐条创建模式")

    created = 0
    for entry in tqdm(create_data_list, desc="逐条创建"):
        try:
            api_instance.data_batch_create(
                data={
                    'api_key': Config.SaaS_Tasks_APP_ID,
                    'entry_id': Config.NGV_TASKS_ENTRY_ID,
                    'data': entry['data_dict'],
                    'is_start_trigger': 'true',
                },
                max_retries=20,
            )
        except Exception as err:
            error_task_logger.error(f"创建失败 (org_code={entry['org_code']}): {err}")
        else:
            created += 1

    return created
|
||
|
||
def _build_data_dict(self, row):
|
||
"""
|
||
根据field_mapping构建简道云数据字典
|
||
"""
|
||
data_dict = {}
|
||
|
||
for col_name, widget_id in self.field_mapping.items():
|
||
if col_name in row.index:
|
||
value = row[col_name]
|
||
clean_value = None if pd.isna(value) else value
|
||
data_dict[widget_id] = {"value": clean_value}
|
||
|
||
return data_dict
|
||
|
||
def _init_field_mapping(self):
|
||
"""初始化字段映射关系"""
|
||
self.field_mapping = {
|
||
# 基础信息
|
||
'date_id': '_widget_1734062123065',
|
||
'date_fmt': '_widget_1734062123066',
|
||
'id_own_group': '_widget_1734062123067',
|
||
'group_name': '_widget_1734062123068',
|
||
'id_own_org': '_widget_1734062123069',
|
||
'org_name': '_widget_1734062123070',
|
||
'org_code': '_widget_1734062123071',
|
||
'group_grade': '_widget_1734062123072',
|
||
'org_type': '_widget_1734062123073',
|
||
'org_status': '_widget_1734062123074',
|
||
|
||
# SaaS相关
|
||
'saas_version': '_widget_1734062123075',
|
||
'is_wechat': '_widget_1734062123076',
|
||
'is_mini_app': '_widget_1734062123077',
|
||
'is_wx_shop': '_widget_1734062123078',
|
||
'is_camera_service': '_widget_1734062123079',
|
||
'is_maintenance_service': '_widget_1734062123080',
|
||
'saas_create_time': '_widget_1734062123081',
|
||
'expiry_time': '_widget_1734062123177', # 改过期日
|
||
'saas_use_days': '_widget_1734062123083',
|
||
'saas_use_year': '_widget_1734062123084',
|
||
'is_main_org': '_widget_1734062123085',
|
||
|
||
# 证照信息
|
||
'license_code': '_widget_1734062123086',
|
||
'license_name': '_widget_1734062123087',
|
||
'org_crm_id': '_widget_1734062123088',
|
||
|
||
# 地理信息
|
||
'province_id': '_widget_1734062123089',
|
||
'province_name': '_widget_1734062123090',
|
||
'city_id': '_widget_1734062123091',
|
||
'city_name': '_widget_1734062123092',
|
||
'area_id': '_widget_1734062123093',
|
||
'area_name': '_widget_1734062123094',
|
||
'region_name': '_widget_1734062123095',
|
||
'region_short_name': '_widget_1734062123096',
|
||
'branch_name': '_widget_1734062123097',
|
||
|
||
# 车主邦相关
|
||
'carzone_store_id': '_widget_1734062123098',
|
||
'carzone_store_name': '_widget_1734062123099',
|
||
'customer_carzone_id': '_widget_1734062123100',
|
||
|
||
# 人员信息
|
||
'salesmen': '_widget_1734062123101',
|
||
'area_manager': '_widget_1734062123102',
|
||
'service_salesmen': '_widget_1734062123103',
|
||
'impl_principal': '_widget_1734062123104',
|
||
'service_impl_principal': '_widget_1734062123105',
|
||
|
||
# 用户统计
|
||
'active_user_count': '_widget_1734062123106',
|
||
'active_user_type': '_widget_1734062123107',
|
||
'limit_user_count': '_widget_1734062123108',
|
||
'limit_user_type': '_widget_1734062123109',
|
||
|
||
# NGV标记
|
||
'is_n': '_widget_1734062123110',
|
||
'is_g': '_widget_1734062123111',
|
||
'is_v': '_widget_1734062123112',
|
||
'is_visited': '_widget_1734062123113',
|
||
'is_active': '_widget_1734062123114',
|
||
'active_status_fmt': '_widget_1734062123115',
|
||
|
||
# 单据统计
|
||
'bill_count_last_30_day': '_widget_1734062123116',
|
||
'bill_day_count_last_30_day': '_widget_1734062123117',
|
||
'bill_day_count_this_month': '_widget_1734062123118',
|
||
'bill_count_last_7_day': '_widget_1734062123119',
|
||
'bill_day_count_last_7_day': '_widget_1734062123120',
|
||
'pv_count': '_widget_1734062123121',
|
||
'uv_count': '_widget_1734062123122',
|
||
|
||
# 每日单据数(1-31天)
|
||
'bill_count_1d': '_widget_1734062123123',
|
||
'bill_count_2d': '_widget_1734062123124',
|
||
'bill_count_3d': '_widget_1734062123125',
|
||
'bill_count_4d': '_widget_1734062123126',
|
||
'bill_count_5d': '_widget_1734062123127',
|
||
'bill_count_6d': '_widget_1734062123128',
|
||
'bill_count_7d': '_widget_1734062123129',
|
||
'bill_count_8d': '_widget_1734062123130',
|
||
'bill_count_9d': '_widget_1734062123131',
|
||
'bill_count_10d': '_widget_1734062123132',
|
||
'bill_count_11d': '_widget_1734062123133',
|
||
'bill_count_12d': '_widget_1734062123134',
|
||
'bill_count_13d': '_widget_1734062123135',
|
||
'bill_count_14d': '_widget_1734062123136',
|
||
'bill_count_15d': '_widget_1734062123137',
|
||
'bill_count_16d': '_widget_1734062123138',
|
||
'bill_count_17d': '_widget_1734062123139',
|
||
'bill_count_18d': '_widget_1734062123140',
|
||
'bill_count_19d': '_widget_1734062123141',
|
||
'bill_count_20d': '_widget_1734062123142',
|
||
'bill_count_21d': '_widget_1734062123143',
|
||
'bill_count_22d': '_widget_1734062123144',
|
||
'bill_count_23d': '_widget_1734062123145',
|
||
'bill_count_24d': '_widget_1734062123146',
|
||
'bill_count_25d': '_widget_1734062123147',
|
||
'bill_count_26d': '_widget_1734062123148',
|
||
'bill_count_27d': '_widget_1734062123149',
|
||
'bill_count_28d': '_widget_1734062123150',
|
||
'bill_count_29d': '_widget_1734062123151',
|
||
'bill_count_30d': '_widget_1734062123152',
|
||
'bill_count_31d': '_widget_1734062123153',
|
||
|
||
# ETL时间
|
||
'etl_time': '_widget_1734062123154',
|
||
|
||
# 业务类型统计
|
||
'maintain_bill_count_last_30_day': '_widget_1734062123155',
|
||
'washing_bill_count_last_30_day': '_widget_1734062123156',
|
||
'maintain_bill_day_count_last_30_day': '_widget_1734062123157',
|
||
'washing_bill_day_count_last_30_day': '_widget_1734062123158',
|
||
'retail_bill_count_last_30_day': '_widget_1734062123159',
|
||
'retail_bill_day_count_last_30_day': '_widget_1734062123160',
|
||
'purchase_bill_count_last_30_day': '_widget_1734062123161',
|
||
'purchase_bill_day_count_last_30_day': '_widget_1734062123162',
|
||
'card_bill_count_last_30_day': '_widget_1734062123163',
|
||
'card_bill_day_count_last_30_day': '_widget_1734062123164',
|
||
'gd_sales_bill_count_last_30_day': '_widget_1734062123165',
|
||
'gd_sales_bill_day_count_last_30_day': '_widget_1734062123166',
|
||
|
||
# G标记相关
|
||
'g_change_flag': '_widget_1734062123167',
|
||
'saas_package': '_widget_1734062123168',
|
||
'manage_model': '_widget_1734062123169',
|
||
|
||
# 联系信息
|
||
'contacts': '_widget_1734062123170',
|
||
'contact_number': '_widget_1734062123171',
|
||
'contact_mobile': '_widget_1734062123172',
|
||
|
||
# G月度统计
|
||
'g_month_count': '_widget_1734062123173',
|
||
'g_month_percentage': '_widget_1734062123174',
|
||
|
||
# 安装服务
|
||
'is_install_service': '_widget_1734062123175',
|
||
'install_create_time': '_widget_1734062123176',
|
||
'last_end_date': '_widget_1734062123082',
|
||
'renew_date': '_widget_1734062123178',
|
||
|
||
# 连锁信息
|
||
'is_chain_owner': '_widget_1734062123179',
|
||
'group_org_count': '_widget_1734062123180',
|
||
|
||
# 预警信息
|
||
'recent_bill_warning_days': '_widget_1734062123181',
|
||
'g_change_flag_d': '_widget_1734062123182',
|
||
'g_lost_warning_days': '_widget_1734062123183',
|
||
|
||
# SaaS版本
|
||
'saas_edition_fmt': '_widget_1734062123184',
|
||
|
||
# G标记月度(1-6月)
|
||
'g_flag_1m': '_widget_1734062123185',
|
||
'g_flag_2m': '_widget_1734062123186',
|
||
'g_flag_3m': '_widget_1734062123187',
|
||
'g_flag_4m': '_widget_1734062123188',
|
||
'g_flag_5m': '_widget_1734062123189',
|
||
'g_flag_6m': '_widget_1734062123190',
|
||
'g_flag_day_count': '_widget_1734062123191',
|
||
|
||
# 其他标记
|
||
'add_org_flag': '_widget_1734062123192',
|
||
'pt': '_widget_1734062123193',
|
||
|
||
# 门店属性
|
||
'org_size': '_widget_1734062123194',
|
||
'qualification_type_fmt': '_widget_1734062123195',
|
||
'business_scope_fmt': '_widget_1734062123196',
|
||
'store_type_fmt': '_widget_1734062123197',
|
||
'area': '_widget_1734062123198',
|
||
'station_number': '_widget_1734062123199',
|
||
'header_type_fmt': '_widget_1734062123200',
|
||
'org_stage': '_widget_1734062123201',
|
||
|
||
# 本月统计
|
||
'g_count_this_month': '_widget_1734062123202',
|
||
'saas_customer_type': '_widget_1734062123203',
|
||
'technician': '_widget_1734062123204',
|
||
'tmall_maintain_service_status_desc': '_widget_1734062123205',
|
||
|
||
# 日期字段(UTC格式)
|
||
'date_fmt_date': '_widget_1749000071375',
|
||
'saas_create_time_date': '_widget_1749000071377',
|
||
'expiry_time_date': '_widget_1749000071389', # 过期日
|
||
'install_create_time_date': '_widget_1749000071384',
|
||
'last_end_date_date': '_widget_1749000071382',
|
||
'renew_date_date': '_widget_1749000071391',
|
||
|
||
# 人员ID字段
|
||
'area_manager_staff_id': '_widget_1748496855779',
|
||
'service_salesmen_staff_id': '_widget_1748496855778',
|
||
'service_impl_principal_staff_id': '_widget_1748496855780',
|
||
'technician_staff_id': '_widget_1751877712235',
|
||
}
|
||
|
||
def _save_to_cache(self, df, cache_key):
    """
    Write ``df`` to the CSV cache file registered under ``cache_key``.

    Args:
        df: DataFrame to persist; written without its index, encoded as
            UTF-8 with BOM ('utf-8-sig').
        cache_key: a key of CACHE_FILES naming the destination file.

    Best effort: an unknown key or a failed write is only logged as a
    warning; nothing is raised or returned.
    """
    if cache_key in CACHE_FILES:
        destination = CACHE_FILES[cache_key]
        try:
            df.to_csv(destination, index=False, encoding='utf-8-sig')
            logger.info(f" ✓ 已保存到缓存: {destination} ({len(df)} 条记录)")
        except Exception as exc:
            logger.warning(f" ⚠ 保存缓存失败 ({cache_key}): {exc}")
    else:
        logger.warning(f" ⚠ 未知的缓存键: {cache_key}")
|
||
|
||
def _load_from_cache(self, cache_key):
    """
    Read the CSV cache file registered under ``cache_key`` into a DataFrame.

    Args:
        cache_key: a key of CACHE_FILES naming the source file.

    Returns:
        The loaded DataFrame, or an empty DataFrame when the key is unknown,
        the file does not exist, or reading it fails (each case is logged).
    """
    empty_result = pd.DataFrame

    if cache_key not in CACHE_FILES:
        logger.warning(f" ⚠ 未知的缓存键: {cache_key}")
        return empty_result()

    source_path = CACHE_FILES[cache_key]

    if not os.path.exists(source_path):
        logger.warning(f" ⚠ 缓存文件不存在: {source_path}")
        logger.warning(" 请先关闭USE_LOCAL_CACHE运行一次以生成缓存文件")
        return empty_result()

    try:
        # NOTE(review): read_csv infers column dtypes, so values that were
        # strings before caching (e.g. codes with leading zeros) may not
        # round-trip exactly -- confirm callers tolerate this.
        loaded = pd.read_csv(source_path, encoding='utf-8-sig')
        logger.info(f" ✓ 已从缓存加载: {source_path} ({len(loaded)} 条记录)")
        return loaded
    except Exception as exc:
        logger.error(f" ✗ 加载缓存失败 ({cache_key}): {exc}")
        return empty_result()
|
||
|
||
def _save_create_data(self, create_data_list):
|
||
"""
|
||
保存新增数据到文件
|
||
|
||
参数:
|
||
create_data_list: 新增数据列表
|
||
"""
|
||
if len(create_data_list) == 0:
|
||
return
|
||
|
||
try:
|
||
# 生成时间戳
|
||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
|
||
# 提取数据到DataFrame
|
||
create_records = []
|
||
for item in create_data_list:
|
||
row_data = item['row_data']
|
||
record = {
|
||
'org_code': item['org_code'],
|
||
'org_name': row_data.get('org_name', ''),
|
||
'group_name': row_data.get('group_name', ''),
|
||
'org_type': row_data.get('org_type', ''),
|
||
'province_name': row_data.get('province_name', ''),
|
||
'city_name': row_data.get('city_name', ''),
|
||
'saas_version': row_data.get('saas_version', ''),
|
||
'active_status_fmt': row_data.get('active_status_fmt', ''),
|
||
}
|
||
create_records.append(record)
|
||
|
||
create_df = pd.DataFrame(create_records)
|
||
|
||
# 使用相对路径保存(支持跨平台)
|
||
file_path = os.path.join(output_dir, f'新增门店_{timestamp}.csv')
|
||
create_df.to_csv(file_path, index=False, encoding='utf-8-sig')
|
||
|
||
logger.info(f" ✓ 新增数据已保存: {file_path} ({len(create_df)} 条)")
|
||
except Exception as e:
|
||
error_task_logger.error(f"保存新增数据失败: {e}", exc_info=True)
|
||
|
||
def _save_update_stats(self, update_data_list):
|
||
"""
|
||
保存更新统计信息到文件
|
||
|
||
参数:
|
||
update_data_list: 更新数据列表
|
||
"""
|
||
if len(update_data_list) == 0:
|
||
return
|
||
|
||
try:
|
||
# 生成时间戳
|
||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
|
||
# 统计每个org_code的更新记录数(去重)
|
||
org_code_counts = {}
|
||
org_code_info = {}
|
||
|
||
for item in update_data_list:
|
||
org_code = item['org_code']
|
||
if org_code not in org_code_counts:
|
||
org_code_counts[org_code] = 0
|
||
row_data = item['row_data']
|
||
org_code_info[org_code] = {
|
||
'org_name': row_data.get('org_name', ''),
|
||
'group_name': row_data.get('group_name', ''),
|
||
'org_type': row_data.get('org_type', ''),
|
||
'province_name': row_data.get('province_name', ''),
|
||
'city_name': row_data.get('city_name', ''),
|
||
'saas_version': row_data.get('saas_version', ''),
|
||
'active_status_fmt': row_data.get('active_status_fmt', ''),
|
||
}
|
||
org_code_counts[org_code] += 1
|
||
|
||
# 构建统计DataFrame
|
||
update_stats = []
|
||
for org_code, count in org_code_counts.items():
|
||
info = org_code_info[org_code]
|
||
stat = {
|
||
'org_code': org_code,
|
||
'org_name': info['org_name'],
|
||
'group_name': info['group_name'],
|
||
'org_type': info['org_type'],
|
||
'province_name': info['province_name'],
|
||
'city_name': info['city_name'],
|
||
'saas_version': info['saas_version'],
|
||
'active_status_fmt': info['active_status_fmt'],
|
||
'update_count': count,
|
||
'note': '同一org_code有多个记录' if count > 1 else ''
|
||
}
|
||
update_stats.append(stat)
|
||
|
||
update_df = pd.DataFrame(update_stats)
|
||
update_df = update_df.sort_values('update_count', ascending=False)
|
||
|
||
# 使用相对路径保存(支持跨平台)
|
||
file_path = os.path.join(output_dir, f'更新统计_{timestamp}.csv')
|
||
update_df.to_csv(file_path, index=False, encoding='utf-8-sig')
|
||
|
||
# 统计汇总
|
||
total_org_codes = len(org_code_counts)
|
||
total_records = len(update_data_list)
|
||
duplicate_org_codes = sum(1 for count in org_code_counts.values() if count > 1)
|
||
|
||
logger.info(f" ✓ 更新统计已保存: {file_path}")
|
||
logger.info(f" - 更新的org_code数: {total_org_codes}")
|
||
logger.info(f" - 更新的记录总数: {total_records}")
|
||
if duplicate_org_codes > 0:
|
||
logger.info(f" - 包含多条记录的org_code: {duplicate_org_codes}")
|
||
except Exception as e:
|
||
error_task_logger.error(f"保存更新统计失败: {e}", exc_info=True)
|
||
|
||
|
||
if __name__ == '__main__':
    # Script entry point: instantiate UpdateAllNGVDataDaily and run its main().
    UpdateAllNGVDataDaily().main()
|