519 lines
22 KiB
Python
519 lines
22 KiB
Python
import os
|
||
from datetime import datetime
|
||
import pandas as pd
|
||
from api import API
|
||
from back_ground_module import CommonModule
|
||
from log_config import configure_task_logger, configure_error_task_logger
|
||
from collections import defaultdict
|
||
|
||
logger = configure_task_logger()
|
||
error_task_logger = configure_error_task_logger()
|
||
api_instance = API()
|
||
common_module = CommonModule()
|
||
output_dir = "output" # 设置输出目录
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
|
||
class RenewalToDo:
|
||
def __init__(self):
|
||
self.renewal_data_list = None
|
||
self.cyclic_increasing = None
|
||
self.franchisee = None
|
||
self.last_price = None
|
||
self.province_staff_id_list = None
|
||
self.json_list = None
|
||
self.data_NGV = None
|
||
self.staff_id_list = None
|
||
self.NGV_data_list = None
|
||
self.field_map = {
|
||
"关联数据": "_widget_1764820541663",
|
||
"公司名称": "_widget_1764820541616",
|
||
"门店名称": "_widget_1764820541617",
|
||
"门店编码": "_widget_1764820541661",
|
||
"加盟商": "_widget_1764820541618",
|
||
"过期日": "_widget_1764820541672",
|
||
"Saas版本": "_widget_1764820541623",
|
||
"上次购买价格": "_widget_1764820541624",
|
||
"联系人": "_widget_1764820541621",
|
||
"联系手机号": "_widget_1764820541622",
|
||
"专属运营顾问": "_widget_1764820541625",
|
||
"区域客服": "_widget_1764820541715",
|
||
"运营专家": "_widget_1764820541678",
|
||
"120天是否跟进": "_widget_1764820541628",
|
||
"120天处理人": "_widget_1764820541634",
|
||
"120天跟进时间": "_widget_1765352838631",
|
||
"60天是否跟进": "_widget_1764820541630",
|
||
"60天处理人": "_widget_1764820541635",
|
||
"60天跟进时间": "_widget_1765352838632",
|
||
"30天是否跟进": "_widget_1764820541632",
|
||
"30天处理人": "_widget_1764820541636",
|
||
"30天跟进时间": "_widget_1765352838633",
|
||
"是否联系上客户": "_widget_1764820541638",
|
||
"客户现阶段问题分类": "_widget_1764820541641",
|
||
"未联系上原因字段": "_widget_1765330820509",
|
||
"联系情况及问题说明": "_widget_1764820541653",
|
||
"潜在商机": "_widget_1764820541657",
|
||
"商机详情": "_widget_1764820541659",
|
||
"门店续约意愿": "_widget_1764820541654",
|
||
"不续约原因": "_widget_1764820541700",
|
||
"产品原因": "_widget_1764820541707",
|
||
"服务问题": "_widget_1764820541709",
|
||
"门店原因": "_widget_1764820541711",
|
||
"价格原因": "_widget_1764820541713",
|
||
"不续约具体情况说明": "_widget_1764820541702",
|
||
"回访完成方式": "_widget_1764820541697",
|
||
"周期性增购": "_widget_1764820541717",
|
||
"周期性增购.商品名称": "_widget_1764820541717._widget_1764820541719",
|
||
"周期性增购.分母金额": "_widget_1764820541717._widget_1764820541720",
|
||
"周期性增购.应续约日": "_widget_1764820541717._widget_1764820541721",
|
||
"周期性增购.上次购买数量": "_widget_1764820541717._widget_1764820541722",
|
||
"周期性增购.不续约原因": "_widget_1764820541717._widget_1764820541723",
|
||
"周期性增购.是否愿意续约": "_widget_1764820541717._widget_1764820541724",
|
||
"周期性增购.续约后订单编码": "_widget_1764820541717._widget_1764820541725",
|
||
"订单编码": "_widget_1764820541674",
|
||
"订单支付日期": "_widget_1764820541679",
|
||
"本次-实付金额(元)": "_widget_1764820541676",
|
||
"业务类型(续约、升级)": "_widget_1764820541680",
|
||
"连锁门店待办同步处理": "_widget_1764820541681",
|
||
"选择需要同步的门店名称": "_widget_1765330820391",
|
||
"120天自动流转时间": "_widget_1764820541865",
|
||
"60天自动流转时间": "_widget_1765964381895",
|
||
"30天自动流转时间": "_widget_1765964381896",
|
||
"0天自动流转时间": "_widget_1765964381897",
|
||
"当前所处节点": "_widget_1765352838609",
|
||
"流程状态": "_widget_1765352838610",
|
||
"经营模式": "_widget_1765964381952",
|
||
"公司等级": "_widget_1766130435561",
|
||
"公司id": "_widget_1766631811839",
|
||
"提交人": "creator",
|
||
"提交时间": "createTime",
|
||
"更新时间": "updateTime"
|
||
}
|
||
self.cn_field_map = {
|
||
"related_data": "关联数据",
|
||
"group_name": "公司名称",
|
||
"org_name": "门店名称",
|
||
"org_code": "门店编码",
|
||
"expiry_time": "过期日",
|
||
"saas_edition_fmt": "Saas版本",
|
||
"contacts": "联系人",
|
||
"contact_mobile": "联系手机号",
|
||
"service_impl_principal": "专属运营顾问",
|
||
"group_grade": "公司等级",
|
||
"technician": "运营专家",
|
||
"manage_model": "经营模式",
|
||
"id_own_group": "公司id",
|
||
}
|
||
self.subform_field_map = {
|
||
"商品名称": "_widget_1764820541719",
|
||
"分母金额": "_widget_1764820541720",
|
||
"应续约日": "_widget_1764820541721",
|
||
"上次购买数量": "_widget_1764820541722",
|
||
"不续约原因": "_widget_1764820541723",
|
||
"是否愿意续约": "_widget_1764820541724",
|
||
"续约后订单编码": "_widget_1764820541725",
|
||
# 根据实际需要添加更多字段
|
||
}
|
||
self.renewal_list_map ={
|
||
|
||
}
|
||
|
||
def load_all_data(self):
|
||
"""
|
||
从各类来源加载数据上加载数据
|
||
:return:
|
||
"""
|
||
|
||
# 数据库获取续约回访数据
|
||
self.data_NGV = common_module.get_renewal_details()
|
||
# 获取加盟商信息
|
||
self.franchisee = common_module.get_renewal_franchisee_details()
|
||
self.franchisee.to_csv(os.path.join(output_dir, "franchisee.csv"))
|
||
# 获取上次购买价格
|
||
self.last_price = common_module.get_renewal_last_price_details()
|
||
self.last_price.to_csv(os.path.join(output_dir, "last_price.csv"))
|
||
# 周期性增购
|
||
self.cyclic_increasing = common_module.get_cyclic_increasing_renewal_details()
|
||
self.cyclic_increasing.to_csv(os.path.join(output_dir, "cyclic_increasing.csv"))
|
||
|
||
# 获取NGV数据
|
||
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675bb02bd2d53c2034c665e4"}
|
||
self.NGV_data_list = api_instance.entry_data_list(payload).get("data")
|
||
|
||
# 获取简道云员工id
|
||
payload = {"api_key": "6694d3c4fcb69ca9a111a6c4",
|
||
"entry_id": "6769204a1902c9341340a1bc",
|
||
}
|
||
staff_id = api_instance.entry_data_list(payload)
|
||
self.staff_id_list = staff_id.get("data") # api请求格式,将数据封装在data字典里
|
||
|
||
# 省市区人员关系表
|
||
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "676512ac3e54dc3159460c0a"}
|
||
json_dict = api_instance.entry_data_list(payload)
|
||
if json_dict and "data" in json_dict:
|
||
self.province_staff_id_list = json_dict.get("data")
|
||
else:
|
||
print("加载省市区人员关系表失败")
|
||
self.province_staff_id_list = []
|
||
|
||
# 获取已派发续约待办(进行中)
|
||
payload = {"api_key": "675b900991ad2491c69389ca",
|
||
"entry_id": "6931063d64187eaf6b927557",
|
||
"filter": {"rel": "and",
|
||
"cond": [{"field": "flowState", "type": "flowstate", "method": "eq", "value": [0]}]},
|
||
}
|
||
renewal = api_instance.entry_data_list(payload)
|
||
self.renewal_data_list = renewal.get("data")
|
||
|
||
@staticmethod
|
||
def replace_names_with_staff_ids(df, name_columns, staff_id_list):
|
||
"""
|
||
将 DataFrame 中多个姓名列替换为对应的员工ID。
|
||
|
||
:param staff_id_list: 简道云获取到员工id
|
||
:param df: pandas.DataFrame
|
||
:param name_columns: list[str],需要替换的姓名列名列表,例如 ["col1", "col2"]
|
||
:return: 修改后的 DataFrame(原列被替换)
|
||
"""
|
||
# 1. 构建姓名 -> 员工ID 的映射字典(只做一次)
|
||
name_to_id = {}
|
||
for item in staff_id_list or []:
|
||
name = item.get("_widget_1734942794144")
|
||
staff_id = item.get("_widget_1734942794145")
|
||
if name and staff_id:
|
||
name_to_id[str(name).strip()] = str(staff_id)
|
||
|
||
# 2. 对每个指定的列进行替换
|
||
df = df.copy() # 避免修改原始数据
|
||
for col in name_columns:
|
||
if col not in df.columns:
|
||
continue # 跳过不存在的列
|
||
# 替换:姓名 → ID,找不到的保留原值(可改为 fillna(None))
|
||
df[col] = (
|
||
df[col]
|
||
.astype(str)
|
||
.str.strip()
|
||
.map(name_to_id)
|
||
.fillna(df[col])
|
||
)
|
||
return df
|
||
|
||
@staticmethod
|
||
def row_to_dict(row, field_mapping):
|
||
"""将一行数据转换为指定格式的字典"""
|
||
result = {}
|
||
for col_name, widget_id in field_mapping.items():
|
||
if col_name not in row:
|
||
continue
|
||
value = row[col_name]
|
||
|
||
# 处理:如果 value 是容器类型(list, dict, tuple, np.ndarray),不进行 pd.isna 判断
|
||
if isinstance(value, (list, dict, tuple)) or (hasattr(value, '__len__') and not isinstance(value, str)):
|
||
clean_value = value
|
||
else:
|
||
# 标量类型:安全使用 pd.isna
|
||
if pd.isna(value):
|
||
clean_value = None
|
||
elif isinstance(value, pd.Timestamp):
|
||
clean_value = value.strftime('%Y-%m-%dT%H:%M:%SZ')
|
||
else:
|
||
clean_value = value
|
||
|
||
# 所有字段统一包 {"value": ...},包括子表单
|
||
result[widget_id] = {"value": clean_value}
|
||
return result
|
||
|
||
@staticmethod
|
||
def en_row_to_cn_row(en_row, en_to_cn_map):
|
||
"""
|
||
将英文字段的行数据转换为中文字段的行数据
|
||
|
||
:param en_row: dict 或 pandas.Series,key 为英文字段
|
||
:param en_to_cn_map: dict, 英文字段名 -> 中文字段名
|
||
:return: dict,key 为中文字段名
|
||
"""
|
||
cn_row = {}
|
||
for en_key, value in en_row.items():
|
||
if en_key in en_to_cn_map:
|
||
cn_key = en_to_cn_map[en_key]
|
||
cn_row[cn_key] = value
|
||
# 可选:忽略无法映射的字段,或记录警告
|
||
return cn_row
|
||
|
||
@staticmethod
|
||
def get_customer_service_by_location(province_name, city_name, area_name, staff_id_list):
|
||
"""
|
||
直接遍历 self.staff_id_list,根据省市区匹配续约回访客服。
|
||
|
||
:return: 客服用户名(str),未找到则返回提示信息
|
||
"""
|
||
if not all([province_name, city_name, area_name]):
|
||
return "数据缺失: 省市区不完整"
|
||
|
||
for item in staff_id_list or []:
|
||
try:
|
||
prov = item.get('_widget_1734677164861', '').strip()
|
||
city = item.get('_widget_1734677164862', '').strip()
|
||
area = item.get('_widget_1734677164863', '').strip()
|
||
|
||
if (prov == province_name.strip() and
|
||
city == city_name.strip() and
|
||
area == area_name.strip()):
|
||
# 提取客服用户名
|
||
staff_info = item.get('_widget_1734677164869', {}) # 续约回访客服
|
||
username = staff_info.get('username')
|
||
return username if username else "数据缺失: 客服用户名为空"
|
||
except Exception:
|
||
continue # 跳过格式异常的记录
|
||
|
||
return "数据缺失: 未找到对应的续约回访客服"
|
||
|
||
def build_subform_records(
|
||
self,
|
||
df: pd.DataFrame,
|
||
group_by_col: str,
|
||
field_mapping: dict,
|
||
) -> dict:
|
||
"""
|
||
通用子表单预处理函数:将子表单 DataFrame 转换为 {group_key: [subform_record1, subform_record2, ...]} 的字典。
|
||
|
||
:param df: 子表单数据 DataFrame,列名为中文(如 "商品名称", "分母金额")
|
||
:param group_by_col: 用于分组的列名(如 "门店编码")
|
||
:param field_mapping: 字段映射字典,{中文字段名: widget_id},例如 {"商品名称": "_widget_xxx"}
|
||
:return: dict,key 为 group_by_col 的值,value 为该组对应的子表单记录列表,
|
||
每条记录是 {widget_id: {"value": clean_value}} 的 dict
|
||
"""
|
||
if df.empty:
|
||
return defaultdict(list)
|
||
|
||
result = defaultdict(list)
|
||
target_fields = set(field_mapping.keys())
|
||
|
||
for _, row in df.iterrows():
|
||
row_dict = row.to_dict()
|
||
group_key = row_dict.get(group_by_col)
|
||
|
||
if not group_key or (isinstance(group_key, str) and group_key.strip() == ""):
|
||
warning_msg = f"子表单行缺少分组字段 '{group_by_col}',跳过: {row_dict}"
|
||
|
||
# 构建单条子表单记录
|
||
sub_record = {}
|
||
for field_cn, widget_id in field_mapping.items():
|
||
val = row_dict.get(field_cn)
|
||
|
||
# 清理值
|
||
if pd.isna(val):
|
||
clean_val = None
|
||
elif hasattr(val, 'to_eng_string'): # Decimal
|
||
try:
|
||
clean_val = float(val)
|
||
except (ValueError, TypeError):
|
||
clean_val = str(val)
|
||
elif isinstance(val, pd.Timestamp):
|
||
clean_val = val.strftime('%Y-%m-%d %H:%M:%S')
|
||
else:
|
||
clean_val = val
|
||
|
||
sub_record[widget_id] = {"value": clean_val}
|
||
|
||
result[group_key].append(sub_record)
|
||
|
||
return result
|
||
|
||
def process_data(self):
|
||
"""
|
||
数据处理加工
|
||
:return: 处理后的 DataFrame,列名为中文
|
||
"""
|
||
data_NGV = self.data_NGV.copy() # 避免修改原始数据
|
||
|
||
# === 将英文字段名替换为中文字段名 ===
|
||
# 但只重命名存在的列
|
||
rename_map = {en: cn for en, cn in self.cn_field_map.items() if en in data_NGV.columns}
|
||
data_NGV.rename(columns=rename_map, inplace=True)
|
||
|
||
# 日期字段处理(使用中文列名)
|
||
time_columns = ['过期日']
|
||
data_NGV[time_columns] = data_NGV[time_columns].apply(
|
||
lambda col: pd.to_datetime(col, errors='coerce')
|
||
.dt.tz_localize('Asia/Shanghai')
|
||
.dt.tz_convert('UTC')
|
||
)
|
||
|
||
# 新增4列:辅助时间字段
|
||
data_NGV['120天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=60)
|
||
data_NGV['60天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=30)
|
||
data_NGV['30天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=0)
|
||
data_NGV['0天自动流转时间'] = data_NGV['过期日'] + pd.Timedelta(days=90)
|
||
|
||
data_NGV['120天是否跟进'] = "自动"
|
||
data_NGV['60天是否跟进']= "自动"
|
||
data_NGV['30天是否跟进']= "自动"
|
||
# 格式化为字符串(去掉时区)
|
||
for col in ['过期日', '120天自动流转时间', '60天自动流转时间', '30天自动流转时间', '0天自动流转时间']:
|
||
data_NGV[col] = data_NGV[col].dt.strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
# 新增加盟商列
|
||
data_NGV = data_NGV.merge(
|
||
self.franchisee[['门店编码', '加盟商']],
|
||
on='门店编码',
|
||
how='left'
|
||
)
|
||
|
||
# 新增上次购买价格列
|
||
# 1. 清洗并拼接类型+价格
|
||
df_lp = self.last_price[['门店编码', '类型', '价格']].copy()
|
||
df_lp['类型'] = df_lp['类型'].fillna('').astype(str)
|
||
df_lp['价格'] = (
|
||
pd.to_numeric(df_lp['价格'], errors='coerce')
|
||
.round().fillna(0).astype(int).astype(str)
|
||
)
|
||
df_lp['类型_价格'] = df_lp['类型'] + ':' + df_lp['价格']
|
||
|
||
# 2. 按门店聚合,分号连接
|
||
agg_df = df_lp.groupby('门店编码', as_index=False)['类型_价格'].apply(';'.join)
|
||
|
||
# 3. 合并回主表
|
||
data_NGV = data_NGV.merge(agg_df, on='门店编码', how='left').fillna({'类型_价格': ''})
|
||
data_NGV.rename(columns={'类型_价格': '上次购买价格'}, inplace=True)
|
||
|
||
# 4. 处理没有匹配记录的门店(填空或默认值)
|
||
data_NGV['上次购买价格'] = data_NGV['上次购买价格'].fillna('')
|
||
|
||
# 成员字段替换(现在列名是中文)
|
||
staff_name_cols = [
|
||
"专属运营顾问",
|
||
"运营专家",
|
||
]
|
||
data_NGV = self.replace_names_with_staff_ids(data_NGV, staff_name_cols, self.staff_id_list)
|
||
|
||
return data_NGV
|
||
|
||
def dispatch_task(self, data_NGV):
|
||
"""
|
||
拆分为三个独立动作(输入 data_NGV 列名为中文):
|
||
1. 获取关联数据(NGV_data_id)
|
||
2. 获取区域客服(regional_customer_service)
|
||
3. 字段映射与格式化(中文 → widget),正确处理子表单
|
||
"""
|
||
records = []
|
||
no_customer_service_data = []
|
||
|
||
# === 使用通用函数预处理周期性增购子表单 ===
|
||
cyclic_subforms = self.build_subform_records(
|
||
df=self.cyclic_increasing,
|
||
group_by_col="门店编码",
|
||
field_mapping=self.subform_field_map,
|
||
)
|
||
|
||
# === Step 1: 构建 门店编码 → NGV 数据ID 映射 ===
|
||
org_code_to_ngv_id = {}
|
||
for ngv_item in self.NGV_data_list or []:
|
||
org_code = ngv_item.get("_widget_1734062123071")
|
||
ngv_id = ngv_item.get("_id")
|
||
if org_code and ngv_id:
|
||
org_code_to_ngv_id[org_code] = ngv_id
|
||
|
||
# === Step 2: 定义获取区域客服的函数 ===
|
||
def get_regional_customer_service(row):
|
||
province = row.get("省份") or row.get("province_name")
|
||
city = row.get("城市") or row.get("city_name")
|
||
area = row.get("区县") or row.get("district_name") or row.get("area_name")
|
||
org_code = row.get("门店编码")
|
||
|
||
# 若省市区缺失,尝试从 NGV 补全
|
||
if not all([province, city, area]) or any(
|
||
v in [None, '', 'None', 'NA'] for v in [province, city, area]
|
||
):
|
||
ngv_record = next(
|
||
(item for item in self.NGV_data_list
|
||
if item.get("_widget_1734062123071") == org_code),
|
||
None
|
||
)
|
||
if ngv_record:
|
||
province = ngv_record.get("_widget_1734062123090")
|
||
city = ngv_record.get("_widget_1734062123092")
|
||
area = ngv_record.get("_widget_1734062123094")
|
||
logger.info(f"【从NGV补全省市区】门店 {org_code}: {province}, {city}, {area}")
|
||
|
||
if not all([province, city, area]) or any(
|
||
v in [None, '', 'None', 'NA'] for v in [province, city, area]
|
||
):
|
||
logger.warning(f"【省市区信息缺失】门店 {org_code} 省市区不完整,客服设为空")
|
||
return None
|
||
|
||
customer_service = self.get_customer_service_by_location(
|
||
str(province).strip(),
|
||
str(city).strip(),
|
||
str(area).strip(),
|
||
self.province_staff_id_list
|
||
)
|
||
|
||
if customer_service and "数据缺失" not in str(customer_service):
|
||
logger.info(f"【派发客服】门店 {org_code} 派发给客服: {customer_service}")
|
||
return customer_service
|
||
else:
|
||
logger.warning(f"未找到区域客服,请检查门店编码: {org_code}")
|
||
return None
|
||
|
||
# === Step 3: 遍历主表每一行,构建最终提交记录 ===
|
||
for _, row in data_NGV.iterrows():
|
||
row_dict = row.to_dict()
|
||
|
||
# 3.1 关联数据(NGV ID)
|
||
org_code = row_dict.get("门店编码")
|
||
ngv_id = org_code_to_ngv_id.get(org_code)
|
||
row_dict["关联数据"] = ngv_id if ngv_id else None
|
||
if not ngv_id:
|
||
logger.warning(f"未找到关联数据,请检查门店编码: {org_code}")
|
||
|
||
# 3.2 区域客服
|
||
customer_service = get_regional_customer_service(row_dict)
|
||
row_dict["区域客服"] = customer_service
|
||
if not customer_service:
|
||
no_customer_service_data.append(row_dict)
|
||
|
||
# 3.3 注入周期性增购子表单
|
||
row_dict["周期性增购"] = cyclic_subforms.get(org_code, [])
|
||
|
||
# 3.4 转换为 widget 格式
|
||
widget_record = self.row_to_dict(row_dict, self.field_map)
|
||
records.append(widget_record)
|
||
|
||
# === Step 4: 批量提交 ===
|
||
if not records:
|
||
logger.info("无数据需要派发")
|
||
return
|
||
|
||
payload = {
|
||
"api_key": "675b900991ad2491c69389ca",
|
||
"entry_id": "6931063d64187eaf6b927557",
|
||
"data_list": records
|
||
}
|
||
print(payload)
|
||
|
||
api_instance.entry_data_batch_create(payload)
|
||
logger.info(f"已提交 {len(records)} 条数据进行派发")
|
||
|
||
def main(self):
|
||
|
||
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
try:
|
||
logger.info("任务开始")
|
||
# step1: 获取数据
|
||
self.load_all_data()
|
||
logger.info("加载数据完成")
|
||
# step2:数据处理
|
||
data_NGV = self.process_data()
|
||
# step3:数据派发
|
||
self.dispatch_task(data_NGV)
|
||
|
||
common_module.send_task_status(task_start_time, "续约回访待办")
|
||
except Exception as e:
|
||
error_task_logger.error(f"续约回访待办发生错误{e}")
|
||
# common_module.send_task_error(task_start_time, "续约回访待办", str(e))
|
||
|
||
|
||
if __name__ == '__main__':
|
||
RenewalToDo().main()
|