@@ -0,0 +1,535 @@
import os
from datetime import datetime
import pandas as pd
from api import API
from back_ground_module import CommonModule
from log_config import configure_task_logger , configure_error_task_logger
from collections import defaultdict
logger = configure_task_logger ( )
error_task_logger = configure_error_task_logger ( )
api_instance = API ( )
common_module = CommonModule ( )
output_dir = " output " # 设置输出目录
os . makedirs ( output_dir , exist_ok = True )
class RenewalToDo :
""" 续约回访待办派发 """
def __init__ ( self ) :
self . renewal_data_list = None
self . cyclic_increasing = None
self . franchisee = None
self . last_price = None
self . province_staff_id_list = None
self . json_list = None
self . data_NGV = None
self . staff_id_list = None
self . NGV_data_list = None
self . field_map = {
" 关联数据 " : " _widget_1764820541663 " ,
" 公司名称 " : " _widget_1764820541616 " ,
" 门店名称 " : " _widget_1764820541617 " ,
" 门店编码 " : " _widget_1764820541661 " ,
" 加盟商 " : " _widget_1764820541618 " ,
" 过期日 " : " _widget_1764820541672 " ,
" Saas版本 " : " _widget_1764820541623 " ,
" 上次购买价格 " : " _widget_1764820541624 " ,
" 联系人 " : " _widget_1764820541621 " ,
" 联系手机号 " : " _widget_1764820541622 " ,
" 专属运营顾问 " : " _widget_1764820541625 " ,
" 区域客服 " : " _widget_1764820541715 " ,
" 运营专家 " : " _widget_1764820541678 " ,
" 120天是否跟进 " : " _widget_1764820541628 " ,
" 120天处理人 " : " _widget_1764820541634 " ,
" 120天跟进时间 " : " _widget_1765352838631 " ,
" 60天是否跟进 " : " _widget_1764820541630 " ,
" 60天处理人 " : " _widget_1764820541635 " ,
" 60天跟进时间 " : " _widget_1765352838632 " ,
" 30天是否跟进 " : " _widget_1764820541632 " ,
" 30天处理人 " : " _widget_1764820541636 " ,
" 30天跟进时间 " : " _widget_1765352838633 " ,
" 是否联系上客户 " : " _widget_1764820541638 " ,
" 客户现阶段问题分类 " : " _widget_1764820541641 " ,
" 未联系上原因字段 " : " _widget_1765330820509 " ,
" 联系情况及问题说明 " : " _widget_1764820541653 " ,
" 潜在商机 " : " _widget_1764820541657 " ,
" 商机详情 " : " _widget_1764820541659 " ,
" 门店续约意愿 " : " _widget_1764820541654 " ,
" 不续约原因 " : " _widget_1764820541700 " ,
" 产品原因 " : " _widget_1764820541707 " ,
" 服务问题 " : " _widget_1764820541709 " ,
" 门店原因 " : " _widget_1764820541711 " ,
" 价格原因 " : " _widget_1764820541713 " ,
" 不续约具体情况说明 " : " _widget_1764820541702 " ,
" 回访完成方式 " : " _widget_1764820541697 " ,
" 周期性增购 " : " _widget_1764820541717 " ,
" 周期性增购.商品名称 " : " _widget_1764820541717._widget_1764820541719 " ,
" 周期性增购.分母金额 " : " _widget_1764820541717._widget_1764820541720 " ,
" 周期性增购.应续约日 " : " _widget_1764820541717._widget_1764820541721 " ,
" 周期性增购.上次购买数量 " : " _widget_1764820541717._widget_1764820541722 " ,
" 周期性增购.不续约原因 " : " _widget_1764820541717._widget_1764820541723 " ,
" 周期性增购.是否愿意续约 " : " _widget_1764820541717._widget_1764820541724 " ,
" 周期性增购.续约后订单编码 " : " _widget_1764820541717._widget_1764820541725 " ,
" 订单编码 " : " _widget_1764820541674 " ,
" 订单支付日期 " : " _widget_1764820541679 " ,
" 本次-实付金额(元) " : " _widget_1764820541676 " ,
" 业务类型(续约、升级) " : " _widget_1764820541680 " ,
" 连锁门店待办同步处理 " : " _widget_1764820541681 " ,
" 选择需要同步的门店名称 " : " _widget_1765330820391 " ,
" 120天自动流转时间 " : " _widget_1764820541865 " ,
" 60天自动流转时间 " : " _widget_1765964381895 " ,
" 30天自动流转时间 " : " _widget_1765964381896 " ,
" 0天自动流转时间 " : " _widget_1765964381897 " ,
" 当前所处节点 " : " _widget_1765352838609 " ,
" 流程状态 " : " _widget_1765352838610 " ,
" 经营模式 " : " _widget_1765964381952 " ,
" 公司等级 " : " _widget_1766130435561 " ,
" 公司id " : " _widget_1766631811839 " ,
" 订单商品名称 " : " _widget_1766730385209 " ,
" 提交人 " : " creator " ,
" 提交时间 " : " createTime " ,
" 更新时间 " : " updateTime "
}
self . cn_field_map = {
" related_data " : " 关联数据 " ,
" group_name " : " 公司名称 " ,
" org_name " : " 门店名称 " ,
" org_code " : " 门店编码 " ,
" expiry_time " : " 过期日 " ,
" saas_edition_fmt " : " Saas版本 " ,
" contacts " : " 联系人 " ,
" contact_mobile " : " 联系手机号 " ,
" service_impl_principal " : " 专属运营顾问 " ,
" group_grade " : " 公司等级 " ,
" technician " : " 运营专家 " ,
" manage_model " : " 经营模式 " ,
" id_own_group " : " 公司id " ,
}
self . subform_field_map = {
" 商品名称 " : " _widget_1764820541719 " ,
" 分母金额 " : " _widget_1764820541720 " ,
" 应续约日 " : " _widget_1764820541721 " ,
" 上次购买数量 " : " _widget_1764820541722 " ,
" 不续约原因 " : " _widget_1764820541723 " ,
" 是否愿意续约 " : " _widget_1764820541724 " ,
" 续约后订单编码 " : " _widget_1764820541725 " ,
# 根据实际需要添加更多字段
}
self . renewal_list_map = {
}
def load_all_data ( self ) :
"""
从各类来源加载数据上加载数据
:return:
"""
# 数据库获取续约回访数据
self . data_NGV = common_module . get_renewal_details ( )
# 获取加盟商信息
self . franchisee = common_module . get_renewal_franchisee_details ( )
self . franchisee . to_csv ( os . path . join ( output_dir , " franchisee.csv " ) )
# 获取上次购买价格
self . last_price = common_module . get_renewal_last_price_details ( )
self . last_price . to_csv ( os . path . join ( output_dir , " last_price.csv " ) )
# 周期性增购
self . cyclic_increasing = common_module . get_cyclic_increasing_renewal_details ( )
self . cyclic_increasing . to_csv ( os . path . join ( output_dir , " cyclic_increasing.csv " ) )
# 获取NGV数据
payload = { " api_key " : " 675b900991ad2491c69389ca " , " entry_id " : " 675bb02bd2d53c2034c665e4 " }
self . NGV_data_list = api_instance . entry_data_list ( payload ) . get ( " data " )
# 获取简道云员工id
payload = { " api_key " : " 6694d3c4fcb69ca9a111a6c4 " ,
" entry_id " : " 6769204a1902c9341340a1bc " ,
}
staff_id = api_instance . entry_data_list ( payload )
self . staff_id_list = staff_id . get ( " data " ) # api请求格式,将数据封装在data字典里
# 省市区人员关系表
payload = { " api_key " : " 675b900991ad2491c69389ca " , " entry_id " : " 676512ac3e54dc3159460c0a " }
json_dict = api_instance . entry_data_list ( payload )
if json_dict and " data " in json_dict :
self . province_staff_id_list = json_dict . get ( " data " )
else :
print ( " 加载省市区人员关系表失败 " )
self . province_staff_id_list = [ ]
# 获取已派发续约待办(进行中)
payload = { " api_key " : " 675b900991ad2491c69389ca " ,
" entry_id " : " 6931063d64187eaf6b927557 " ,
" filter " : { " rel " : " and " ,
" cond " : [ { " field " : " flowState " , " type " : " flowstate " , " method " : " eq " , " value " : [ 0 ] } ] } ,
}
renewal = api_instance . entry_data_list ( payload )
self . renewal_data_list = renewal . get ( " data " )
@staticmethod
def replace_names_with_staff_ids ( df , name_columns , staff_id_list ) :
"""
将 DataFrame 中多个姓名列替换为对应的员工ID。
:param staff_id_list: 简道云获取到员工id
:param df: pandas.DataFrame
:param name_columns: list[str],需要替换的姓名列名列表,例如 [ " col1 " , " col2 " ]
:return: 修改后的 DataFrame(原列被替换)
"""
# 1. 构建姓名 -> 员工ID 的映射字典(只做一次)
name_to_id = { }
for item in staff_id_list or [ ] :
name = item . get ( " _widget_1734942794144 " )
staff_id = item . get ( " _widget_1734942794145 " )
if name and staff_id :
name_to_id [ str ( name ) . strip ( ) ] = str ( staff_id )
# 2. 对每个指定的列进行替换
df = df . copy ( ) # 避免修改原始数据
for col in name_columns :
if col not in df . columns :
continue # 跳过不存在的列
# 替换:姓名 → ID,找不到的保留原值(可改为 fillna(None))
df [ col ] = (
df [ col ]
. astype ( str )
. str . strip ( )
. map ( name_to_id )
. fillna ( df [ col ] )
)
return df
@staticmethod
def row_to_dict ( row , field_mapping ) :
""" 将一行数据转换为指定格式的字典 """
result = { }
for col_name , widget_id in field_mapping . items ( ) :
if col_name not in row :
continue
value = row [ col_name ]
# 处理:如果 value 是容器类型(list, dict, tuple, np.ndarray),不进行 pd.isna 判断
if isinstance ( value , ( list , dict , tuple ) ) or ( hasattr ( value , ' __len__ ' ) and not isinstance ( value , str ) ) :
clean_value = value
else :
# 标量类型:安全使用 pd.isna
if pd . isna ( value ) :
clean_value = None
elif isinstance ( value , pd . Timestamp ) :
clean_value = value . strftime ( ' % Y- % m- %d T % H: % M: % SZ ' )
else :
clean_value = value
# 所有字段统一包 {"value": ...},包括子表单
result [ widget_id ] = { " value " : clean_value }
return result
@staticmethod
def en_row_to_cn_row ( en_row , en_to_cn_map ) :
"""
将英文字段的行数据转换为中文字段的行数据
:param en_row: dict 或 pandas.Series, key 为英文字段
:param en_to_cn_map: dict, 英文字段名 -> 中文字段名
:return: dict, key 为中文字段名
"""
cn_row = { }
for en_key , value in en_row . items ( ) :
if en_key in en_to_cn_map :
cn_key = en_to_cn_map [ en_key ]
cn_row [ cn_key ] = value
# 可选:忽略无法映射的字段,或记录警告
return cn_row
@staticmethod
def get_customer_service_by_location ( province_name , city_name , area_name , staff_id_list ) :
"""
直接遍历 self.staff_id_list,根据省市区匹配续约回访客服。
:return: 客服用户名(str),未找到则返回提示信息
"""
if not all ( [ province_name , city_name , area_name ] ) :
return " 数据缺失: 省市区不完整 "
for item in staff_id_list or [ ] :
try :
prov = item . get ( ' _widget_1734677164861 ' , ' ' ) . strip ( )
city = item . get ( ' _widget_1734677164862 ' , ' ' ) . strip ( )
area = item . get ( ' _widget_1734677164863 ' , ' ' ) . strip ( )
if ( prov == province_name . strip ( ) and
city == city_name . strip ( ) and
area == area_name . strip ( ) ) :
# 提取客服用户名
staff_info = item . get ( ' _widget_1734677164869 ' , { } ) # 续约回访客服
username = staff_info . get ( ' username ' )
return username if username else " 数据缺失: 客服用户名为空 "
except Exception :
continue # 跳过格式异常的记录
return " 数据缺失: 未找到对应的续约回访客服 "
def build_subform_records (
self ,
df : pd . DataFrame ,
group_by_col : str ,
field_mapping : dict ,
) - > dict :
"""
通用子表单预处理函数:将子表单 DataFrame 转换为 { group_key: [subform_record1, subform_record2, ...]} 的字典。
:param df: 子表单数据 DataFrame,列名为中文(如 " 商品名称 " , " 分母金额 " )
:param group_by_col: 用于分组的列名(如 " 门店编码 " )
:param field_mapping: 字段映射字典, { 中文字段名: widget_id},例如 { " 商品名称 " : " _widget_xxx " }
:return: dict, key 为 group_by_col 的值,value 为该组对应的子表单记录列表,
每条记录是 { widget_id: { " value " : clean_value}} 的 dict
"""
if df . empty :
return defaultdict ( list )
result = defaultdict ( list )
target_fields = set ( field_mapping . keys ( ) )
for _ , row in df . iterrows ( ) :
row_dict = row . to_dict ( )
group_key = row_dict . get ( group_by_col )
if not group_key or ( isinstance ( group_key , str ) and group_key . strip ( ) == " " ) :
warning_msg = f " 子表单行缺少分组字段 ' { group_by_col } ' ,跳过: { row_dict } "
# 构建单条子表单记录
sub_record = { }
for field_cn , widget_id in field_mapping . items ( ) :
val = row_dict . get ( field_cn )
# 清理值
if pd . isna ( val ) :
clean_val = None
elif hasattr ( val , ' to_eng_string ' ) : # Decimal
try :
clean_val = float ( val )
except ( ValueError , TypeError ) :
clean_val = str ( val )
elif isinstance ( val , pd . Timestamp ) :
clean_val = val . strftime ( ' % Y- % m- %d % H: % M: % S ' )
else :
clean_val = val
sub_record [ widget_id ] = { " value " : clean_val }
result [ group_key ] . append ( sub_record )
return result
def process_data ( self ) :
"""
数据处理加工
:return: 处理后的 DataFrame,列名为中文
"""
data_NGV = self . data_NGV . copy ( ) # 避免修改原始数据
# === 将英文字段名替换为中文字段名 ===
# 但只重命名存在的列
rename_map = { en : cn for en , cn in self . cn_field_map . items ( ) if en in data_NGV . columns }
data_NGV . rename ( columns = rename_map , inplace = True )
# 日期字段处理(使用中文列名)
time_columns = [ ' 过期日 ' ]
data_NGV [ time_columns ] = data_NGV [ time_columns ] . apply (
lambda col : pd . to_datetime ( col , errors = ' coerce ' )
. dt . tz_localize ( ' Asia/Shanghai ' )
. dt . tz_convert ( ' UTC ' )
)
# 新增4列:辅助时间字段
data_NGV [ ' 120天自动流转时间 ' ] = data_NGV [ ' 过期日 ' ] - pd . Timedelta ( days = 60 )
data_NGV [ ' 60天自动流转时间 ' ] = data_NGV [ ' 过期日 ' ] - pd . Timedelta ( days = 30 )
data_NGV [ ' 30天自动流转时间 ' ] = data_NGV [ ' 过期日 ' ] - pd . Timedelta ( days = 0 )
data_NGV [ ' 0天自动流转时间 ' ] = data_NGV [ ' 过期日 ' ] + pd . Timedelta ( days = 90 )
data_NGV [ ' 120天是否跟进 ' ] = " 主动 "
data_NGV [ ' 60天是否跟进 ' ] = " 主动 "
data_NGV [ ' 30天是否跟进 ' ] = " 主动 "
# 格式化为字符串(去掉时区)
for col in [ ' 过期日 ' , ' 120天自动流转时间 ' , ' 60天自动流转时间 ' , ' 30天自动流转时间 ' , ' 0天自动流转时间 ' ] :
data_NGV [ col ] = data_NGV [ col ] . dt . strftime ( ' % Y- % m- %d % H: % M: % S ' )
# 新增加盟商列
data_NGV = data_NGV . merge (
self . franchisee [ [ ' 门店编码 ' , ' 加盟商 ' ] ] ,
on = ' 门店编码 ' ,
how = ' left '
)
# 新增上次购买价格列
# 1. 清洗数据
df_lp = self . last_price [ [ ' 门店编码 ' , ' 类型 ' , ' 订单商品名称 ' , ' 价格 ' ] ] . copy ( )
# 处理“类型”和“订单商品名称”的缺失值
df_lp [ ' 类型 ' ] = df_lp [ ' 类型 ' ] . fillna ( ' ' ) . astype ( str )
df_lp [ ' 订单商品名称 ' ] = df_lp [ ' 订单商品名称 ' ] . fillna ( ' ' ) . astype ( str )
# 处理价格:转数字、四舍五入、填0、转字符串
df_lp [ ' 价格 ' ] = (
pd . to_numeric ( df_lp [ ' 价格 ' ] , errors = ' coerce ' )
. round ( ) . fillna ( 0 ) . astype ( int ) . astype ( str )
)
# 2. 拼接“类型:价格”
df_lp [ ' 类型_价格 ' ] = df_lp [ ' 类型 ' ] + ' : ' + df_lp [ ' 价格 ' ]
# 3. 按门店聚合两列
agg_df = df_lp . groupby ( ' 门店编码 ' , as_index = False ) . agg ( {
' 类型_价格 ' : lambda x : ' ; ' . join ( x ) ,
' 订单商品名称 ' : lambda x : ' ; ' . join ( x )
} )
# 4. 合并回主表
data_NGV = data_NGV . merge ( agg_df , on = ' 门店编码 ' , how = ' left ' )
# 5. 填充缺失值为空字符串,并重命名列
data_NGV [ ' 类型_价格 ' ] = data_NGV [ ' 类型_价格 ' ] . fillna ( ' ' )
data_NGV [ ' 订单商品名称 ' ] = data_NGV [ ' 订单商品名称 ' ] . fillna ( ' ' )
data_NGV . rename ( columns = {
' 类型_价格 ' : ' 上次购买价格 ' ,
' 订单商品名称 ' : ' 订单商品名称 '
} , inplace = True )
# 成员字段替换(现在列名是中文)
staff_name_cols = [
" 专属运营顾问 " ,
" 运营专家 " ,
]
data_NGV = self . replace_names_with_staff_ids ( data_NGV , staff_name_cols , self . staff_id_list )
return data_NGV
def dispatch_task ( self , data_NGV ) :
"""
拆分为三个独立动作(输入 data_NGV 列名为中文):
1. 获取关联数据(NGV_data_id)
2. 获取区域客服(regional_customer_service)
3. 字段映射与格式化(中文 → widget),正确处理子表单
"""
records = [ ]
no_customer_service_data = [ ]
# === 使用通用函数预处理周期性增购子表单 ===
cyclic_subforms = self . build_subform_records (
df = self . cyclic_increasing ,
group_by_col = " 门店编码 " ,
field_mapping = self . subform_field_map ,
)
# === Step 1: 构建 门店编码 → NGV 数据ID 映射 ===
org_code_to_ngv_id = { }
for ngv_item in self . NGV_data_list or [ ] :
org_code = ngv_item . get ( " _widget_1734062123071 " )
ngv_id = ngv_item . get ( " _id " )
if org_code and ngv_id :
org_code_to_ngv_id [ org_code ] = ngv_id
# === Step 2: 定义获取区域客服的函数 ===
def get_regional_customer_service ( row ) :
province = row . get ( " 省份 " ) or row . get ( " province_name " )
city = row . get ( " 城市 " ) or row . get ( " city_name " )
area = row . get ( " 区县 " ) or row . get ( " district_name " ) or row . get ( " area_name " )
org_code = row . get ( " 门店编码 " )
# 若省市区缺失,尝试从 NGV 补全
if not all ( [ province , city , area ] ) or any (
v in [ None , ' ' , ' None ' , ' NA ' ] for v in [ province , city , area ]
) :
ngv_record = next (
( item for item in self . NGV_data_list
if item . get ( " _widget_1734062123071 " ) == org_code ) ,
None
)
if ngv_record :
province = ngv_record . get ( " _widget_1734062123090 " )
city = ngv_record . get ( " _widget_1734062123092 " )
area = ngv_record . get ( " _widget_1734062123094 " )
logger . info ( f " 【从NGV补全省市区】门店 { org_code } : { province } , { city } , { area } " )
if not all ( [ province , city , area ] ) or any (
v in [ None , ' ' , ' None ' , ' NA ' ] for v in [ province , city , area ]
) :
logger . warning ( f " 【省市区信息缺失】门店 { org_code } 省市区不完整,客服设为空 " )
return None
customer_service = self . get_customer_service_by_location (
str ( province ) . strip ( ) ,
str ( city ) . strip ( ) ,
str ( area ) . strip ( ) ,
self . province_staff_id_list
)
if customer_service and " 数据缺失 " not in str ( customer_service ) :
logger . info ( f " 【派发客服】门店 { org_code } 派发给客服: { customer_service } " )
return customer_service
else :
logger . warning ( f " 未找到区域客服,请检查门店编码: { org_code } " )
return None
# === Step 3: 遍历主表每一行,构建最终提交记录 ===
for _ , row in data_NGV . iterrows ( ) :
row_dict = row . to_dict ( )
# 3.1 关联数据(NGV ID)
org_code = row_dict . get ( " 门店编码 " )
ngv_id = org_code_to_ngv_id . get ( org_code )
row_dict [ " 关联数据 " ] = ngv_id if ngv_id else None
if not ngv_id :
logger . warning ( f " 未找到关联数据,请检查门店编码: { org_code } " )
# 3.2 区域客服
customer_service = get_regional_customer_service ( row_dict )
row_dict [ " 区域客服 " ] = customer_service
if not customer_service :
no_customer_service_data . append ( row_dict )
# 3.3 注入周期性增购子表单
row_dict [ " 周期性增购 " ] = cyclic_subforms . get ( org_code , [ ] )
# 3.4 转换为 widget 格式
widget_record = self . row_to_dict ( row_dict , self . field_map )
records . append ( widget_record )
# === Step 4: 批量提交 ===
if not records :
logger . info ( " 无数据需要派发 " )
return
payload = {
" api_key " : " 675b900991ad2491c69389ca " ,
" entry_id " : " 6931063d64187eaf6b927557 " ,
" data_list " : records
}
print ( payload )
api_instance . entry_data_batch_create ( payload )
logger . info ( f " 已提交 { len ( records ) } 条数据进行派发 " )
def main ( self ) :
task_start_time = datetime . now ( ) . strftime ( " % Y- % m- %d % H: % M: % S " )
try :
logger . info ( " 任务开始 " )
# step1: 获取数据
self . load_all_data ( )
logger . info ( " 加载数据完成 " )
# step2:数据处理
data_NGV = self . process_data ( )
# step3:数据派发
self . dispatch_task ( data_NGV )
common_module . send_task_status ( task_start_time , " 续约回访待办 " )
except Exception as e :
error_task_logger . error ( f " 续约回访待办发生错误 { e } " )
common_module . send_task_error ( task_start_time , " 续约回访待办 " , str ( e ) )
if __name__ == ' __main__ ' :
RenewalToDo ( ) . main ( )