Files
saas/test/daily_dispatch_stats_by_region_and_agent.py
T

60 lines
2.3 KiB
Python

import os
from config import Config
import pandas as pd
from back_ground_module import CommonModule
from api import API
from log_config import configure_task_logger, configure_error_task_logger
from datetime import datetime, timedelta
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
output_dir = "output" # 设置输出目录
os.makedirs(output_dir, exist_ok=True)
common_module = CommonModule()
api_instance = API()
class DailyDispatchStatsByRegionAndAgent :
"""
区域&客服人员每日派发数量统计
"""
def __init__(self):
self.table_ids_list = [("675b900991ad2491c69389ca", "675b9c63925cd404038a6b86"), # 日常回访表
("675b900991ad2491c69389ca", "67f8b1d3307bad317abc3a9a"), # 问题跟进表
("6717470a0b3975ef583c6df1", "67174710da507490d8ac12c1"), # 接车宝
]
def get_data(self,date_back=1):
select_date = (datetime.now() - timedelta(days=date_back)).strftime("%Y-%m-%d")
data_ids = []
for table_id, form_id in self.table_ids_list:
# 获取表单数据
data = {"api_key": table_id, "entry_id": form_id, "filter": {"rel": "and", "cond": [
{"field": "createTime", "type": "datetime", "method": "eq", "value": [select_date]}]}}
res_data = api_instance.entry_data_list(data)
data_ids.append([i["_id"] for i in res_data["data"]])
return data_ids
def get_workflow_data(self,data_ids):
for data_id in data_ids:
workflow_data = api_instance.workflow_instance_get(data_id)
def main(self):
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# step1:获取昨日新建数据
data_ids = self.get_data(date_back=1)
# step2:获取昨日流程数据节点
except Exception as e:
error_task_logger.error("区域&客服人员每日派发数量统计失败")
common_module.send_task_error(task_start_time, "区域&客服人员每日派发数量统计", str(e))
if __name__ == '__main__':
daily_dispatch_stats_by_region_and_agent = DailyDispatchStatsByRegionAndAgent()
daily_dispatch_stats_by_region_and_agent.main()