Files
saas/test/续约待办派发测试数据.py
T
panda ab434f6c4c 非标业绩提报、合伙人结算登记字段修改
续约回访 宜搭同步简道云辅助脚本 简道云同步宜搭辅助脚本
2025-12-25 14:56:45 +08:00

517 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from datetime import datetime
import pandas as pd
from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger, configure_error_task_logger
from collections import defaultdict
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
api_instance = API()
common_module = CommonModule()
output_dir = "output" # 设置输出目录
os.makedirs(output_dir, exist_ok=True)
class RenewalToDo:
def __init__(self):
self.renewal_data_list = None
self.cyclic_increasing = None
self.franchisee = None
self.last_price = None
self.province_staff_id_list = None
self.json_list = None
self.data_NGV = None
self.staff_id_list = None
self.NGV_data_list = None
self.field_map = {
"关联数据": "_widget_1764820541663",
"公司名称": "_widget_1764820541616",
"门店名称": "_widget_1764820541617",
"门店编码": "_widget_1764820541661",
"加盟商": "_widget_1764820541618",
"过期日": "_widget_1764820541672",
"Saas版本": "_widget_1764820541623",
"上次购买价格": "_widget_1764820541624",
"联系人": "_widget_1764820541621",
"联系手机号": "_widget_1764820541622",
"专属运营顾问": "_widget_1764820541625",
"区域客服": "_widget_1764820541715",
"运营专家": "_widget_1764820541678",
"120天是否跟进": "_widget_1764820541628",
"120天处理人": "_widget_1764820541634",
"120天跟进时间": "_widget_1765352838631",
"60天是否跟进": "_widget_1764820541630",
"60天处理人": "_widget_1764820541635",
"60天跟进时间": "_widget_1765352838632",
"30天是否跟进": "_widget_1764820541632",
"30天处理人": "_widget_1764820541636",
"30天跟进时间": "_widget_1765352838633",
"是否联系上客户": "_widget_1764820541638",
"客户现阶段问题分类": "_widget_1764820541641",
"未联系上原因字段": "_widget_1765330820509",
"联系情况及问题说明": "_widget_1764820541653",
"潜在商机": "_widget_1764820541657",
"商机详情": "_widget_1764820541659",
"门店续约意愿": "_widget_1764820541654",
"不续约原因": "_widget_1764820541700",
"产品原因": "_widget_1764820541707",
"服务问题": "_widget_1764820541709",
"门店原因": "_widget_1764820541711",
"价格原因": "_widget_1764820541713",
"不续约具体情况说明": "_widget_1764820541702",
"回访完成方式": "_widget_1764820541697",
"周期性增购": "_widget_1764820541717",
"周期性增购.商品名称": "_widget_1764820541717._widget_1764820541719",
"周期性增购.分母金额": "_widget_1764820541717._widget_1764820541720",
"周期性增购.应续约日": "_widget_1764820541717._widget_1764820541721",
"周期性增购.上次购买数量": "_widget_1764820541717._widget_1764820541722",
"周期性增购.不续约原因": "_widget_1764820541717._widget_1764820541723",
"周期性增购.是否愿意续约": "_widget_1764820541717._widget_1764820541724",
"周期性增购.续约后订单编码": "_widget_1764820541717._widget_1764820541725",
"订单编码": "_widget_1764820541674",
"订单支付日期": "_widget_1764820541679",
"本次-实付金额(元)": "_widget_1764820541676",
"业务类型(续约、升级)": "_widget_1764820541680",
"连锁门店待办同步处理": "_widget_1764820541681",
"选择需要同步的门店名称": "_widget_1765330820391",
"120天自动流转时间": "_widget_1764820541865",
"60天自动流转时间": "_widget_1765964381895",
"30天自动流转时间": "_widget_1765964381896",
"0天自动流转时间": "_widget_1765964381897",
"当前所处节点": "_widget_1765352838609",
"流程状态": "_widget_1765352838610",
"经营模式": "_widget_1765964381952",
"公司等级": "_widget_1766130435561",
"提交人": "creator",
"提交时间": "createTime",
"更新时间": "updateTime"
}
self.cn_field_map = {
"related_data": "关联数据",
"group_name": "公司名称",
"org_name": "门店名称",
"org_code": "门店编码",
"expiry_time": "过期日",
"saas_edition_fmt": "Saas版本",
"contacts": "联系人",
"contact_mobile": "联系手机号",
"service_impl_principal": "专属运营顾问",
"group_grade": "公司等级",
"technician": "运营专家",
"manage_model": "经营模式",
}
self.subform_field_map = {
"商品名称": "_widget_1764820541719",
"分母金额": "_widget_1764820541720",
"应续约日": "_widget_1764820541721",
"上次购买数量": "_widget_1764820541722",
"不续约原因": "_widget_1764820541723",
"是否愿意续约": "_widget_1764820541724",
"续约后订单编码": "_widget_1764820541725",
# 根据实际需要添加更多字段
}
self.renewal_list_map ={
}
def load_all_data(self):
"""
从各类来源加载数据上加载数据
:return:
"""
# 数据库获取续约回访数据
self.data_NGV = pd.read_csv(os.path.join(output_dir, "data_NGV.csv"), encoding="gbk")
# 获取加盟商信息
self.franchisee = common_module.get_renewal_franchisee_details()
self.franchisee.to_csv(os.path.join(output_dir, "franchisee.csv"))
# 获取上次购买价格
self.last_price = common_module.get_renewal_last_price_details()
self.last_price.to_csv(os.path.join(output_dir, "last_price.csv"))
# 周期性增购
self.cyclic_increasing = common_module.get_cyclic_increasing_renewal_details()
self.cyclic_increasing.to_csv(os.path.join(output_dir, "cyclic_increasing.csv"))
# 获取NGV数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675bb02bd2d53c2034c665e4"}
self.NGV_data_list = api_instance.entry_data_list(payload).get("data")
# 获取简道云员工id
payload = {"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "6769204a1902c9341340a1bc",
}
staff_id = api_instance.entry_data_list(payload)
self.staff_id_list = staff_id.get("data") # api请求格式,将数据封装在data字典里
# 省市区人员关系表
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "676512ac3e54dc3159460c0a"}
json_dict = api_instance.entry_data_list(payload)
if json_dict and "data" in json_dict:
self.province_staff_id_list = json_dict.get("data")
else:
print("加载省市区人员关系表失败")
self.province_staff_id_list = []
# 获取已派发续约待办(进行中)
payload = {"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"filter": {"rel": "and",
"cond": [{"field": "flowState", "type": "flowstate", "method": "eq", "value": [0]}]},
}
renewal = api_instance.entry_data_list(payload)
self.renewal_data_list = renewal.get("data")
@staticmethod
def replace_names_with_staff_ids(df, name_columns, staff_id_list):
"""
将 DataFrame 中多个姓名列替换为对应的员工ID。
:param staff_id_list: 简道云获取到员工id
:param df: pandas.DataFrame
:param name_columns: list[str],需要替换的姓名列名列表,例如 ["col1", "col2"]
:return: 修改后的 DataFrame(原列被替换)
"""
# 1. 构建姓名 -> 员工ID 的映射字典(只做一次)
name_to_id = {}
for item in staff_id_list or []:
name = item.get("_widget_1734942794144")
staff_id = item.get("_widget_1734942794145")
if name and staff_id:
name_to_id[str(name).strip()] = str(staff_id)
# 2. 对每个指定的列进行替换
df = df.copy() # 避免修改原始数据
for col in name_columns:
if col not in df.columns:
continue # 跳过不存在的列
# 替换:姓名 → ID,找不到的保留原值(可改为 fillna(None)
df[col] = (
df[col]
.astype(str)
.str.strip()
.map(name_to_id)
.fillna(df[col])
)
return df
@staticmethod
def row_to_dict(row, field_mapping):
"""将一行数据转换为指定格式的字典"""
result = {}
for col_name, widget_id in field_mapping.items():
if col_name not in row:
continue
value = row[col_name]
# 处理:如果 value 是容器类型(list, dict, tuple, np.ndarray),不进行 pd.isna 判断
if isinstance(value, (list, dict, tuple)) or (hasattr(value, '__len__') and not isinstance(value, str)):
clean_value = value
else:
# 标量类型:安全使用 pd.isna
if pd.isna(value):
clean_value = None
elif isinstance(value, pd.Timestamp):
clean_value = value.strftime('%Y-%m-%dT%H:%M:%SZ')
else:
clean_value = value
# 所有字段统一包 {"value": ...},包括子表单
result[widget_id] = {"value": clean_value}
return result
@staticmethod
def en_row_to_cn_row(en_row, en_to_cn_map):
"""
将英文字段的行数据转换为中文字段的行数据
:param en_row: dict 或 pandas.Serieskey 为英文字段
:param en_to_cn_map: dict, 英文字段名 -> 中文字段名
:return: dictkey 为中文字段名
"""
cn_row = {}
for en_key, value in en_row.items():
if en_key in en_to_cn_map:
cn_key = en_to_cn_map[en_key]
cn_row[cn_key] = value
# 可选:忽略无法映射的字段,或记录警告
return cn_row
@staticmethod
def get_customer_service_by_location(province_name, city_name, area_name, staff_id_list):
"""
直接遍历 self.staff_id_list,根据省市区匹配续约回访客服。
:return: 客服用户名(str),未找到则返回提示信息
"""
if not all([province_name, city_name, area_name]):
return "数据缺失: 省市区不完整"
for item in staff_id_list or []:
try:
prov = item.get('_widget_1734677164861', '').strip()
city = item.get('_widget_1734677164862', '').strip()
area = item.get('_widget_1734677164863', '').strip()
if (prov == province_name.strip() and
city == city_name.strip() and
area == area_name.strip()):
# 提取客服用户名
staff_info = item.get('_widget_1734677164869', {}) # 续约回访客服
username = staff_info.get('username')
return username if username else "数据缺失: 客服用户名为空"
except Exception:
continue # 跳过格式异常的记录
return "数据缺失: 未找到对应的续约回访客服"
def build_subform_records(
self,
df: pd.DataFrame,
group_by_col: str,
field_mapping: dict,
) -> dict:
"""
通用子表单预处理函数:将子表单 DataFrame 转换为 {group_key: [subform_record1, subform_record2, ...]} 的字典。
:param df: 子表单数据 DataFrame,列名为中文(如 "商品名称", "分母金额"
:param group_by_col: 用于分组的列名(如 "门店编码"
:param field_mapping: 字段映射字典,{中文字段名: widget_id},例如 {"商品名称": "_widget_xxx"}
:return: dictkey 为 group_by_col 的值,value 为该组对应的子表单记录列表,
每条记录是 {widget_id: {"value": clean_value}} 的 dict
"""
if df.empty:
return defaultdict(list)
result = defaultdict(list)
target_fields = set(field_mapping.keys())
for _, row in df.iterrows():
row_dict = row.to_dict()
group_key = row_dict.get(group_by_col)
if not group_key or (isinstance(group_key, str) and group_key.strip() == ""):
warning_msg = f"子表单行缺少分组字段 '{group_by_col}',跳过: {row_dict}"
# 构建单条子表单记录
sub_record = {}
for field_cn, widget_id in field_mapping.items():
val = row_dict.get(field_cn)
# 清理值
if pd.isna(val):
clean_val = None
elif hasattr(val, 'to_eng_string'): # Decimal
try:
clean_val = float(val)
except (ValueError, TypeError):
clean_val = str(val)
elif isinstance(val, pd.Timestamp):
clean_val = val.strftime('%Y-%m-%d %H:%M:%S')
else:
clean_val = val
sub_record[widget_id] = {"value": clean_val}
result[group_key].append(sub_record)
return result
def process_data(self):
"""
数据处理加工
:return: 处理后的 DataFrame,列名为中文
"""
data_NGV = self.data_NGV.copy() # 避免修改原始数据
# === 将英文字段名替换为中文字段名 ===
# 但只重命名存在的列
rename_map = {en: cn for en, cn in self.cn_field_map.items() if en in data_NGV.columns}
data_NGV.rename(columns=rename_map, inplace=True)
# 日期字段处理(使用中文列名)
time_columns = ['过期日']
data_NGV[time_columns] = data_NGV[time_columns].apply(
lambda col: pd.to_datetime(col, errors='coerce')
.dt.tz_localize('Asia/Shanghai')
.dt.tz_convert('UTC')
)
# 新增4列:辅助时间字段
data_NGV['120天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=60)
data_NGV['60天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=30)
data_NGV['30天自动流转时间'] = data_NGV['过期日'] - pd.Timedelta(days=0)
data_NGV['0天自动流转时间'] = data_NGV['过期日'] + pd.Timedelta(days=90)
# 格式化为字符串(去掉时区)
for col in ['过期日', '120天自动流转时间', '60天自动流转时间', '30天自动流转时间', '0天自动流转时间']:
data_NGV[col] = data_NGV[col].dt.strftime('%Y-%m-%d %H:%M:%S')
# 新增加盟商列
data_NGV = data_NGV.merge(
self.franchisee[['门店编码', '加盟商']],
on='门店编码',
how='left'
)
# 新增上次购买价格列
# 1. 清洗并拼接类型+价格
df_lp = self.last_price[['门店编码', '类型', '价格']].copy()
df_lp['类型'] = df_lp['类型'].fillna('').astype(str)
df_lp['价格'] = (
pd.to_numeric(df_lp['价格'], errors='coerce')
.round().fillna(0).astype(int).astype(str)
)
df_lp['类型_价格'] = df_lp['类型'] + df_lp['价格']
# 2. 按门店聚合,分号连接
agg_df = df_lp.groupby('门店编码', as_index=False)['类型_价格'].apply(';'.join)
# 3. 合并回主表
data_NGV = data_NGV.merge(agg_df, on='门店编码', how='left').fillna({'类型_价格': ''})
data_NGV.rename(columns={'类型_价格': '上次购买价格'}, inplace=True)
# 4. 处理没有匹配记录的门店(填空或默认值)
data_NGV['上次购买价格'] = data_NGV['上次购买价格'].fillna('')
# 成员字段替换(现在列名是中文)
staff_name_cols = [
"专属运营顾问",
"运营专家",
]
data_NGV = self.replace_names_with_staff_ids(data_NGV, staff_name_cols, self.staff_id_list)
return data_NGV
def dispatch_task(self, data_NGV):
"""
拆分为三个独立动作(输入 data_NGV 列名为中文):
1. 获取关联数据(NGV_data_id
2. 获取区域客服(regional_customer_service
3. 字段映射与格式化(中文 → widget),正确处理子表单
"""
records = []
no_customer_service_data = []
# === 使用通用函数预处理周期性增购子表单 ===
cyclic_subforms = self.build_subform_records(
df=self.cyclic_increasing,
group_by_col="门店编码",
field_mapping=self.subform_field_map,
)
# === Step 1: 构建 门店编码 → NGV 数据ID 映射 ===
org_code_to_ngv_id = {}
for ngv_item in self.NGV_data_list or []:
org_code = ngv_item.get("_widget_1734062123071")
ngv_id = ngv_item.get("_id")
if org_code and ngv_id:
org_code_to_ngv_id[org_code] = ngv_id
# === Step 2: 定义获取区域客服的函数 ===
def get_regional_customer_service(row):
province = row.get("省份") or row.get("province_name")
city = row.get("城市") or row.get("city_name")
area = row.get("区县") or row.get("district_name") or row.get("area_name")
org_code = row.get("门店编码")
# 若省市区缺失,尝试从 NGV 补全
if not all([province, city, area]) or any(
v in [None, '', 'None', 'NA'] for v in [province, city, area]
):
ngv_record = next(
(item for item in self.NGV_data_list
if item.get("_widget_1734062123071") == org_code),
None
)
if ngv_record:
province = ngv_record.get("_widget_1734062123090")
city = ngv_record.get("_widget_1734062123092")
area = ngv_record.get("_widget_1734062123094")
logger.info(f"【从NGV补全省市区】门店 {org_code}: {province}, {city}, {area}")
if not all([province, city, area]) or any(
v in [None, '', 'None', 'NA'] for v in [province, city, area]
):
logger.warning(f"【省市区信息缺失】门店 {org_code} 省市区不完整,客服设为空")
return None
customer_service = self.get_customer_service_by_location(
str(province).strip(),
str(city).strip(),
str(area).strip(),
self.province_staff_id_list
)
if customer_service and "数据缺失" not in str(customer_service):
logger.info(f"【派发客服】门店 {org_code} 派发给客服: {customer_service}")
return customer_service
else:
logger.warning(f"未找到区域客服,请检查门店编码: {org_code}")
return None
# === Step 3: 遍历主表每一行,构建最终提交记录 ===
for _, row in data_NGV.iterrows():
row_dict = row.to_dict()
# 3.1 关联数据(NGV ID
org_code = row_dict.get("门店编码")
ngv_id = org_code_to_ngv_id.get(org_code)
row_dict["关联数据"] = ngv_id if ngv_id else None
if not ngv_id:
logger.warning(f"未找到关联数据,请检查门店编码: {org_code}")
# 3.2 区域客服
customer_service = get_regional_customer_service(row_dict)
row_dict["区域客服"] = customer_service
if not customer_service:
no_customer_service_data.append(row_dict)
# 3.3 注入周期性增购子表单
row_dict["周期性增购"] = cyclic_subforms.get(org_code, [])
# 3.4 转换为 widget 格式
widget_record = self.row_to_dict(row_dict, self.field_map)
records.append(widget_record)
# === Step 4: 批量提交 ===
if not records:
logger.info("无数据需要派发")
return
payload = {
"api_key": "675b900991ad2491c69389ca",
"entry_id": "6931063d64187eaf6b927557",
"data_list": records
}
print(payload)
api_instance.entry_data_batch_create(payload)
logger.info(f"已提交 {len(records)} 条数据进行派发")
def main(self):
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
logger.info("任务开始")
# step1: 获取数据
self.load_all_data()
logger.info("加载数据完成")
# step2:数据处理
data_NGV = self.process_data()
# step3:数据派发
self.dispatch_task(data_NGV)
# step4:过期日发生变化更新已有表单
# step5:自动同意原表单
common_module.send_task_status(task_start_time, "续约回访待办")
except Exception as e:
error_task_logger.error(f"续约回访待办发生错误{e}")
# common_module.send_task_error(task_start_time, "续约回访待办", str(e))
if __name__ == '__main__':
RenewalToDo().main()