Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1d5bf7cd55 | |||
| e4e4d04e3e |
@@ -113,7 +113,7 @@ class UpdateNGVData:
|
||||
pass
|
||||
|
||||
#
|
||||
data = {'api_key': Config.SaaS_Tasks_APP_ID, 'entry_id': Config.NGV_TASKS_ENTRY_ID, "data_list": all_data}
|
||||
data = {'api_key': Config.SaaS_Tasks_APP_ID, 'entry_id': Config.NGV_TASKS_ENTRY_ID, "data_list": all_data,"is_start_trigger":"true"}
|
||||
|
||||
result = api_instance.entry_data_batch_create(data)
|
||||
logger.info(f"数据已推送:{result}")
|
||||
|
||||
@@ -66,6 +66,7 @@ import os
|
||||
import sys
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import time
|
||||
import numpy as np
|
||||
|
||||
# 添加父目录到Python路径,以便导入项目模块
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
@@ -153,7 +154,7 @@ class UpdateAllNGVDataDaily:
|
||||
jdy_ngv_data, staff_id_map = self._load_base_data()
|
||||
|
||||
# 步骤2: 获取并处理NGV源数据
|
||||
ngv_data_today, ngv_data_yesterday = self._load_ngv_source_data()
|
||||
ngv_data_today, ngv_data_yesterday = self._load_ngv_source_data(task_start_time)
|
||||
|
||||
# 步骤3: 处理已删除的门店
|
||||
self._handle_deleted_stores(jdy_ngv_data, ngv_data_today)
|
||||
@@ -238,7 +239,7 @@ class UpdateAllNGVDataDaily:
|
||||
|
||||
return jdy_ngv_data, staff_id_map
|
||||
|
||||
def _load_ngv_source_data(self):
|
||||
def _load_ngv_source_data(self, task_start_time):
|
||||
"""
|
||||
步骤2: 获取并处理NGV源数据
|
||||
返回: (昨天的数据, 前天的数据)
|
||||
@@ -262,6 +263,10 @@ class UpdateAllNGVDataDaily:
|
||||
ngv_data_1 = common_module.get_ngv_details(days_back=1)
|
||||
ngv_data_2 = common_module.get_ngv_details(days_back=2)
|
||||
|
||||
# 存储每天获取到的数据
|
||||
ngv_data_1.to_csv(f"{task_start_time}_ngv_data_today.csv", index=False)
|
||||
ngv_data_2.to_csv(f"{task_start_time}_ngv_data_yesterday.csv", index=False)
|
||||
|
||||
# 只保留 org_type 为 "一般" 的记录
|
||||
ngv_data_1 = ngv_data_1[ngv_data_1['org_type'] == '一般']
|
||||
ngv_data_2 = ngv_data_2[ngv_data_2['org_type'] == '一般']
|
||||
@@ -616,6 +621,17 @@ class UpdateAllNGVDataDaily:
|
||||
org_code = idx
|
||||
data_dict[self.ORG_CODE_WIDGET_ID] = {"value": org_code}
|
||||
|
||||
# 新增:仅创建内容非全空的记录
|
||||
# 检查除了门店编码外,其他字段是否全为空
|
||||
all_empty = True
|
||||
for k, v in data_dict.items():
|
||||
if k != self.ORG_CODE_WIDGET_ID and v.get('value') not in (None, '', float('nan')):
|
||||
all_empty = False
|
||||
break
|
||||
if all_empty:
|
||||
logger.info(f" - 跳过内容全空的新建记录 org_code: {idx}")
|
||||
continue
|
||||
|
||||
create_data_list.append({
|
||||
'org_code': idx,
|
||||
'data_dict': data_dict,
|
||||
@@ -782,7 +798,9 @@ class UpdateAllNGVDataDaily:
|
||||
create_data = {
|
||||
'api_key': Config.SaaS_Tasks_APP_ID,
|
||||
'entry_id': Config.NGV_TASKS_ENTRY_ID,
|
||||
'data': item['data_dict']
|
||||
'data': item['data_dict'],
|
||||
'is_start_trigger': 'true',
|
||||
|
||||
}
|
||||
api_instance.data_batch_create(data=create_data, max_retries=20)
|
||||
success_count += 1
|
||||
@@ -1071,7 +1089,7 @@ class UpdateAllNGVDataDaily:
|
||||
try:
|
||||
# 生成时间戳
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
|
||||
|
||||
# 提取数据到DataFrame
|
||||
create_records = []
|
||||
for item in create_data_list:
|
||||
@@ -1087,13 +1105,13 @@ class UpdateAllNGVDataDaily:
|
||||
'active_status_fmt': row_data.get('active_status_fmt', ''),
|
||||
}
|
||||
create_records.append(record)
|
||||
|
||||
|
||||
create_df = pd.DataFrame(create_records)
|
||||
|
||||
|
||||
# 使用相对路径保存(支持跨平台)
|
||||
file_path = os.path.join(output_dir, f'新增门店_{timestamp}.csv')
|
||||
create_df.to_csv(file_path, index=False, encoding='utf-8-sig')
|
||||
|
||||
|
||||
logger.info(f" ✓ 新增数据已保存: {file_path} ({len(create_df)} 条)")
|
||||
except Exception as e:
|
||||
error_task_logger.error(f"保存新增数据失败: {e}", exc_info=True)
|
||||
@@ -1111,11 +1129,11 @@ class UpdateAllNGVDataDaily:
|
||||
try:
|
||||
# 生成时间戳
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
|
||||
|
||||
# 统计每个org_code的更新记录数(去重)
|
||||
org_code_counts = {}
|
||||
org_code_info = {}
|
||||
|
||||
|
||||
for item in update_data_list:
|
||||
org_code = item['org_code']
|
||||
if org_code not in org_code_counts:
|
||||
@@ -1131,7 +1149,7 @@ class UpdateAllNGVDataDaily:
|
||||
'active_status_fmt': row_data.get('active_status_fmt', ''),
|
||||
}
|
||||
org_code_counts[org_code] += 1
|
||||
|
||||
|
||||
# 构建统计DataFrame
|
||||
update_stats = []
|
||||
for org_code, count in org_code_counts.items():
|
||||
@@ -1149,19 +1167,19 @@ class UpdateAllNGVDataDaily:
|
||||
'note': '同一org_code有多个记录' if count > 1 else ''
|
||||
}
|
||||
update_stats.append(stat)
|
||||
|
||||
|
||||
update_df = pd.DataFrame(update_stats)
|
||||
update_df = update_df.sort_values('update_count', ascending=False)
|
||||
|
||||
|
||||
# 使用相对路径保存(支持跨平台)
|
||||
file_path = os.path.join(output_dir, f'更新统计_{timestamp}.csv')
|
||||
update_df.to_csv(file_path, index=False, encoding='utf-8-sig')
|
||||
|
||||
|
||||
# 统计汇总
|
||||
total_org_codes = len(org_code_counts)
|
||||
total_records = len(update_data_list)
|
||||
duplicate_org_codes = sum(1 for count in org_code_counts.values() if count > 1)
|
||||
|
||||
|
||||
logger.info(f" ✓ 更新统计已保存: {file_path}")
|
||||
logger.info(f" - 更新的org_code数: {total_org_codes}")
|
||||
logger.info(f" - 更新的记录总数: {total_records}")
|
||||
@@ -1174,4 +1192,3 @@ class UpdateAllNGVDataDaily:
|
||||
if __name__ == '__main__':
|
||||
updater = UpdateAllNGVDataDaily()
|
||||
updater.main()
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ class EmailProcessor:
|
||||
"指标类型": "_widget_1742091963880",
|
||||
"指标值": "_widget_1742091963882",
|
||||
"指标子类型": "_widget_1742091963881",
|
||||
"门店过期时间":"_widget_1761875317680"
|
||||
}
|
||||
|
||||
def connect_email_by_pop3(self):
|
||||
@@ -288,10 +289,12 @@ class EmailProcessor:
|
||||
email_df['门店ID'] = email_df['门店ID'].astype(str)
|
||||
email_df['指标归属日期'] = pd.to_datetime(email_df['指标归属日期'], format="%Y/%m/%d").dt.strftime("%Y-%m-%d")
|
||||
email_df["门店创建时间"] = pd.to_datetime(email_df['门店创建时间'], format="%Y-%m-%d %H:%M:%S")
|
||||
email_df["门店过期时间"] = pd.to_datetime(email_df['门店过期时间'], format="%Y-%m-%d %H:%M:%S")
|
||||
new_email_df = email_df.copy() # 拷贝传参
|
||||
for index, row in email_df.iterrows():
|
||||
email_df.loc[index, '指标归属日期'] = common_module.time_to_UTC(row['指标归属日期'])
|
||||
email_df.loc[index, '门店创建时间'] = common_module.time_to_UTC(row['门店创建时间'])
|
||||
email_df.loc[index, '门店过期时间'] = common_module.time_to_UTC(row['门店过期时间'])
|
||||
|
||||
email_data = [self.row_to_dict(row, self.field_mapping) for index, row in email_df.iterrows()]
|
||||
new_email_data = {'api_key': "673457d6837e60a418e0e56b",
|
||||
@@ -361,7 +364,7 @@ class EmailProcessor:
|
||||
return
|
||||
|
||||
logger.info("邮件获取完成,开始处理数据")
|
||||
email_df = processor.update_email()
|
||||
email_df = processor.update_email() # 发送到简道云
|
||||
processor.up_to_BI(email_df) # 发送到BI
|
||||
common_module.send_task_status(task_start_time, "海外邮件推送")
|
||||
logger.info("海外邮件推送任务完成")
|
||||
|
||||
@@ -1,16 +1,11 @@
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from yd_api import YDAPI
|
||||
from api import API
|
||||
import pandas as pd
|
||||
from tqdm import tqdm
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
from config import Config
|
||||
from back_ground_module import CommonModule
|
||||
import logging
|
||||
from log_config import configure_task_logger, configure_error_task_logger
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
|
||||
+8
-8
@@ -257,8 +257,8 @@
|
||||
{
|
||||
"metadata": {
|
||||
"ExecuteTime": {
|
||||
"end_time": "2025-10-27T06:50:20.870562Z",
|
||||
"start_time": "2025-10-27T06:50:20.631342Z"
|
||||
"end_time": "2025-10-31T01:47:00.303760Z",
|
||||
"start_time": "2025-10-31T01:46:58.373023Z"
|
||||
}
|
||||
},
|
||||
"cell_type": "code",
|
||||
@@ -274,10 +274,10 @@
|
||||
" } # 衡时数据库链接配置-mysql\n",
|
||||
"# table_name = \"new_dealer_service_order_to_bi\" # 替换为你的实际表名\n",
|
||||
"\n",
|
||||
"table_name = \"non_standard_performance_to_BI\"\n",
|
||||
"column_name = \"业绩分类\"\n",
|
||||
"new_column_type = \"VARCHAR(255)\" # 目标数据类型\n",
|
||||
"# new_column_type = \"DATETIME\" # 目标数据类型\n",
|
||||
"table_name = \"thailand_store_data_email\"\n",
|
||||
"column_name = \"门店过期时间\"\n",
|
||||
"# new_column_type = \"VARCHAR(255)\" # 目标数据类型\n",
|
||||
"new_column_type = \"DATETIME\" # 目标数据类型\n",
|
||||
"\n",
|
||||
"try:\n",
|
||||
" # 连接数据库\n",
|
||||
@@ -336,12 +336,12 @@
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"✅ 成功添加字段: `业绩分类`\n",
|
||||
"✅ 成功添加字段: `门店过期时间`\n",
|
||||
"数据库连接已关闭\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"execution_count": 5
|
||||
"execution_count": 1
|
||||
},
|
||||
{
|
||||
"metadata": {},
|
||||
|
||||
Reference in New Issue
Block a user