# Files
#
# 267 lines
# 17 KiB
# Python

# -*- coding: utf-8 -*-
import pandas as pd
import datetime
from config import Config
from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger
import time
# Module-level setup: loggers, shared service objects and the CSV output
# directory are all created once, at import time.
logger = configure_task_logger()
# Fetch the already-configured logger used for task errors
error_task_logger = configure_error_task_logger()
# Import-time timestamp; not read anywhere in the visible part of this file.
start_time = datetime.datetime.now()
api_instance = API()
common_module = CommonModule()
output_dir = "output" # Output directory for the CSV snapshots
# Create the output directory (if it does not exist yet)
import os
os.makedirs(output_dir, exist_ok=True)
class UpdateNGVData:
    """Daily incremental upload of newly added NGV records.

    The job compares the NGV detail snapshot for ``days_back=1`` with the one
    for ``days_back=2``, keeps the rows whose ``org_code`` only appears in the
    newer snapshot and whose ``org_type`` is ``'一般'``, enriches them
    (UTC date strings, staff IDs, formatted percentage) and pushes them to the
    target form in a single batch-create call.
    """

    # Payload values used to read the staff directory form (Jiandaoyun).
    _STAFF_API_KEY = "6694d3c4fcb69ca9a111a6c4"
    _STAFF_ENTRY_ID = "6769204a1902c9341340a1bc"
    # Widgets of a staff directory record: the staff name and the staff ID.
    _STAFF_NAME_WIDGET = "_widget_1734942794144"
    _STAFF_ID_WIDGET = "_widget_1734942794145"

    # Task name reported to the task status / task error endpoints.
    _TASK_NAME = "NGV新增数据"

    # Columns that get an additional "<col>_date" UTC ISO-8601 string column.
    _TIME_COLUMNS = ('date_fmt', 'saas_create_time', 'expiry_time',
                     'install_create_time', 'last_end_date', 'renew_date')
    # Columns holding staff names that get a "<col>_staff_id" column.
    _STAFF_COLUMNS = ('area_manager', 'service_impl_principal',
                      'service_salesmen', 'technician')

    def __init__(self):
        """Initialise empty state and populate ``field_mapping``."""
        self.staff_id_list = None
        self.field_mapping = {}
        self.fields()

    def load_all_data(self):
        """Fetch the staff directory records and store them on the instance.

        The API response wraps the records in its ``data`` entry; that value
        (``None`` when the key is absent) is stored in ``staff_id_list``.
        """
        payload = {
            "api_key": self._STAFF_API_KEY,
            "entry_id": self._STAFF_ENTRY_ID,
        }
        staff_id = api_instance.entry_data_list(payload)
        self.staff_id_list = staff_id.get("data")

    @staticmethod
    def get_staff_id(row_item, name):
        """Return the staff ID of ``row_item`` if its name matches ``name``.

        Both sides are compared as strings. Returns ``None`` when the names
        differ.
        """
        if str(row_item[UpdateNGVData._STAFF_NAME_WIDGET]) == str(name):
            return row_item[UpdateNGVData._STAFF_ID_WIDGET]
        return None

    def main(self):
        """Run one incremental sync and report its outcome.

        Any exception raised along the way is logged to the error logger and
        reported through ``send_task_error``; it is not re-raised.
        """
        task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            self.load_all_data()
            logger.info("数据加载完成")

            current_df = common_module.get_ngv_details(days_back=1)
            previous_df = common_module.get_ngv_details(days_back=2)
            # Keep both raw snapshots on disk for troubleshooting.
            current_df.to_csv(os.path.join(output_dir, "up_NGV_j.csv"))
            previous_df.to_csv(os.path.join(output_dir, "up_NGV_j1.csv"))

            filtered_df = self._select_new_general_orgs(current_df, previous_df)
            # New rows are flagged as "not deleted" by default.
            filtered_df['源NGV是否已删除'] = '未删除'

            self._add_utc_date_columns(filtered_df, self._TIME_COLUMNS)
            logger.info("时间转换完成")

            # The staff directory is only consulted when there are rows to
            # map, so an empty increment never touches ``staff_id_list``.
            if filtered_df.empty:
                staff_lookup = {}
            else:
                staff_lookup = self._build_staff_lookup(self.staff_id_list)
            for col in self._STAFF_COLUMNS:
                filtered_df[col + "_staff_id"] = self._lookup_staff_ids(
                    filtered_df[col], staff_lookup)
            logger.info("人员转换完成")

            filtered_df['g_month_percentage'] = self._format_percentage(
                filtered_df['g_month_percentage'])

            # One payload dict per incremental row.
            all_data = [self.row_to_dict(row, self.field_mapping)
                        for _, row in filtered_df.iterrows()]
            data = {
                'api_key': Config.SaaS_Tasks_APP_ID,
                'entry_id': Config.NGV_TASKS_ENTRY_ID,
                "data_list": all_data,
                "is_start_trigger": "true",
            }
            result = api_instance.entry_data_batch_create(data)
            logger.info(f"数据已推送:{result}")

            common_module.send_task_status(task_start_time, self._TASK_NAME)
            logger.info("任务完成。")
        except Exception as e:
            error_task_logger.error(f"任务执行时发生异常: {e}")
            common_module.send_task_error(task_start_time, self._TASK_NAME, str(e))

    @staticmethod
    def _select_new_general_orgs(current_df, previous_df):
        """Return a copy of the rows that are new and of type ``'一般'``.

        A row is new when its ``org_code`` does not occur in
        ``previous_df['org_code']``.
        """
        is_new = ~current_df['org_code'].isin(previous_df['org_code'])
        is_general = current_df['org_type'] == '一般'
        return current_df[is_new & is_general].copy()

    @staticmethod
    def _add_utc_date_columns(df, columns):
        """Add a ``<col>_date`` UTC ISO-8601 string column for each column.

        Values are parsed leniently (unparseable values become missing),
        interpreted as Asia/Shanghai wall-clock time and converted to UTC.
        The source columns are left untouched; ``df`` is modified in place.
        """
        for col in columns:
            parsed = pd.to_datetime(df[col], errors='coerce', utc=False)
            df[col + '_date'] = (
                parsed
                .dt.tz_localize('Asia/Shanghai', ambiguous='infer', nonexistent='NaT')
                .dt.tz_convert('UTC')
                .dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            )

    @staticmethod
    def _build_staff_lookup(staff_records):
        """Index staff records by their name, compared as a string.

        ``setdefault`` keeps the first record for a duplicated name, which is
        what a first-match linear scan over ``staff_records`` would pick.
        """
        lookup = {}
        for staff in staff_records:
            lookup.setdefault(str(staff[UpdateNGVData._STAFF_NAME_WIDGET]), staff)
        return lookup

    @staticmethod
    def _lookup_staff_ids(names, lookup):
        """Return the staff ID for every name, ``None`` where nothing matches."""
        staff_ids = []
        for name in names:
            staff = lookup.get(str(name))
            staff_ids.append(
                None if staff is None else staff[UpdateNGVData._STAFF_ID_WIDGET])
        return staff_ids

    @staticmethod
    def _format_percentage(values):
        """Format numbers with three decimals; non-numeric values become ``''``."""
        return (
            pd.to_numeric(values, errors='coerce')
            .round(3)
            .apply(lambda x: f"{x:.3f}" if pd.notna(x) else '')
        )

    @staticmethod
    def row_to_dict(row, field_mapping):
        """Convert one row into the ``{widget_id: {"value": ...}}`` payload.

        Only columns present in ``row`` are emitted; missing values (as
        detected by ``pd.isna``) are sent as ``None``.
        """
        result = {}
        for col_name, widget_id in field_mapping.items():
            if col_name in row:
                value = row[col_name]
                clean_value = None if pd.isna(value) else value
                result[widget_id] = {"value": clean_value}
        return result

    def fields(self):
        """Populate ``field_mapping`` (DataFrame column name -> form widget ID)."""
        self.field_mapping = {
            'date_id': '_widget_1734062123065',
            'date_fmt': '_widget_1734062123066',
            'id_own_group': '_widget_1734062123067',
            'group_name': '_widget_1734062123068',
            'id_own_org': '_widget_1734062123069',
            'org_name': '_widget_1734062123070',
            'org_code': '_widget_1734062123071',
            'group_grade': '_widget_1734062123072',
            'org_type': '_widget_1734062123073',
            'org_status': '_widget_1734062123074',
            'saas_version': '_widget_1734062123075',
            'is_wechat': '_widget_1734062123076',
            'is_mini_app': '_widget_1734062123077',
            'is_wx_shop': '_widget_1734062123078',
            'is_camera_service': '_widget_1734062123079',
            'is_maintenance_service': '_widget_1734062123080',
            'saas_create_time': '_widget_1734062123081',
            'expiry_time': '_widget_1734062123082',
            'saas_use_days': '_widget_1734062123083',
            'saas_use_year': '_widget_1734062123084',
            'is_main_org': '_widget_1734062123085',
            'license_code': '_widget_1734062123086',
            'license_name': '_widget_1734062123087',
            'org_crm_id': '_widget_1734062123088',
            'province_id': '_widget_1734062123089',
            'province_name': '_widget_1734062123090',
            'city_id': '_widget_1734062123091',
            'city_name': '_widget_1734062123092',
            'area_id': '_widget_1734062123093',
            'area_name': '_widget_1734062123094',
            'region_name': '_widget_1734062123095',
            'region_short_name': '_widget_1734062123096',
            'branch_name': '_widget_1734062123097',
            'carzone_store_id': '_widget_1734062123098',
            'carzone_store_name': '_widget_1734062123099',
            'customer_carzone_id': '_widget_1734062123100',
            'salesmen': '_widget_1734062123101',
            'area_manager': '_widget_1734062123102',
            'service_salesmen': '_widget_1734062123103',
            'impl_principal': '_widget_1734062123104',
            'service_impl_principal': '_widget_1734062123105',
            'active_user_count': '_widget_1734062123106',
            'active_user_type': '_widget_1734062123107',
            'limit_user_count': '_widget_1734062123108',
            'limit_user_type': '_widget_1734062123109',
            'is_n': '_widget_1734062123110',
            'is_g': '_widget_1734062123111',
            'is_v': '_widget_1734062123112',
            'is_visited': '_widget_1734062123113',
            'is_active': '_widget_1734062123114',
            'active_status_fmt': '_widget_1734062123115',
            'bill_count_last_30_day': '_widget_1734062123116',
            'bill_day_count_last_30_day': '_widget_1734062123117',
            'bill_day_count_this_month': '_widget_1734062123118',
            'bill_count_last_7_day': '_widget_1734062123119',
            'bill_day_count_last_7_day': '_widget_1734062123120',
            'pv_count': '_widget_1734062123121',
            'uv_count': '_widget_1734062123122',
            'bill_count_1d': '_widget_1734062123123',
            'bill_count_2d': '_widget_1734062123124',
            'bill_count_3d': '_widget_1734062123125',
            'bill_count_4d': '_widget_1734062123126',
            'bill_count_5d': '_widget_1734062123127',
            'bill_count_6d': '_widget_1734062123128',
            'bill_count_7d': '_widget_1734062123129',
            'bill_count_8d': '_widget_1734062123130',
            'bill_count_9d': '_widget_1734062123131',
            'bill_count_10d': '_widget_1734062123132',
            'bill_count_11d': '_widget_1734062123133',
            'bill_count_12d': '_widget_1734062123134',
            'bill_count_13d': '_widget_1734062123135',
            'bill_count_14d': '_widget_1734062123136',
            'bill_count_15d': '_widget_1734062123137',
            'bill_count_16d': '_widget_1734062123138',
            'bill_count_17d': '_widget_1734062123139',
            'bill_count_18d': '_widget_1734062123140',
            'bill_count_19d': '_widget_1734062123141',
            'bill_count_20d': '_widget_1734062123142',
            'bill_count_21d': '_widget_1734062123143',
            'bill_count_22d': '_widget_1734062123144',
            'bill_count_23d': '_widget_1734062123145',
            'bill_count_24d': '_widget_1734062123146',
            'bill_count_25d': '_widget_1734062123147',
            'bill_count_26d': '_widget_1734062123148',
            'bill_count_27d': '_widget_1734062123149',
            'bill_count_28d': '_widget_1734062123150',
            'bill_count_29d': '_widget_1734062123151',
            'bill_count_30d': '_widget_1734062123152',
            'bill_count_31d': '_widget_1734062123153',
            'etl_time': '_widget_1734062123154',
            'maintain_bill_count_last_30_day': '_widget_1734062123155',
            'washing_bill_count_last_30_day': '_widget_1734062123156',
            'maintain_bill_day_count_last_30_day': '_widget_1734062123157',
            'washing_bill_day_count_last_30_day': '_widget_1734062123158',
            'retail_bill_count_last_30_day': '_widget_1734062123159',
            'retail_bill_day_count_last_30_day': '_widget_1734062123160',
            'purchase_bill_count_last_30_day': '_widget_1734062123161',
            'purchase_bill_day_count_last_30_day': '_widget_1734062123162',
            'card_bill_count_last_30_day': '_widget_1734062123163',
            'card_bill_day_count_last_30_day': '_widget_1734062123164',
            'gd_sales_bill_count_last_30_day': '_widget_1734062123165',
            'gd_sales_bill_day_count_last_30_day': '_widget_1734062123166',
            'g_change_flag': '_widget_1734062123167',
            'saas_package': '_widget_1734062123168',
            'manage_model': '_widget_1734062123169',
            'contacts': '_widget_1734062123170',
            'contact_number': '_widget_1734062123171',
            'contact_mobile': '_widget_1734062123172',
            'g_month_count': '_widget_1734062123173',
            'g_month_percentage': '_widget_1734062123174',
            'is_install_service': '_widget_1734062123175',
            'install_create_time': '_widget_1734062123176',
            'last_end_date': '_widget_1734062123177',
            'renew_date': '_widget_1734062123178',
            'is_chain_owner': '_widget_1734062123179',
            'group_org_count': '_widget_1734062123180',
            'recent_bill_warning_days': '_widget_1734062123181',
            'g_change_flag_d': '_widget_1734062123182',
            'g_lost_warning_days': '_widget_1734062123183',
            'saas_edition_fmt': '_widget_1734062123184',
            'g_flag_1m': '_widget_1734062123185',
            'g_flag_2m': '_widget_1734062123186',
            'g_flag_3m': '_widget_1734062123187',
            'g_flag_4m': '_widget_1734062123188',
            'g_flag_5m': '_widget_1734062123189',
            'g_flag_6m': '_widget_1734062123190',
            'g_flag_day_count': '_widget_1734062123191',
            'add_org_flag': '_widget_1734062123192',
            'pt': '_widget_1734062123193',
            'org_size': '_widget_1734062123194',
            'qualification_type_fmt': '_widget_1734062123195',
            'business_scope_fmt': '_widget_1734062123196',
            'store_type_fmt': '_widget_1734062123197',
            'area': '_widget_1734062123198',
            'station_number': '_widget_1734062123199',
            'header_type_fmt': '_widget_1734062123200',
            'org_stage': '_widget_1734062123201',
            'g_count_this_month': '_widget_1734062123202',
            'saas_customer_type': '_widget_1734062123203',
            'technician': '_widget_1734062123204',
            'tmall_maintain_service_status_desc': '_widget_1734062123205',
            # Derived columns added by main().
            'date_fmt_date': '_widget_1749000071375',
            'area_manager_staff_id': '_widget_1748496855779',
            'service_impl_principal_staff_id': '_widget_1748496855780',
            'service_salesmen_staff_id': '_widget_1748496855778',
            'technician_staff_id': '_widget_1751877712235',
            'saas_create_time_date': '_widget_1749000071377',
            'expiry_time_date': '_widget_1749000071382',
            'install_create_time_date': '_widget_1749000071384',
            'last_end_date_date': '_widget_1749000071389',
            'renew_date_date': '_widget_1749000071391',
            '源NGV是否已删除': '_widget_1754285499851',
        }
if __name__ == '__main__':
    # Script entry point: build the job object and run a single sync.
    UpdateNGVData().main()