391 lines
19 KiB
Python
391 lines
19 KiB
Python
from datetime import timedelta, datetime
|
||
from config import Config
|
||
import pandas as pd
|
||
from back_ground_module import CommonModule
|
||
from api import API
|
||
from log_config import configure_task_logger, configure_error_task_logger
|
||
from datetime import datetime, timedelta, timezone
|
||
import pandas as pd
|
||
import os
|
||
|
||
# 获取已经配置好的常规日志记录器
|
||
logger = configure_task_logger()
|
||
# 获取已经配置好的错误任务日志记录器
|
||
error_task_logger = configure_error_task_logger()
|
||
# 保存为CSV文件
|
||
output_dir = "output" # 设置输出目录
|
||
# 创建输出目录(如果不存在)
|
||
import os
|
||
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
common_module = CommonModule()
|
||
api_instance = API()
|
||
|
||
|
||
class JCBEfficientCarPickup:
|
||
"""接车宝日常回访"""
|
||
|
||
def __init__(self):
|
||
# 使用 pymysql 连接数据库
|
||
self.daily_revisit_list = None
|
||
self.field_mapping = {}
|
||
self.staff_id_list = None
|
||
self.customer_service_list = None
|
||
|
||
def load_cus_data(self):
|
||
# 获取接车宝客服表单
|
||
payload = {"api_key": "6717470a0b3975ef583c6df1",
|
||
"entry_id": "67b6f2462f9ac03b783d409a",
|
||
}
|
||
customer_service = api_instance.entry_data_list(payload)
|
||
customer_service_list = customer_service.get("data") # api请求格式,将数据封装在data字典里
|
||
return customer_service_list
|
||
|
||
def today_customer_service_list(self):
|
||
# 获取今日接车宝派发客服顺序
|
||
global is_customer_service_data_id
|
||
today_customer_service_list = []
|
||
all_customer_service_list = []
|
||
today_customer_service_start_list = []
|
||
for row_items in self.load_cus_data():
|
||
# print(row_items)
|
||
customer_service_name_id = row_items.get("_widget_1740042824214", {}).get("username", {})
|
||
customer_service_name = row_items.get("_widget_1740042824214", {}).get("name", {})
|
||
customer_service_state = row_items.get("_widget_1740117343937", {})
|
||
is_last_day_end = row_items.get("_widget_1740042824216", {})
|
||
customer_service_data_id = row_items.get("_id", {})
|
||
print(customer_service_name, customer_service_name_id, customer_service_state, is_last_day_end)
|
||
all_customer_service_list.append(
|
||
[customer_service_name, customer_service_name_id, customer_service_state, is_last_day_end,
|
||
customer_service_data_id])
|
||
if is_last_day_end == "是": # 判断是否是下次开始位置
|
||
last_day_end_customer_service = customer_service_name_id
|
||
is_customer_service_data_id = row_items.get("_id", {})
|
||
|
||
split_index = None
|
||
for index, row in enumerate(all_customer_service_list):
|
||
print(row[3])
|
||
if row[3] == "是":
|
||
split_index = index
|
||
print(f"找到索引 {index}")
|
||
break
|
||
|
||
if split_index is not None:
|
||
# 根据索引切割列表
|
||
first_part = all_customer_service_list[split_index:] # 索引位置及之后的行
|
||
second_part = all_customer_service_list[:split_index] # 索引位置之前的行
|
||
# 调换两个子列表的位置并重新组合
|
||
today_customer_service_start_list = first_part + second_part
|
||
else:
|
||
# 如果没有找到"是",保持原列表不变
|
||
today_customer_service_start_list = all_customer_service_list
|
||
pass
|
||
|
||
for index, row in enumerate(today_customer_service_start_list):
|
||
if row[2] == "开":
|
||
today_customer_service_list.append(row[1])
|
||
|
||
return today_customer_service_list, is_customer_service_data_id, all_customer_service_list
|
||
|
||
def send_request(self, df):
|
||
if df is None or df.empty: # 检查DataFrame是否为None或空
|
||
logger.info("当前派发数据为空或None,跳过此派发")
|
||
return
|
||
|
||
today_customer_service_list, is_customer_service_data_id, all_customer_service_list = self.today_customer_service_list()
|
||
# 初始化派发索引
|
||
next_dispatcher_index = 0
|
||
|
||
# 显式循环分配跟进人
|
||
follow_up_persons = []
|
||
for _ in range(len(df)):
|
||
follow_up_person = today_customer_service_list[next_dispatcher_index]
|
||
follow_up_persons.append(follow_up_person)
|
||
next_dispatcher_index = (next_dispatcher_index + 1) % len(today_customer_service_list)
|
||
|
||
# 添加跟进人到 DataFrame
|
||
df["跟进人"] = follow_up_persons
|
||
|
||
# 获取下一个派发人
|
||
next_dispatcher = today_customer_service_list[next_dispatcher_index]
|
||
|
||
new_sign_abnormal_data = [self.row_to_dict(row, self.field_mapping) for index, row in
|
||
df.iterrows()]
|
||
|
||
data = {'api_key': Config.EFFICIENT_CAR_PICKUP_APP_ID,
|
||
# 'entry_id': "69522f61d0195d3bf42ed251",
|
||
'entry_id': Config.EFFICIENT_CAR_PICKUP_ENTRY_ID,
|
||
"data_list": new_sign_abnormal_data}
|
||
|
||
result = api_instance.entry_data_batch_create(data)
|
||
logger.info(f"数据发送成功:{result}")
|
||
|
||
data1 = {"api_key": Config.EFFICIENT_CAR_PICKUP_APP_ID,
|
||
"entry_id": Config.EFFICIENT_CAR_PICKUP_CUSTOMER_SERVICE_ID,
|
||
"data_id": is_customer_service_data_id,
|
||
"data":
|
||
{"_widget_1740042824216": {"value": ""}, }
|
||
} # 原来的是"_widget_1740042824216": {"value": "是"},修改昨日截至人员
|
||
next_customer_service_data_id = None
|
||
for index, row in enumerate(all_customer_service_list):
|
||
print(row[3])
|
||
if row[1] == next_dispatcher:
|
||
next_customer_service_data_id = row[4]
|
||
break
|
||
|
||
data2 = {"api_key": Config.EFFICIENT_CAR_PICKUP_APP_ID,
|
||
"entry_id": Config.EFFICIENT_CAR_PICKUP_CUSTOMER_SERVICE_ID,
|
||
"data_id": next_customer_service_data_id,
|
||
"data":
|
||
{"_widget_1740042824216": {"value": "是"}, }}
|
||
|
||
api_instance.entry_data_update(data1)
|
||
result2 = api_instance.entry_data_update(data2)
|
||
logger.info(f"明日派发人员信息已修改:{result2}")
|
||
|
||
def load_all_data(self):
|
||
# 获取接车宝日常回访单
|
||
payload = {"api_key": "6717470a0b3975ef583c6df1",
|
||
"entry_id": "67174710da507490d8ac12c1",
|
||
}
|
||
daily_revisit = api_instance.entry_data_list(payload)
|
||
self.daily_revisit_list = daily_revisit.get("data") # api请求格式,将数据封装在data字典里
|
||
|
||
def main(self):
|
||
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
try:
|
||
logger.info(f"接车宝日常回访开始执行")
|
||
data_JCB = common_module.get_jcb_details()
|
||
if data_JCB is None:
|
||
logger.error("获取接车宝数据失败,返回None")
|
||
raise ValueError("获取接车宝数据失败,返回None")
|
||
self.load_all_data()
|
||
|
||
logger.info(f"数据加载完成")
|
||
|
||
# data_JCB.to_csv(os.path.join(output_dir, 'JCB_all_data.csv'), index=False)
|
||
self.fields()
|
||
|
||
# 新签异常待办回访。
|
||
# 当前日期
|
||
current_date = datetime.now()
|
||
current_date = current_date + timedelta(days=0)
|
||
current_date_str = current_date.strftime("%Y-%m-%d")
|
||
|
||
seven_days_ago = current_date - timedelta(days=7)
|
||
seven_days_ago = seven_days_ago.date()
|
||
# print(three_days_ago)
|
||
new_sign_abnormal = []
|
||
|
||
for index, row in data_JCB.iterrows():
|
||
new_row = row.copy()
|
||
# 先转成字符串,再解析回 date 对象
|
||
new_row['开户日'] = datetime.strptime(str(row['开户日']), "%Y-%m-%d").date()
|
||
if new_row['开户日'] == seven_days_ago and row['当月开单天数'] == 0:
|
||
# print(row['账号'], row['开户日'], row['当月开单天数'])
|
||
row["日期"] = datetime.strptime(str(row['开户日']), "%Y-%m-%d").date()
|
||
row['日期'] = row["日期"].strftime("%Y-%m-%d")
|
||
new_sign_abnormal.append(row)
|
||
|
||
new_sign_abnormal = pd.DataFrame(new_sign_abnormal) if new_sign_abnormal else None
|
||
if new_sign_abnormal is not None and not new_sign_abnormal.empty:
|
||
new_sign_abnormal["表单类型"] = "新签异常待办"
|
||
new_sign_abnormal["派发日期"] = current_date_str
|
||
self.send_request(new_sign_abnormal) # 发送请求
|
||
logger.info(f"新签异常待办回访完成")
|
||
else:
|
||
logger.info(f"新签异常待办回访无数据,跳过")
|
||
|
||
# 异常待办
|
||
current_local = datetime.now() + timedelta(days=-1) # tz-naive,代表本地时间
|
||
current_date_str = current_local.strftime("%Y-%m-%d")
|
||
# 计算30天前的本地日期(用于开户日判断)
|
||
thirty_days_ago_local = (current_local - timedelta(days=30)).date()
|
||
|
||
abnormal_data = []
|
||
for index, row in data_JCB.iterrows():
|
||
try:
|
||
# 开户日是本地日期字符串,解析为 date 对象
|
||
open_date = datetime.strptime(str(row['开户日']), "%Y-%m-%d").date()
|
||
except (ValueError, TypeError):
|
||
continue # 跳过无效日期
|
||
|
||
if (
|
||
open_date < thirty_days_ago_local
|
||
and row['近30天开单天数'] == 0
|
||
and row['客户状态'] == "留存"
|
||
):
|
||
new_row = row.copy()
|
||
new_row["日期"] = open_date.strftime("%Y-%m-%d")
|
||
abnormal_data.append(new_row)
|
||
|
||
abnormal_data = pd.DataFrame(abnormal_data) if abnormal_data else pd.DataFrame()
|
||
|
||
if not abnormal_data.empty:
|
||
abnormal_data["表单类型"] = "异常待办"
|
||
abnormal_data["派发日期"] = current_date_str
|
||
|
||
# 清洗手机号(仅去除浮点型 .0)
|
||
def clean_phone(x):
|
||
if pd.isna(x) or x == "" or x == "None":
|
||
return ""
|
||
s = str(x)
|
||
if s.endswith('.0') and s[:-2].isdigit():
|
||
return s[:-2]
|
||
return s
|
||
|
||
abnormal_data['联系手机号'] = abnormal_data['联系手机号'].apply(clean_phone)
|
||
|
||
# 构建云端已派发记录 DataFrame
|
||
df_cloud = pd.DataFrame([
|
||
{
|
||
"数据id": item.get("_id", ""),
|
||
"账号": item.get("_widget_1739258942667", ""),
|
||
"提交时间": item.get("createTime", ""),
|
||
"表单类型": item.get("_widget_1739951204545", "")
|
||
}
|
||
for item in self.daily_revisit_list
|
||
])
|
||
|
||
recent_accounts = set()
|
||
if not df_cloud.empty and not abnormal_data.empty:
|
||
# 将 createTime 转为 UTC 时间(强制统一时区)
|
||
df_cloud["提交时间"] = pd.to_datetime(df_cloud["提交时间"], utc=True, errors="coerce")
|
||
df_cloud = df_cloud.dropna(subset=["提交时间"])
|
||
|
||
# 筛选“异常待办”
|
||
df_abnormal_cloud = df_cloud[df_cloud["表单类型"] == "异常待办"]
|
||
|
||
if not df_abnormal_cloud.empty:
|
||
# 每个账号保留最新一条
|
||
df_recent = df_abnormal_cloud.sort_values("提交时间").groupby("账号", as_index=False).tail(1)
|
||
|
||
current_utc = datetime.now(timezone.utc)
|
||
cutoff_utc = pd.Timestamp(current_utc) - pd.Timedelta(days=30)
|
||
|
||
# 安全比较:两边都是 UTC
|
||
recent_accounts = set(df_recent[df_recent["提交时间"] > cutoff_utc]["账号"])
|
||
|
||
# 剔除已派发账号 + 过滤有效手机号
|
||
if not abnormal_data.empty:
|
||
abnormal_data = abnormal_data[
|
||
(~abnormal_data["账号"].isin(recent_accounts)) &
|
||
(abnormal_data["联系手机号"].notna()) &
|
||
(abnormal_data["联系手机号"] != "") &
|
||
(abnormal_data["联系手机号"] != "None")
|
||
]
|
||
|
||
# # 保存结果
|
||
output_path = os.path.join(output_dir, "异常待办1.csv")
|
||
abnormal_data.to_csv(output_path, index=False)
|
||
|
||
# 发送或跳过
|
||
if not abnormal_data.empty:
|
||
abnormal_data = abnormal_data[:20]
|
||
self.send_request(abnormal_data)
|
||
logger.info(f"异常待办完成,共 {len(abnormal_data)} 条")
|
||
else:
|
||
logger.info("异常待办无数据,跳过")
|
||
|
||
# 优质客户转商机
|
||
# current_date = datetime.now()
|
||
thirty_days_ago = current_date - timedelta(days=30)
|
||
sixty_days_ago = current_date - timedelta(days=60)
|
||
thirty_days_ago = thirty_days_ago.date()
|
||
sixty_days_ago = sixty_days_ago.date()
|
||
customer_to_opportunity = []
|
||
for index, row in data_JCB.iterrows():
|
||
new_row = row.copy()
|
||
# 先转成字符串,再解析回 date 对象
|
||
new_row['到期日'] = datetime.strptime(str(row['到期日']), "%Y-%m-%d").date()
|
||
if new_row['到期日'] == thirty_days_ago and row['近一周开单量'] >= 3 and row[
|
||
'G状态:近30天开单大于等于10天'] == 1:
|
||
print(row['账号'], row['到期日'], row['当月开单天数'], row['当月G天数'])
|
||
row["日期"] = datetime.strptime(str(row['开户日']), "%Y-%m-%d").date()
|
||
row['日期'] = row["日期"].strftime("%Y-%m-%d")
|
||
customer_to_opportunity.append(row)
|
||
# 推送给客服
|
||
pass
|
||
if new_row['到期日'] == sixty_days_ago and row['近一周开单量'] >= 3 and row[
|
||
'G状态:近30天开单大于等于10天'] == 1:
|
||
print(row['账号'], row['到期日'], row['当月开单天数'], row['当月G天数'])
|
||
row["日期"] = datetime.strptime(str(row['开户日']), "%Y-%m-%d").date()
|
||
row['日期'] = row["日期"].strftime("%Y-%m-%d")
|
||
customer_to_opportunity.append(row)
|
||
# 推送给客服
|
||
pass
|
||
|
||
customer_to_opportunity = pd.DataFrame(customer_to_opportunity) if customer_to_opportunity else None
|
||
if customer_to_opportunity is not None and not customer_to_opportunity.empty:
|
||
customer_to_opportunity["表单类型"] = "续约优质客户转商机"
|
||
customer_to_opportunity["派发日期"] = current_date_str
|
||
self.send_request(customer_to_opportunity)
|
||
logger.info(f"优质客户转商机完成")
|
||
else:
|
||
logger.info(f"优质客户转商机无数据,跳过")
|
||
|
||
# 过期7天客服回访
|
||
# current_date = datetime.now()
|
||
seven_days_ago = current_date - timedelta(days=7)
|
||
seven_days_ago = seven_days_ago.date()
|
||
outdated_30 = []
|
||
for index, row in data_JCB.iterrows():
|
||
new_row = row.copy()
|
||
new_row['到期日'] = datetime.strptime(str(row['到期日']), "%Y-%m-%d").date()
|
||
# seven_days_ago = seven_days_ago.date()
|
||
# print(row['到期日'], seven_days_ago)
|
||
if new_row['到期日'] == seven_days_ago and row['客户状态'] == "过期":
|
||
print(row['账号'], row['到期日'], row['当月开单天数'], row['当月G天数'])
|
||
row["日期"] = datetime.strptime(str(row['开户日']), "%Y-%m-%d").date()
|
||
row['日期'] = row["日期"].strftime("%Y-%m-%d")
|
||
outdated_30.append(row)
|
||
# 推送给客服
|
||
pass
|
||
|
||
outdated_30 = pd.DataFrame(outdated_30) if outdated_30 else None
|
||
if outdated_30 is not None and not outdated_30.empty:
|
||
outdated_30["表单类型"] = "过期7天回访"
|
||
outdated_30["派发日期"] = current_date_str
|
||
self.send_request(outdated_30)
|
||
logger.info(f"过期7天客服回访完成")
|
||
else:
|
||
logger.info(f"过期7天客服回访无数据,跳过")
|
||
|
||
common_module.send_task_status(task_start_time, "接车宝日常派发")
|
||
logger.info(f"接车宝日常派发执行完成")
|
||
except Exception as e:
|
||
common_module.send_task_error(task_start_time, "接车宝日常派发", str(e))
|
||
error_task_logger.error(f"接车宝日常派发执行出错:{e}")
|
||
|
||
@staticmethod
|
||
def row_to_dict(row, field_mapping):
|
||
"""将一行数据转换为指定格式的字典"""
|
||
result = {}
|
||
# print(field_mapping)
|
||
for col_name, widget_id in field_mapping.items():
|
||
# print(col_name, widget_id)
|
||
if col_name in row:
|
||
value = row[col_name]
|
||
clean_value = None if pd.isna(value) else value
|
||
result[widget_id] = {"value": clean_value}
|
||
return result
|
||
|
||
def fields(self):
|
||
self.field_mapping = {"日期": "_widget_1739252804406", "产品名称": "_widget_1739252804397",
|
||
"账号": "_widget_1739258942667", "联系手机号": "_widget_1739252804407",
|
||
"使用时长": "_widget_1739252804409", "开户日": "_widget_1739252804396",
|
||
"到期日": "_widget_1739252804408", "续约日": "_widget_1739252804410",
|
||
"客户状态": "_widget_1739252804400", "近一周开单量": "_widget_1739252804413",
|
||
"近一周是否活跃": "_widget_1739252804414",
|
||
"G状态:近30天开单大于等于10天": "_widget_1739252804415",
|
||
"当月开单天数": "_widget_1739252804416", "近30天开单天数": "_widget_1739252804417",
|
||
"当月G天数": "_widget_1739252804418", "日分区": "_widget_1739252804419",
|
||
"表单类型": "_widget_1739951204545", "派发日期": "_widget_1740036367181",
|
||
"跟进人": "_widget_1740043340255",
|
||
}
|
||
|
||
|
||
if __name__ == "__main__":
|
||
start = JCBEfficientCarPickup()
|
||
start.main()
|