Compare commits

...

2 Commits

Author SHA1 Message Date
panda 1d5bf7cd55 ngv每日更新存储数据源数据 2025-11-03 11:33:08 +08:00
panda e4e4d04e3e 客户资料启用智能助手 2025-11-03 11:29:48 +08:00
5 changed files with 46 additions and 31 deletions
+1 -1
View File
@@ -113,7 +113,7 @@ class UpdateNGVData:
pass
#
data = {'api_key': Config.SaaS_Tasks_APP_ID, 'entry_id': Config.NGV_TASKS_ENTRY_ID, "data_list": all_data}
data = {'api_key': Config.SaaS_Tasks_APP_ID, 'entry_id': Config.NGV_TASKS_ENTRY_ID, "data_list": all_data,"is_start_trigger":"true"}
result = api_instance.entry_data_batch_create(data)
logger.info(f"数据已推送:{result}")
+32 -15
View File
@@ -66,6 +66,7 @@ import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import numpy as np
# 添加父目录到Python路径,以便导入项目模块
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -153,7 +154,7 @@ class UpdateAllNGVDataDaily:
jdy_ngv_data, staff_id_map = self._load_base_data()
# 步骤2: 获取并处理NGV源数据
ngv_data_today, ngv_data_yesterday = self._load_ngv_source_data()
ngv_data_today, ngv_data_yesterday = self._load_ngv_source_data(task_start_time)
# 步骤3: 处理已删除的门店
self._handle_deleted_stores(jdy_ngv_data, ngv_data_today)
@@ -238,7 +239,7 @@ class UpdateAllNGVDataDaily:
return jdy_ngv_data, staff_id_map
def _load_ngv_source_data(self):
def _load_ngv_source_data(self, task_start_time):
"""
步骤2: 获取并处理NGV源数据
返回: (昨天的数据, 前天的数据)
@@ -262,6 +263,10 @@ class UpdateAllNGVDataDaily:
ngv_data_1 = common_module.get_ngv_details(days_back=1)
ngv_data_2 = common_module.get_ngv_details(days_back=2)
# 存储每天获取到的数据
ngv_data_1.to_csv(f"{task_start_time}_ngv_data_today.csv", index=False)
ngv_data_2.to_csv(f"{task_start_time}_ngv_data_yesterday.csv", index=False)
# 只保留 org_type 为 "一般" 的记录
ngv_data_1 = ngv_data_1[ngv_data_1['org_type'] == '一般']
ngv_data_2 = ngv_data_2[ngv_data_2['org_type'] == '一般']
@@ -616,6 +621,17 @@ class UpdateAllNGVDataDaily:
org_code = idx
data_dict[self.ORG_CODE_WIDGET_ID] = {"value": org_code}
# 新增:仅创建内容非全空的记录
# 检查除了门店编码外,其他字段是否全为空
all_empty = True
for k, v in data_dict.items():
if k != self.ORG_CODE_WIDGET_ID and v.get('value') not in (None, '', float('nan')):
all_empty = False
break
if all_empty:
logger.info(f" - 跳过内容全空的新建记录 org_code: {idx}")
continue
create_data_list.append({
'org_code': idx,
'data_dict': data_dict,
@@ -782,7 +798,9 @@ class UpdateAllNGVDataDaily:
create_data = {
'api_key': Config.SaaS_Tasks_APP_ID,
'entry_id': Config.NGV_TASKS_ENTRY_ID,
'data': item['data_dict']
'data': item['data_dict'],
'is_start_trigger': 'true',
}
api_instance.data_batch_create(data=create_data, max_retries=20)
success_count += 1
@@ -1071,7 +1089,7 @@ class UpdateAllNGVDataDaily:
try:
# 生成时间戳
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# 提取数据到DataFrame
create_records = []
for item in create_data_list:
@@ -1087,13 +1105,13 @@ class UpdateAllNGVDataDaily:
'active_status_fmt': row_data.get('active_status_fmt', ''),
}
create_records.append(record)
create_df = pd.DataFrame(create_records)
# 使用相对路径保存(支持跨平台)
file_path = os.path.join(output_dir, f'新增门店_{timestamp}.csv')
create_df.to_csv(file_path, index=False, encoding='utf-8-sig')
logger.info(f" ✓ 新增数据已保存: {file_path} ({len(create_df)} 条)")
except Exception as e:
error_task_logger.error(f"保存新增数据失败: {e}", exc_info=True)
@@ -1111,11 +1129,11 @@ class UpdateAllNGVDataDaily:
try:
# 生成时间戳
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# 统计每个org_code的更新记录数(去重)
org_code_counts = {}
org_code_info = {}
for item in update_data_list:
org_code = item['org_code']
if org_code not in org_code_counts:
@@ -1131,7 +1149,7 @@ class UpdateAllNGVDataDaily:
'active_status_fmt': row_data.get('active_status_fmt', ''),
}
org_code_counts[org_code] += 1
# 构建统计DataFrame
update_stats = []
for org_code, count in org_code_counts.items():
@@ -1149,19 +1167,19 @@ class UpdateAllNGVDataDaily:
'note': '同一org_code有多个记录' if count > 1 else ''
}
update_stats.append(stat)
update_df = pd.DataFrame(update_stats)
update_df = update_df.sort_values('update_count', ascending=False)
# 使用相对路径保存(支持跨平台)
file_path = os.path.join(output_dir, f'更新统计_{timestamp}.csv')
update_df.to_csv(file_path, index=False, encoding='utf-8-sig')
# 统计汇总
total_org_codes = len(org_code_counts)
total_records = len(update_data_list)
duplicate_org_codes = sum(1 for count in org_code_counts.values() if count > 1)
logger.info(f" ✓ 更新统计已保存: {file_path}")
logger.info(f" - 更新的org_code数: {total_org_codes}")
logger.info(f" - 更新的记录总数: {total_records}")
@@ -1174,4 +1192,3 @@ class UpdateAllNGVDataDaily:
if __name__ == '__main__':
updater = UpdateAllNGVDataDaily()
updater.main()
@@ -48,6 +48,7 @@ class EmailProcessor:
"指标类型": "_widget_1742091963880",
"指标值": "_widget_1742091963882",
"指标子类型": "_widget_1742091963881",
"门店过期时间":"_widget_1761875317680"
}
def connect_email_by_pop3(self):
@@ -288,10 +289,12 @@ class EmailProcessor:
email_df['门店ID'] = email_df['门店ID'].astype(str)
email_df['指标归属日期'] = pd.to_datetime(email_df['指标归属日期'], format="%Y/%m/%d").dt.strftime("%Y-%m-%d")
email_df["门店创建时间"] = pd.to_datetime(email_df['门店创建时间'], format="%Y-%m-%d %H:%M:%S")
email_df["门店过期时间"] = pd.to_datetime(email_df['门店过期时间'], format="%Y-%m-%d %H:%M:%S")
new_email_df = email_df.copy() # 拷贝传参
for index, row in email_df.iterrows():
email_df.loc[index, '指标归属日期'] = common_module.time_to_UTC(row['指标归属日期'])
email_df.loc[index, '门店创建时间'] = common_module.time_to_UTC(row['门店创建时间'])
email_df.loc[index, '门店过期时间'] = common_module.time_to_UTC(row['门店过期时间'])
email_data = [self.row_to_dict(row, self.field_mapping) for index, row in email_df.iterrows()]
new_email_data = {'api_key': "673457d6837e60a418e0e56b",
@@ -361,7 +364,7 @@ class EmailProcessor:
return
logger.info("邮件获取完成,开始处理数据")
email_df = processor.update_email()
email_df = processor.update_email() # 发送到简道云
processor.up_to_BI(email_df) # 发送到BI
common_module.send_task_status(task_start_time, "海外邮件推送")
logger.info("海外邮件推送任务完成")
@@ -1,16 +1,11 @@
import mysql.connector
from mysql.connector import Error
import numpy as np
import pandas as pd
from yd_api import YDAPI
from api import API
import pandas as pd
from tqdm import tqdm
import time
from datetime import datetime, timedelta
from datetime import datetime
from config import Config
from back_ground_module import CommonModule
import logging
from log_config import configure_task_logger, configure_error_task_logger
import mysql.connector
from mysql.connector import Error
+8 -8
View File
@@ -257,8 +257,8 @@
{
"metadata": {
"ExecuteTime": {
"end_time": "2025-10-27T06:50:20.870562Z",
"start_time": "2025-10-27T06:50:20.631342Z"
"end_time": "2025-10-31T01:47:00.303760Z",
"start_time": "2025-10-31T01:46:58.373023Z"
}
},
"cell_type": "code",
@@ -274,10 +274,10 @@
" } # 衡时数据库链接配置-mysql\n",
"# table_name = \"new_dealer_service_order_to_bi\" # 替换为你的实际表名\n",
"\n",
"table_name = \"non_standard_performance_to_BI\"\n",
"column_name = \"业绩分类\"\n",
"new_column_type = \"VARCHAR(255)\" # 目标数据类型\n",
"# new_column_type = \"DATETIME\" # 目标数据类型\n",
"table_name = \"thailand_store_data_email\"\n",
"column_name = \"门店过期时间\"\n",
"# new_column_type = \"VARCHAR(255)\" # 目标数据类型\n",
"new_column_type = \"DATETIME\" # 目标数据类型\n",
"\n",
"try:\n",
" # 连接数据库\n",
@@ -336,12 +336,12 @@
"name": "stdout",
"output_type": "stream",
"text": [
"✅ 成功添加字段: `业绩分类`\n",
"✅ 成功添加字段: `门店过期时间`\n",
"数据库连接已关闭\n"
]
}
],
"execution_count": 5
"execution_count": 1
},
{
"metadata": {},