Files
saas/back_ground_module/indtall_event_dispatcher.py
2025-08-14 11:55:03 +08:00

162 lines
6.3 KiB
Python

from api import API
from back_ground_module import CommonModule
import datetime
import pandas as pd
from log_config import configure_task_logger, configure_error_task_logger
api_instance = API()
common_module = CommonModule()
start_time = datetime.datetime.now()
# 获取已经配置好的常规日志记录器
logger = configure_task_logger()
# 获取已经配置好的错误任务日志记录器
error_task_logger = configure_error_task_logger()
class InstallEventDispatcher:
"""安装服务历史派发"""
def __init__(self):
# 直接在初始化时设置映射关系
self.services_list = None
self.reversed_field_mapping = {
"": "_widget_1750301534569",
"": "_widget_1750301534570",
"": "_widget_1750301534571",
"门店名称": "_widget_1750301534572",
"门店id": "_widget_1750301534573",
"负责人": "_widget_1750301534574",
"联系电话": "_widget_1750301534575",
"线索状态": "_widget_1750301534577",
"线索来源": "_widget_1750301534576",
}
self.field_mapping = {
"": "_widget_1744177321450",
"": "_widget_1744182647145",
"": "_widget_1744182647146",
"门店名称": "_widget_1744177321449",
"门店id": "_widget_1744177321451",
"负责人": "_widget_1744177321452",
"联系电话": "_widget_1744177321453",
"线索来源": "_widget_1744187212674",
}
self.install_service_lead = None
def load_all_data(self):
"""加载所有必要的数据表"""
# 安装服务线索池
payload = {"api_key": "66f3a68c6e56814df2c6b1af", "entry_id": "68537b5e60a6295c6c09b464"}
json_dict = api_instance.entry_data_list(payload)
self.install_service_lead = json_dict.get("data")
# 安装服务客服表
payload = {"api_key": "66f3a68c6e56814df2c6b1af", "entry_id": "6809d4ef063ece5c83fc61ad"}
json_dict = api_instance.entry_data_list(payload)
self.services_list = json_dict.get("data")
def row_to_dict(self, row, field_mapping):
"""将一行数据转换为指定格式的字典"""
result = {}
for col_name, widget_id in field_mapping.items():
if col_name in row:
value = row[col_name]
clean_value = None if pd.isna(value) else value
result[widget_id] = {"value": clean_value}
return result
def reversed_dict(self, old_dict, field_mapping):
"""将字段ID映射回中文名称"""
id_to_name = {v: k for k, v in field_mapping.items()}
new_dict = {}
for old_key, value in old_dict.items():
# 使用get方法实现高效查找,未找到时保留原键
new_key = id_to_name.get(old_key, old_key)
new_dict[new_key] = value
return new_dict
def main(self):
"""主函数"""
start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# 1.加载所有数据
self.load_all_data()
install_service_lead_list = self.install_service_lead
# 将list的字段映射为中文
new_sign_abnormal_data = [
self.reversed_dict(old_dict, self.reversed_field_mapping)
for old_dict in install_service_lead_list
]
logger.info(f"加载数据完成")
# 2.获取今日值班客服
today_duty_staff = []
for item in self.services_list:
if item.get("_widget_1740117343937") == "":
today_duty_staff.append(item.get("_widget_1740042824214").get("username"))
count = len(today_duty_staff)
if count == 0:
logger.warning(f"今日值班客服为空,请检查数据")
common_module.send_task_error(start_time, "安装服务历史派发", "今日值班客服为空")
return
logger.info(f"今日值班客服为:{today_duty_staff}")
# 3.数据准备
new_sign_abnormal_data = [item for item in new_sign_abnormal_data if item["线索状态"] != "已派发"]
# 截取今日需要派发的数据
new_sign_abnormal_data = new_sign_abnormal_data[:count]
# 获取今日要派发数据的id
id_list = [item["_id"] for item in new_sign_abnormal_data]
new_sign_abnormal_data = [
self.row_to_dict(row, self.field_mapping)
for row in new_sign_abnormal_data]
logger.info(f"数据准备完成")
# 4.派发今日数据
i = 0
for item in new_sign_abnormal_data:
item.update({"_widget_1744182647149": {"value": today_duty_staff[i]}})
data = {
'api_key': "66f3a68c6e56814df2c6b1af",
'entry_id': "67f5dc467a9f5b2710da965a", # 安装服务意向表
# 'entry_id': "6853c7cc512ffef038917440", # 测试表
"data": item
}
res = api_instance.data_batch_create(data)
logger.info(f"数据派发:{res}")
i += 1
logger.info(f"数据派发完成")
# 5.修改原数据状态为已派发
for id in id_list:
data = {
'api_key': "66f3a68c6e56814df2c6b1af",
'entry_id': "68537b5e60a6295c6c09b464",
"data_id": id,
"data": {"_widget_1750301534577": {"value": "已派发"}}
}
res = api_instance.entry_data_update(data)
logger.info(f"数据状态修改:{res}")
logger.info(f"数据状态修改完成")
common_module.send_task_status(start_time, "安装服务历史派发")
logger.info("安装服务历史任务完成")
except Exception as e:
common_module.send_task_error(start_time, "安装服务历史派发", str(e))
error_task_logger.error(f"安装服务历史派发任务执行失败: {e}")
if __name__ == "__main__":
install_event_dispatcher = InstallEventDispatcher()
install_event_dispatcher.main()