""" 客户相关后台任务模块 本模块包含客户相关的后台任务,包括: - 客户信息批量修改 这些任务在后台线程中执行,不会阻塞主请求。 """ import logging import requests import pandas as pd import time import os from typing import Dict, Any, Optional from app.tasks.common import update_jiandaoyun, approve_workflow logger = logging.getLogger('app') def modify_customer_info_background(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str): """ 修改客户信息后台任务 在后台线程中批量修改客户信息,从 Excel 文件中读取客户手机号和修改信息。 执行完成后会更新简道云表单并自动提交工作流。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 cookies: 用户登录 F6 系统的 cookies 信息 df: Excel 文件读取的内容,DataFrame 格式,第一列为客户手机号 save_path: Excel 文件保存的地址,执行完成后会删除此文件 Returns: None 注意: - 根据客户手机号匹配客户信息 - 执行完成后会自动删除上传的文件 - 执行结果会更新到简道云表单 """ try: # 替换 NaN 为 None,便于后续判断 df = df.where(pd.notnull(df), None) logger.info("开始获取当前客户下所有客户信息") # 分页获取全部客户 params = {'pageSize': 100, 'pageNo': '1'} res = requests.get( 'https://yunxiu.f6car.cn/member/customer/listForPermission', params=params, cookies=cookies, timeout=10 ) res.raise_for_status() total = int(res.json().get("data", {}).get("total", 0)) total_pages = (total // params["pageSize"]) + (1 if total % params["pageSize"] > 0 else 0) logger.info(f"总计 {total_pages} 页,共 {total} 个客户") all_customers = [] for page in range(1, total_pages + 1): logger.debug(f"正在请求第 {page} 页...") params["pageNo"] = page retry_count = 0 max_retries = 5 while retry_count < max_retries: try: response = requests.get( 'https://yunxiu.f6car.cn/member/customer/listForPermission', params=params, cookies=cookies, timeout=10 ) response.raise_for_status() page_data = response.json().get("data", {}).get("data", []) all_customers.extend(page_data) break except Exception as e: retry_count += 1 logger.warning(f"请求第 {page} 页失败(第 {retry_count} 次重试): {e}") time.sleep(3) else: logger.error(f"第 {page} 页请求失败超过最大重试次数,跳过") # 获取专属运营顾问列表 json_data = { 'includeStopedEmployee': False, 'pageSize': 1000, 'filterNullUser': False, 'keyword': '', 'idOwnOrgList': [], } staff_resp = requests.post( 'https://yunxiu.f6car.cn/hive/employee/searchStaffInGroup', cookies=cookies, json=json_data, timeout=10 ) staff_resp.raise_for_status() staff_list = staff_resp.json().get("data", {}).get("list", []) name_to_userid = { emp['name']: emp['userId'] for emp in staff_list if emp.get('userId') is not None } # 在 df 中添加 userId 列 df['userId'] = df['专属运营顾问'].map(name_to_userid) # 字段映射:Excel 列名 -> F6 字段名 FIELD_MAPPING = { '客户姓名': 'name', '客户类型': 'customerType', '客户来源': 'customerSourceName', '单位名称': 'companyName', '客户备注': 'customerMemo', '专属运营顾问': 'exclusiveConsultantName', # userId 单独处理 } def convert_to_request_data(original_data: dict, df_row: pd.Series) -> dict: """以原始客户数据为基础,仅覆盖 Excel 中非空字段""" customer_info = original_data.get("data", {}).get("customerInfo", {}) or {} request_data = dict(customer_info) # 浅拷贝,足够用 # 覆盖指定字段 for excel_col, f6_field in FIELD_MAPPING.items(): value = df_row.get(excel_col) if pd.notna(value): request_data[f6_field] = str(value).strip() if isinstance(value, str) else value # 处理专属顾问 ID if pd.notna(df_row.get('userId')): request_data['exclusiveConsultantId'] = df_row['userId'] # 确保必要字段 if 'idCustomer' in customer_info: request_data['pkId'] = customer_info['idCustomer'] request_data['idCustomer'] = customer_info['idCustomer'] return request_data # 执行批量更新 updated_count = 0 for customer in all_customers: phone = customer.get("cellPhone") if not phone: continue matched_rows = df[df['客户手机号'] == phone] if matched_rows.empty: continue df_row = matched_rows.iloc[0] cus_id = customer.get("idCustomer") if not cus_id: logger.warning(f"客户手机号 {phone} 缺少 idCustomer,跳过") continue try: # 获取完整客户信息 detail_resp = requests.get( f'https://yunxiu.f6car.cn/member/customer/{cus_id}', cookies=cookies, timeout=10 ) detail_resp.raise_for_status() original_data = detail_resp.json() # 构造更新数据 final_json_data = convert_to_request_data(original_data, df_row) # 发送更新 update_resp = requests.post( 'https://yunxiu.f6car.cn/member/customer/modifyCustomer', cookies=cookies, json=final_json_data, timeout=10 ) if update_resp.status_code == 200 and update_resp.json().get('success'): logger.info(f"✅ 客户 {phone} 更新成功") updated_count += 1 else: logger.error(f"❌ 客户 {phone} 更新失败: {update_resp.text}") time.sleep(1) # 避免触发限流 except Exception as e: logger.exception(f"处理客户 {phone} 时发生异常: {e}") # 更新简道云状态 msg = update_jiandaoyun(data, f'批量修改完成,共更新 {updated_count} 个客户') if msg.get('msg'): approve_workflow(data) logger.info('✅ 表单已自动提交至下一步') except Exception as e: logger.exception("modify_customer_info_background 执行出错:") # 即使出错也尝试通知简道云 try: update_jiandaoyun(data, f'执行失败: {str(e)[:200]}') except: pass finally: # 清理临时文件 try: if os.path.exists(save_path): os.remove(save_path) logger.info(f"临时文件已删除: {save_path}") except Exception as e: logger.warning(f"删除临时文件失败: {e}")