Files
jdy_fastapi/app/tasks/customer_tasks.py
panda c4c4ccc7e9 1.客户信息修改,将硬编码改为动态取值
2.新增项目批量停用、材料批量修改功能
2026-01-08 10:18:18 +08:00

212 lines
7.7 KiB
Python

"""
客户相关后台任务模块
本模块包含客户相关的后台任务,包括:
- 客户信息批量修改
这些任务在后台线程中执行,不会阻塞主请求。
"""
import logging
import requests
import pandas as pd
import time
import os
from typing import Dict, Any, Optional
from app.tasks.common import update_jiandaoyun, approve_workflow
logger = logging.getLogger('app')
def modify_customer_info_background(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str):
"""
修改客户信息后台任务
在后台线程中批量修改客户信息,从 Excel 文件中读取客户手机号和修改信息。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息
df: Excel 文件读取的内容,DataFrame 格式,第一列为客户手机号
save_path: Excel 文件保存的地址,执行完成后会删除此文件
Returns:
None
注意:
- 根据客户手机号匹配客户信息
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
try:
# 替换 NaN 为 None,便于后续判断
df = df.where(pd.notnull(df), None)
logger.info("开始获取当前客户下所有客户信息")
# 分页获取全部客户
params = {'pageSize': 100, 'pageNo': '1'}
res = requests.get(
'https://yunxiu.f6car.cn/member/customer/listForPermission',
params=params,
cookies=cookies,
timeout=10
)
res.raise_for_status()
total = int(res.json().get("data", {}).get("total", 0))
total_pages = (total // params["pageSize"]) + (1 if total % params["pageSize"] > 0 else 0)
logger.info(f"总计 {total_pages} 页,共 {total} 个客户")
all_customers = []
for page in range(1, total_pages + 1):
logger.debug(f"正在请求第 {page} 页...")
params["pageNo"] = page
retry_count = 0
max_retries = 5
while retry_count < max_retries:
try:
response = requests.get(
'https://yunxiu.f6car.cn/member/customer/listForPermission',
params=params,
cookies=cookies,
timeout=10
)
response.raise_for_status()
page_data = response.json().get("data", {}).get("data", [])
all_customers.extend(page_data)
break
except Exception as e:
retry_count += 1
logger.warning(f"请求第 {page} 页失败(第 {retry_count} 次重试): {e}")
time.sleep(3)
else:
logger.error(f"{page} 页请求失败超过最大重试次数,跳过")
# 获取专属运营顾问列表
json_data = {
'includeStopedEmployee': False,
'pageSize': 1000,
'filterNullUser': False,
'keyword': '',
'idOwnOrgList': [],
}
staff_resp = requests.post(
'https://yunxiu.f6car.cn/hive/employee/searchStaffInGroup',
cookies=cookies,
json=json_data,
timeout=10
)
staff_resp.raise_for_status()
staff_list = staff_resp.json().get("data", {}).get("list", [])
name_to_userid = {
emp['name']: emp['userId']
for emp in staff_list
if emp.get('userId') is not None
}
# 在 df 中添加 userId 列
df['userId'] = df['专属运营顾问'].map(name_to_userid)
# 字段映射:Excel 列名 -> F6 字段名
FIELD_MAPPING = {
'客户姓名': 'name',
'客户类型': 'customerType',
'客户来源': 'customerSourceName',
'单位名称': 'companyName',
'客户备注': 'customerMemo',
'专属运营顾问': 'exclusiveConsultantName', # userId 单独处理
}
def convert_to_request_data(original_data: dict, df_row: pd.Series) -> dict:
"""以原始客户数据为基础,仅覆盖 Excel 中非空字段"""
customer_info = original_data.get("data", {}).get("customerInfo", {}) or {}
request_data = dict(customer_info) # 浅拷贝,足够用
# 覆盖指定字段
for excel_col, f6_field in FIELD_MAPPING.items():
value = df_row.get(excel_col)
if pd.notna(value):
request_data[f6_field] = str(value).strip() if isinstance(value, str) else value
# 处理专属顾问 ID
if pd.notna(df_row.get('userId')):
request_data['exclusiveConsultantId'] = df_row['userId']
# 确保必要字段
if 'idCustomer' in customer_info:
request_data['pkId'] = customer_info['idCustomer']
request_data['idCustomer'] = customer_info['idCustomer']
return request_data
# 执行批量更新
updated_count = 0
for customer in all_customers:
phone = customer.get("cellPhone")
if not phone:
continue
matched_rows = df[df['客户手机号'] == phone]
if matched_rows.empty:
continue
df_row = matched_rows.iloc[0]
cus_id = customer.get("idCustomer")
if not cus_id:
logger.warning(f"客户手机号 {phone} 缺少 idCustomer,跳过")
continue
try:
# 获取完整客户信息
detail_resp = requests.get(
f'https://yunxiu.f6car.cn/member/customer/{cus_id}',
cookies=cookies,
timeout=10
)
detail_resp.raise_for_status()
original_data = detail_resp.json()
# 构造更新数据
final_json_data = convert_to_request_data(original_data, df_row)
# 发送更新
update_resp = requests.post(
'https://yunxiu.f6car.cn/member/customer/modifyCustomer',
cookies=cookies,
json=final_json_data,
timeout=10
)
if update_resp.status_code == 200 and update_resp.json().get('success'):
logger.info(f"✅ 客户 {phone} 更新成功")
updated_count += 1
else:
logger.error(f"❌ 客户 {phone} 更新失败: {update_resp.text}")
time.sleep(1) # 避免触发限流
except Exception as e:
logger.exception(f"处理客户 {phone} 时发生异常: {e}")
# 更新简道云状态
msg = update_jiandaoyun(data, f'批量修改完成,共更新 {updated_count} 个客户')
if msg.get('msg'):
approve_workflow(data)
logger.info('✅ 表单已自动提交至下一步')
except Exception as e:
logger.exception("modify_customer_info_background 执行出错:")
# 即使出错也尝试通知简道云
try:
update_jiandaoyun(data, f'执行失败: {str(e)[:200]}')
except:
pass
finally:
# 清理临时文件
try:
if os.path.exists(save_path):
os.remove(save_path)
logger.info(f"临时文件已删除: {save_path}")
except Exception as e:
logger.warning(f"删除临时文件失败: {e}")