Files
saas/back_ground_module/新签180天重构.py
2025-08-12 13:43:10 +08:00

821 lines
43 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
import os
import time
import requests
from api import API
import re
from back_ground_module import CommonModule
import pandas as pd
from log_config import configure_task_logger, configure_error_task_logger
api_instance = API()
common_module = CommonModule()
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
class NewServicesRevisitTest:
"""
用于处理新签服务回访的类。
"""
def __init__(self):
# 初始化所有数据属性
self.get_feature_usage = None
self.saas_create_time = None
self.index = None
self.date_one = None
self.data_NGV_S = None
self.date_list = None
self.Smart_detection = None
self.service_remind = None
self.NGV_data_list = None
self.permissions_table = None
self.staff_id_list = None
self.json_list = []
self.policy_recognition = None
self.widget_list = None
self.private_domain = None
self.public_domain = None
self.public_domain_list = None
self.different_industries = None
self.different_industries_list = None
self.groupnotification = None
def calculate_date_one(self, start_offset=0):
"""
计算从当前日期(或指定偏移量的日期)开始,往前遍历遇到date_list中日期的次数。
参数:
- start_offset: 从当前日期起始的天数偏移量,默认为0(即今天)。负数表示过去,正数表示未来。
返回:
- date_one: 遍历到date_list中日期的次数。
"""
jdy_date = datetime.datetime.now().strftime("%Y-%m-%d")
jdy_start_time = datetime.datetime.now().strftime("%Y-%m-%d ")
# 设置起始日期
now_time = datetime.datetime.now() + datetime.timedelta(days=start_offset)
# 初始化计数器
date_one = 1
print("当前日期:", now_time.strftime("%Y-%m-%d"))
# 检查起始日期是否在date_list中
if now_time.strftime("%Y-%m-%d") in self.date_list:
date_one = 0
print("开始次数:", date_one)
else:
# 遍历日期
for i in range(1, 10):
new_date = now_time + datetime.timedelta(days=-i)
new_date_str = new_date.strftime("%Y-%m-%d")
print("遍历日期:", new_date_str)
if new_date_str in self.date_list:
date_one += 1
print("节假日期:", new_date_str)
else:
break
print("遍历次数:", date_one)
return date_one
@staticmethod
def download_url_content(url, save_path):
"""
下载指定 URL 的内容并保存到本地文件。简道云图片下载
:param url: 要下载内容的 URL
:param save_path: 保存文件的路径
"""
try:
# 发送 GET 请求以获取内容
response = requests.get(url, stream=True)
response.raise_for_status() # 如果响应状态码不是 200,抛出异常
# 确保保存目录存在
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# 将内容写入文件
with open(save_path, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192): # 分块写入,避免占用过多内存
if chunk: # 过滤掉空块
file.write(chunk)
print(f"文件已成功保存到 {save_path}")
except requests.exceptions.RequestException as e:
print(f"下载失败: {e}")
except Exception as e:
print(f"发生错误: {e}")
def load_all_data(self):
"""加载所有必要的数据表"""
# 省市区人员关系表
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "676512ac3e54dc3159460c0a"}
json_dict = api_instance.entry_data_list(payload)
self.json_list = json_dict.get("data")
# 获取简道云员工id
payload = {"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "6769204a1902c9341340a1bc",
}
staff_id = api_instance.entry_data_list(payload)
self.staff_id_list = staff_id.get("data") # api请求格式,将数据封装在data字典里
# 获取权限表信息
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675b96c14e839f90fef1647c"}
self.permissions_table = api_instance.entry_data_list(payload).get("data", [])
# 获取NGV数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "675bb02bd2d53c2034c665e4"}
self.NGV_data_list = api_instance.entry_data_list(payload).get("data", [])
# print("NGV获取后的类型:", type(self.NGV_data_list))
# 获取服务提醒-数据支持表单数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "676bb7bda3029720f1083e99"}
self.service_remind = api_instance.entry_data_list(payload).get("data", [])
# 获取智能检测-数据支持表单数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "676bb99649ab3ac975af6e39"}
self.Smart_detection = api_instance.entry_data_list(payload).get("data", [])
# 获取功能使用情况表
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "6763bbf657bd8fb76fcb41b2"}
self.get_feature_usage = api_instance.entry_data_list(payload).get("data", [])
# 获取保单识别表
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "6773a60d30ed87ff9f68d3c5"}
self.policy_recognition = api_instance.entry_data_list(payload).get("data")
# 提取 _widget_1735632397600 的值并存储在列表中
self.widget_list = [item['_widget_1735632397600'] for item in self.policy_recognition]
# 获取私域小程序-数据支持表单数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "67e0f0fae622896749ba5087"}
self.private_domain = api_instance.entry_data_list(payload).get("data", [])
# 提取 _widget_1742795002375 的值并存储在列表中
# self.private_domain = [item['_widget_1742795002375'] for item in self.private_domain]
# 获取公域小程序-数据支持表单数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "67e0c702c8f603b997980999"}
self.public_domain = api_instance.entry_data_list(payload).get("data", [])
# 提取 _widget_1742784257506 的值并存储在列表中
self.public_domain_list = [item['_widget_1742784257506'] for item in self.public_domain]
# 获取异业合作-数据支持表单数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "67e24fdd8dfcfa918e17c30b"}
self.different_industries = api_instance.entry_data_list(payload).get("data", [])
# 提取 _widget_1742784257506 的值并存储在列表中
self.different_industries_list = [item['_widget_1742884829007'] for item in self.different_industries]
# 获取短信-数据支持表单数据
payload = {"api_key": "675b900991ad2491c69389ca", "entry_id": "67e5107198ba1b20d5df3974"}
self.groupnotification = api_instance.entry_data_list(payload).get("data", [])
@staticmethod
def build_index(json_list):
"""构建省市区人员索引"""
index = {}
# print(json_list)
for json_item in json_list:
# json_item = json_item.get("data")
# print(json_item)
try:
key = (json_item['_widget_1734677164861'], json_item['_widget_1734677164862'],
json_item['_widget_1734677164863']) # 省市区
if '_widget_1734677164871' not in json_item: # 日常回访客服
raise KeyError("缺少 '日常回访客服'")
index[key] = json_item
except KeyError as e:
print(f"警告:{e},跳过该条记录: {json_item}")
continue
return index
@staticmethod
def find_customer_service(province_name, city_name, area_name, index):
"""根据省市区查找客服"""
key = (province_name, city_name, area_name)
# print(index)
if key not in index:
return "数据缺失: 未找到对应的日常回访客服"
return index[key]
@staticmethod
def remove_parentheses(text: str) -> str:
"""使用正则表达式匹配并去除括号及其内容"""
# \s* 表示匹配零个或多个空白字符(处理括号前后可能存在的空格)
# $ 和 $ 分别表示匹配左括号和右括号
# 中间的 .*? 表示非贪婪地匹配任意数量的字符(包括没有字符的情况)
cleaned_text = re.sub(r'\s*$.*?$\s*', '', text)
# 为了确保同时处理中文括号,再进行一次替换
cleaned_text = re.sub(r'\s*.*?\s*', '', cleaned_text)
return cleaned_text.strip() # 去除两端多余的空白字符
@staticmethod
def get_staff_id(row_item, name):
"""辅助函数,用于获取员工ID"""
if str(row_item["_widget_1734942794144"]) == str(name): # 检查姓名是否匹配
return row_item["_widget_1734942794145"] # 返回员工ID
return None
def assign_customer_service(self, province_name, city_name, area_name, index):
"""根据省市区派发给日常回访客服"""
try:
customer_service_info = self.find_customer_service(province_name, city_name, area_name, index)
# 定义一个辅助函数,用于安全地获取多层字段中的 username
def safe_get_username(data, key):
try:
if isinstance(data, dict):
return data.get(key, {}).get('username', "")
return ""
except:
return ""
relationship_manager = safe_get_username(customer_service_info, '_widget_1734677164864')
customer_service = safe_get_username(customer_service_info, '_widget_1734677164871')
technician = safe_get_username(customer_service_info, '_widget_1734677164866')
area_manager = safe_get_username(customer_service_info, '_widget_1734677164865')
return relationship_manager, customer_service, technician, area_manager
except Exception as e:
print(f"Error finding customer service: {e}")
return "分配失败,请检查", "分配失败,请检查", "分配失败,请检查"
def prepare_data(self):
"""准备基础数据"""
# Step 1: 加载所有数据表
self.load_all_data()
# Step 2: 获取节假日列表
self.date_list = common_module.get_holiday_list()
# Step 3: 获取NGV数据
self.data_NGV_S = common_module.get_ngv_details(days_back=1).astype(str)
# Step 4: 计算日期偏移
self.date_one = self.calculate_date_one(start_offset=0)
# Step 5: 构建索引
self.index = self.build_index(self.json_list)
# Step 6: 设置时间偏移常量
self.saas_create_time_180 = 173
self.saas_create_time_90 = 83
def filter_and_process_data(self):
"""过滤和处理数据"""
# Step 1: 准备数据
self.prepare_data()
print("开始运行主流程")
for i in range(0, self.date_one):
# Step 2: 计算当前处理日期
now_time = datetime.datetime.now() + datetime.timedelta(days=-i)
today_180 = now_time + datetime.timedelta(days=-self.saas_create_time_180)
formatted_today_180 = today_180.strftime("%Y-%m-%d")
today_90 = now_time + datetime.timedelta(days=-self.saas_create_time_90)
formatted_today_90 = today_90.strftime("%Y-%m-%d")
print("SaaS开户回访日期:", formatted_today_180, formatted_today_90)
# Step 3: 数据预处理
data_NGV = self.data_NGV_S.copy()
data_NGV = data_NGV.apply(lambda series: series.apply(
lambda x: '' if pd.isna(x) or x in ['NA', 'None', ''] else x))
# Step 4: 计算最佳版本、分层和等级
edition_order = ['皇冠版', '至尊版', '尊享版', '旗舰版', '标准版', '进阶版', '基础版', '入门版']
customer_type_order = ["F", "E", "D", "C", "B", "A"]
group_grade_order = ['全国KAFMVP', '区域KAMVP', '重要客户(SVIP', '普通客户(VIP']
# 创建映射字典
edition_map = {edition: idx for idx, edition in enumerate(edition_order)}
customer_type_map = {ctype: idx for idx, ctype in enumerate(customer_type_order)}
group_grade_map = {grade: idx for idx, grade in enumerate(group_grade_order)}
# 添加排序列
data_NGV['edition_rank'] = data_NGV['saas_edition_fmt'].map(edition_map).fillna(0).astype(int)
data_NGV['customer_type_rank'] = data_NGV['saas_customer_type'].map(customer_type_map).fillna(0).astype(int)
data_NGV['group_grade_rank'] = data_NGV['group_grade'].map(group_grade_map).fillna(0).astype(int)
# 找到最佳值
best_edition_idx = data_NGV.groupby('id_own_group')['edition_rank'].idxmin()
best_edition_rows = data_NGV.loc[best_edition_idx]
best_edition_rows['max_saas_edition'] = best_edition_rows['saas_edition_fmt']
best_customer_type_idx = data_NGV.groupby('id_own_group')['customer_type_rank'].idxmin()
best_customer_type_rows = data_NGV.loc[best_customer_type_idx]
best_customer_type_rows['max_saas_customer_type'] = best_customer_type_rows['customer_type_rank'].apply(
lambda x: customer_type_order[x])
best_group_grade_idx = data_NGV.groupby('id_own_group')['group_grade_rank'].idxmin()
best_group_grade_rows = data_NGV.loc[best_group_grade_idx]
best_group_grade_rows['max_group_grade'] = best_group_grade_rows['group_grade']
# 合并最佳值
best_values = (
best_edition_rows[['id_own_group', 'max_saas_edition']]
.merge(best_customer_type_rows[['id_own_group', 'max_saas_customer_type']], on='id_own_group',
how='outer')
.merge(best_group_grade_rows[['id_own_group', 'max_group_grade']], on='id_own_group', how='outer')
)
data_NGV = data_NGV.merge(best_values, on='id_own_group', how='left')
# Step 5: 过滤数据
condition = (data_NGV['is_main_org'] == 1) & (data_NGV['org_status'] == '过期')
ngvv2 = data_NGV[condition]
data_NGV_V2 = data_NGV.copy()
data_NGV_V2['条件'] = (
(data_NGV_V2['org_type'] == "一般") &
(data_NGV_V2['org_status'] == '留存') &
(data_NGV_V2['area_manager'] != '殷昊') &
(data_NGV_V2['area_manager'] != '孙玉蕾') &
(data_NGV_V2['is_main_org'] != 1)
)
data_NGV_V2 = data_NGV_V2.loc[data_NGV_V2["条件"]]
data_NGV_V2['exists_in_ngvv2'] = data_NGV_V2['id_own_group'].isin(ngvv2['id_own_group'])
filtered_data = data_NGV_V2[data_NGV_V2['exists_in_ngvv2']]
# 排序
fixed_order_map = {edition: index for index, edition in enumerate(edition_order)}
filtered_data['sort_key'] = filtered_data['saas_edition_fmt'].map(fixed_order_map)
filtered_data = filtered_data.sort_values(by='sort_key').drop('sort_key', axis=1)
result = filtered_data.drop_duplicates(subset='id_own_group', keep='first')
data_NGV['条件'] = (
(data_NGV['org_type'] == "一般") &
(data_NGV['org_status'] == '留存') &
(data_NGV['area_manager'] != '殷昊') &
(data_NGV['area_manager'] != '孙玉蕾') &
((data_NGV['saas_create_time'] == formatted_today_180) |
(data_NGV['saas_create_time'] == formatted_today_90)) &
(data_NGV['is_main_org'] == "1")
)
data_NGV = data_NGV.loc[data_NGV["条件"]]
data_NGV = pd.concat([data_NGV, result], axis=0)
# Step 6: 设置跟进阶段和主要目的
def set_columns(row):
if row['saas_create_time'] == formatted_today_180:
row['跟进阶段'] = '新签后180天'
row['主要目的'] = '关怀使用情况,邀约转介绍,跟进增购商机,识别首年续约风险,及时跟进提报。'
elif row['saas_create_time'] == formatted_today_90:
row['跟进阶段'] = '新签后90天'
row['主要目的'] = '关怀使用情况,解答使用问题,强化培训,挖掘增购商机。'
else:
row['跟进阶段'] = "派发异常请联系数据组!"
row['主要目的'] = "派发异常请联系数据组!"
return row
data_NGV = data_NGV.apply(set_columns, axis=1)
print("SaaS开户回访人数:", len(data_NGV))
# Step 7: 重置索引
data_NGV = data_NGV.reset_index(drop=True)
# Step 8: 处理每个门店数据
self.process_shop_data(data_NGV, formatted_today_180, formatted_today_90)
def process_shop_data(self, data_NGV, formatted_today_180, formatted_today_90):
"""处理每个门店的数据"""
for index_num, row in data_NGV.iterrows():
try:
# Step 1: 准备基础数据
payload_dict = {}
saas_use_year = re.findall(r'第([0-9]+)年', row["saas_use_year"])[0]
# Step 2: 获取员工ID
NGV_roles = {
'relationship_manager': row['service_impl_principal'],
'area_manager': row['area_manager'],
'technician': row['technician'],
'salesmen': row['salesmen'],
}
for role, name in NGV_roles.items():
for row_item in self.staff_id_list:
staff_id = self.get_staff_id(row_item, name)
if staff_id:
NGV_roles[role] = staff_id
break
else:
NGV_roles[role] = None
# Step 3: 分配回访人员
if int(saas_use_year) < 4:
relationship_manager, area_manager, technician, salesmen = [
NGV_roles[role] for role in ['relationship_manager', 'area_manager', 'technician', 'salesmen']
]
# 如果未找到运营负责人,则根据省市区派发给日常回访客服
if not relationship_manager or not technician:
relationship_manager, _, technician, _ = self.assign_customer_service(
row['province_name'], row['city_name'], row['area_name'], self.index
)
if row["group_grade"] in ["普通客户(VIP", "重要客户(SVIP"]:
payload_dict["_widget_1734590278288"] = {"value": relationship_manager} # 跟进人是运营负责人
else:
payload_dict["_widget_1734590278288"] = {"value": technician} # 跟进人是技术专家
else:
salesmen = NGV_roles['salesmen']
relationship_manager, customer_service, technician, area_manager = self.assign_customer_service(
row['province_name'], row['city_name'], row['area_name'], self.index
)
if row["group_grade"] in ["普通客户(VIP", "重要客户(SVIP"]:
payload_dict["_widget_1734590278288"] = {"value": customer_service}
else:
payload_dict["_widget_1734590278288"] = {"value": technician}
# 其他成员字段赋值
payload_dict.update({
"_widget_1734590278289": {"value": relationship_manager}, # 运营负责人
"_widget_1734590278290": {"value": area_manager}, # 区域经理
"_widget_1734590278291": {"value": technician}, # 技术专家
"_widget_1735290738545": {"value": salesmen} # 销售负责人
})
# Step 4: 处理特殊逻辑
if payload_dict.get("_widget_1734590278288") == "02414917880947": # 如果跟进人是殷浩
payload_dict["_widget_1734590278288"] = {"value": "051612246035720178"} # 跟进人是赵柄诚
# Step 5: 获取权限信息
group_grade = re.sub(r'[^]*', '', row['max_group_grade'])
if not row['saas_customer_type'] or row['saas_customer_type'] in ['NA', 'None']:
row['saas_customer_type'] = "F"
NGV_store_level_key = group_grade + row['max_saas_edition'] + row['max_saas_customer_type']
# Step 6: 处理权限表逻辑...
print("权限唯一值:", NGV_store_level_key)
Billing = None
for item in self.permissions_table:
if NGV_store_level_key == item.get("_widget_1734056507963"): # 合并(等级-类型-分层)
print("该门店开单的权限是:", item.get(item.get("_widget_1734055617039")))
Billing = item.get("_widget_1734055617039") # 开单
Service_Alerts = item.get("_widget_1734055617040") # 服务提醒
membership = item.get("_widget_1734055617041") # 会员卡
SMS = item.get("_widget_1734055617042") # 短信
Public_domain_applets = item.get("_widget_1734055617043") # 公域小程序
Private_domain_applets = item.get("_widget_1734055617044") # 私域小程序
Test_sheet = item.get("_widget_1734055617045") # 检测单
AI_poster = item.get("_widget_1734055617046") # AI海报
Business_wallets = item.get("_widget_1734055617047") # 企业钱包
Precision_marketing = item.get("_widget_1734055617049") # 精准营销
Paid_memberships = item.get("_widget_1734055617051") # 付费会员
business_WeCom = item.get("_widget_1734055617052") # 企业微信
Insurance_policy_identification = item.get("_widget_1734055617053") # 保险单识别
Insurance_bots = item.get("_widget_1734055617054") # 保险机器人
Camera_pick_up = item.get("_widget_1734055617055") # 摄像头接车
Camera_billing = item.get("_widget_1734055617056") # 摄像头开单
Transparent_workshop = item.get("_widget_1734055617057") # 透明车间
Cross_industry_cooperation = item.get("_widget_1734055617058") # 异业合作
BI_Insights = item.get("_widget_1734055617059") # BI洞察
payload_dict.update(
{
"_widget_1734073342350": {"value": Billing},
"_widget_1735004315757": {"value": Service_Alerts},
"_widget_1735004315756": {"value": membership},
"_widget_1735004315755": {"value": SMS},
"_widget_1735004315754": {"value": Public_domain_applets},
"_widget_1735004315753": {"value": Private_domain_applets},
"_widget_1735004315752": {"value": Test_sheet},
"_widget_1735004315751": {"value": AI_poster},
"_widget_1735004315750": {"value": Business_wallets},
"_widget_1735004315749": {"value": Precision_marketing},
"_widget_1735004315748": {"value": Paid_memberships},
"_widget_1735004315747": {"value": business_WeCom},
"_widget_1735004315746": {"value": Insurance_policy_identification},
"_widget_1735004315745": {"value": Insurance_bots},
"_widget_1735004315744": {"value": Camera_pick_up},
"_widget_1735004315743": {"value": Camera_billing},
"_widget_1735004315742": {"value": Transparent_workshop},
"_widget_1735004315741": {"value": Cross_industry_cooperation},
"_widget_1734073342352": {"value": BI_Insights},
}
)
feature_dict = {
"开单": "_widget_1734073342350",
"服务提醒": "_widget_1735004315757",
"会员卡": "_widget_1735004315756",
"短信": "_widget_1735004315755",
"公域小程序": "_widget_1735004315754",
"私域小程序": "_widget_1735004315753",
"检测单": "_widget_1735004315752",
"AI海报": "_widget_1735004315751",
"企业钱包": "_widget_1735004315750",
"精准营销": "_widget_1735004315749",
"付费会员": "_widget_1735004315748",
"企业微信": "_widget_1735004315747",
"保险单识别": "_widget_1735004315746",
"保险机器人": "_widget_1735004315745",
"摄像头接车": "_widget_1735004315744",
"摄像头开单": "_widget_1735004315743",
"透明车间": "_widget_1735004315742",
"异业合作": "_widget_1735004315741",
"BI洞察": "_widget_1734073342352",
}
# _widget_1735527329557 下次是否推荐
for new_item in self.get_feature_usage:
for feature_module, feature_value in feature_dict.items(): # 模块
if new_item.get("_widget_1735268263079") == feature_module and new_item.get(
"_widget_1735527329557") == "" and new_item.get(
"_widget_1735280720550") == \
row["id_own_org"]: # 下次是否推荐 功能使用情况表
print(f"{feature_value}进行了更改")
payload_dict.update({f"{feature_value}": {"value": "×"}})
if new_item.get("_widget_1735268263079") == feature_module and new_item.get(
"_widget_1736414617462") == "" and new_item.get(
"_widget_1735280720550") == \
row["id_own_org"]: # 下次是否跟进
print(f"{feature_value}进行了更改")
payload_dict.update({f"{feature_value}": {"value": "×"}})
fields_to_check = {
"_widget_1735004315763": Billing, # 开单
"_widget_1735106258016": Service_Alerts, # 服务提醒
"_widget_1735106258036": membership, # 会员卡
"_widget_1735106258086": SMS, # 短信
"_widget_1735106258112": Public_domain_applets, # 公域小程序
"_widget_1735106258141": Private_domain_applets, # 私域小程序
"_widget_1735107354648": Test_sheet, # 检测单
"_widget_1735107354725": AI_poster, # AI海报
"_widget_1735107354811": Business_wallets, # 企业钱包
"_widget_1735107354906": Precision_marketing, # 精准营销
"_widget_1735107354980": Paid_memberships, # 付费会员
"_widget_1735107355093": business_WeCom, # 企业微信
"_widget_1735107355143": Insurance_policy_identification, # 保险单识别
"_widget_1735107355235": Insurance_bots, # 保险机器人
"_widget_1735107355333": Camera_pick_up, # 摄像头接车
"_widget_1735107355392": Camera_billing, # 摄像头开单
"_widget_1735107355502": Transparent_workshop, # 透明车间
"_widget_1735107355618": Cross_industry_cooperation, # 异业合作
"_widget_1735107355740": BI_Insights # BI洞察
}
# 遍历每个字段,检查其值并更新payload_dict
for widget_id, field_name in fields_to_check.items():
if field_name == "":
payload_dict.update({widget_id: {"value": ""}})
break
if not Billing:
print(f"权限表请求失败或者权限表无对应关系,权限唯一值是:{NGV_store_level_key}")
# 根据NGV内容给出默认值
if row["active_status_fmt"] == "活跃": # 开单 是否使用
payload_dict.update({"_widget_1735004315765": {"value": ""}})
else:
payload_dict.update({"_widget_1735004315765": {"value": ""}})
try:
if row["saas_edition_fmt"] not in ["基础版", "入门版"]: # 会员卡 是否拥有
payload_dict.update({"_widget_1735106258036": {"value": ""}})
else:
payload_dict.update({"_widget_1735106258036": {"value": ""}})
except Exception as e:
print(f"会员卡识别:Error finding customer service: {e}")
try:
if row["card_bill_day_count_last_30_day"] != "0": # 会员卡 是否使用
payload_dict.update({"_widget_1735106258038": {"value": ""}})
else:
payload_dict.update({"_widget_1735106258038": {"value": ""}})
except Exception as e:
print(f"会员卡识别:Error finding customer service: {e}")
# print(self.service_remind.get("_widget_1735112637045"))
payload_dict["_widget_1735106258018"] = {"value": ""}
for item in self.service_remind:
if row["id_own_group"] == item.get("_widget_1735112637043"):
if int(item.get("_widget_1735112637045")) < 180 and int(
item.get("_widget_1735112637046")) == 1: # 服务提醒 是否使用
payload_dict.update({"_widget_1735106258018": {"value": ""}})
break
elif int(item.get("_widget_1735112637048")) > 0:
payload_dict.update({"_widget_1735106258018": {"value": ""}})
break
keys_to_check = [
"_widget_1735113110155"
] # 智能检测 是否使用
# 初始化默认值为"否"
payload_dict["_widget_1735107354650"] = {"value": ""}
# 检查每个键,如果有一个大于0,则更新为"是"并停止检查
for key in keys_to_check:
for item in self.Smart_detection:
if row["id_own_org"] == item.get("_widget_1735113110147"):
if int(item.get(key, 0)) > 0: # 使用get方法并提供默认值0防止键不存在的情况
payload_dict["_widget_1735107354650"]["value"] = ""
break
# 近30天业务单量=0 则其它所有模块均不推荐
try:
for feature_module, feature_value in feature_dict.items(): # 模块
if row["bill_count_last_30_day"] == '0' and payload_dict[feature_value]["value"] == '':
payload_dict.update({f"{feature_value}": {"value": "×"}})
except Exception as e:
print(f"不开单识别:Error finding customer service: {e}")
# 保单识别:从系统中抽取目标门店,针对门店抽取修改是否推荐
try:
if row["org_code"] in self.widget_list:
payload_dict.update({'_widget_1735004315746': {"value": ""}})
except Exception as e:
print(f"保单识别:Error finding customer service: {e}")
# 私域小程序:根据是否开通微信小程序判断是否使用,旗舰版及以上算拥有
try:
for item in self.private_domain:
if row["id_own_group"] == item.get("_widget_1742795002375"): # 公司id
if int(item.get("_widget_1742795002379")) > 0: # 上架商品数
payload_dict.update({"_widget_1735106258143": {"value": ""}}) # DX:是否拥有
break
else:
payload_dict.update({"_widget_1735106258143": {"value": ""}}) # DX:是否拥有
break
except Exception as e:
print(f"私域小程序:Error finding customer service: {e}")
try:
high_version = ['皇冠版', '至尊版', '尊享版', '旗舰版']
if row["saas_edition_fmt"] in high_version:
payload_dict.update({'_widget_1735106258141': {"value": ""}}) # SYXCX:是否拥有
else:
payload_dict.update({'_widget_1735106258141': {"value": ""}}) # SYXCX:是否拥有
except Exception as e:
print(f"私域小程序:Error finding customer service: {e}")
# 公域小程序:根据是否开通微信小程序判断是否使用,旗舰版及以上算拥有
try:
for item in self.public_domain:
if row["id_own_org"] == item.get("_widget_1742784257506"): # 门店id
if int(item.get("_widget_1742784257509")) == 1: # 发布商品数量
payload_dict.update({"_widget_1735106258114": {"value": ""}}) # GYXCX:是否使用
break
else:
payload_dict.update({"_widget_1735106258114": {"value": ""}}) # GYXCX:是否使用
break
except Exception as e:
print(f"公域小程序:Error finding customer service: {e}")
try:
if row["id_own_org"] in self.public_domain_list:
payload_dict.update({'_widget_1735106258112': {"value": ""}}) # GYXCX:是否拥有
else:
payload_dict.update({'_widget_1735106258112': {"value": ""}}) # GYXCX:是否拥有
except Exception as e:
print(f"公域小程序:Error finding customer service: {e}")
# 异业合作:根据是否存在判断是否拥有,过滤条件 商品名称包含异业两个字
try:
if row["id_own_org"] in self.different_industries_list:
payload_dict.update({'_widget_1735107355618': {"value": ""}}) # YYHZ:是否拥有
else:
payload_dict.update({'_widget_1735107355618': {"value": ""}}) # YYHZ:是否拥有
except Exception as e:
print(f"异业合作:Error finding customer service: {e}")
# 短信:根据是否启动短信功能判断是否拥有,根据
try:
for item in self.groupnotification:
if row["id_own_group"] == item.get("_widget_1743065201885"): # 公司id
if int(item.get("_widget_1743065201886")) == 1: # 是否启动短信功能
payload_dict.update({"_widget_1735106258086": {"value": ""}}) # DX:是否拥有
break
else:
payload_dict.update({"_widget_1735106258086": {"value": ""}}) # DX:是否拥有
break
except Exception as e:
print(f"短信:Error finding customer service: {e}")
try:
for item in self.groupnotification:
if row["id_own_group"] == item.get("_widget_1743065201885"): # 公司id
if int(item.get("_widget_1743065201889")) > 0: # 累计发送成功总人数
payload_dict.update({"_widget_1735106258088": {"value": ""}}) # DX:是否使用
break
else:
payload_dict.update({"_widget_1735106258088": {"value": ""}}) # DX:是否使用
break
except Exception as e:
print(f"短信:Error finding customer service: {e}")
# Step 7: 创建回访记录
self.create_revisit_record(row, payload_dict)
except Exception as e:
error_task_logger.error(f"处理门店数据时出错: {e}")
continue
def create_revisit_record(self, row, payload_dict):
"""创建回访记录"""
# Step 1: 获取关联数据
NGV_data_id = None
for NGV_Data in self.NGV_data_list:
if row["org_code"] == NGV_Data.get("_widget_1734062123071"):
NGV_data_id = NGV_Data.get("_id")
try:
png_url = NGV_Data.get('_widget_1742890765211', {})[0].get('url', "")
except:
png_url = ""
# Step 2: 处理图片上传
upload_key = None
UUid = time.strftime("%Y%m%d%H%M%S", time.localtime())
if png_url:
save_dir = "sampleCloud"
os.makedirs(save_dir, exist_ok=True)
save_path = fr'{save_dir}\png\{UUid}.png'
self.download_url_content(png_url, save_path)
up_data = api_instance.get_upload_token(
{"api_key": "675b900991ad2491c69389ca",
"entry_id": "675b9c63925cd404038a6b86",
"transaction_id": UUid})
upload_url = up_data.get("upload_url")
upload_token = up_data.get("upload_token")
upload_result = api_instance.upload_file(
{"upload_url": upload_url,
"upload_token": upload_token,
"file_path": save_path})
upload_key = upload_result.get("key")
# Step 3: 设置分发时间
distribution_date = datetime.datetime.now(datetime.timezone.utc)
distribution_date = distribution_date.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
# Step 4: 完善payload
payload_dict.update({
"_widget_1734590278279": {"value": row["group_name"]}, # 公司名称
"_widget_1735112931760": {"value": row["id_own_group"]}, # 公司id
"_widget_1735112931761": {"value": row["id_own_org"]}, # 门店id
"_widget_1734590278281": {"value": row['org_name']}, # 门店名称
"_widget_1734590278292": {"value": row["跟进阶段"]}, # 跟进阶段
"_widget_1734321349021": {"value": NGV_data_id}, # 关data_get联数据
"_widget_1742548684369": {"value": row['主要目的']}, # 主要目的
"_widget_1734590278266": {"value": row['region_name']}, # 大区
"_widget_1734590278285": {"value": row['branch_name']}, # 小区
"_widget_1734590278284": {"value": row['province_name']}, # 省
"_widget_1734590278283": {"value": row['city_name']}, # 市
"_widget_1734590278282": {"value": row['area_name']}, # 区
"_widget_1734590278278": {"value": row['saas_customer_type']}, # 门店分层
"_widget_1734590278277": {"value": row['group_grade']}, # 公司等级
"_widget_1734590278276": {"value": row['limit_user_type']}, # 限制账户类型
"_widget_1734590278275": {"value": row['active_user_type']}, # 有效账户类型
"_widget_1734590278274": {"value": row['saas_version']}, # ERP操作模式
"_widget_1734590278273": {"value": row['saas_use_year']}, # 使用时长
"_widget_1734590278272": {"value": row['org_stage']}, # 门店阶段
"_widget_1734590278271": {"value": row['manage_model']}, # 经营模式
"_widget_1734590278267": {"value": row['contacts']}, # 联系人
"_widget_1734590278287": {"value": row['contact_mobile']}, # 联系手机号
"_widget_1734590278286": {"value": row['saas_edition_fmt']}, # SaaS版本
"_widget_1734590278280": {"value": row['org_code']}, # 门店编码
"_widget_1735096489244": {"value": distribution_date}, # 派发时间
"_widget_1742895342914": {"value": row['business_scope_fmt']}, # 经营范围
"_widget_1742895342915": {"value": row['station_number']}, # 工位数
"_widget_1742895342916": {"value": [upload_key]} # 门头照片
})
# Step 5: 创建记录
routine_follow_up_payload = {
"api_key": "675b900991ad2491c69389ca",
# "entry_id": "675b9c63925cd404038a6b86", # 日常回访表单
"entry_id": "6850e88ebdfde576a2e91a59", # 测试表单
"is_start_workflow": "true",
"data": payload_dict,
"transaction_id": UUid
}
res = api_instance.data_batch_create(routine_follow_up_payload)
logger.info(f"创建结果:{res}")
def main(self):
"""主入口方法"""
# Step 1: 记录任务开始时间
task_start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# Step 2: 过滤和处理数据
self.filter_and_process_data()
# Step 3: 发送任务状态通知
common_module.send_task_status(task_start_time, "新签客户回访测试")
logger.info("任务执行成功")
except Exception as e:
error_task_logger.error(f"任务执行失败: {e}")
pass
if __name__ == '__main__':
start = NewServicesRevisitTest()
start.main()