""" 通用后台任务模块 本模块包含所有后台任务通用的功能,包括: - 简道云表单更新 - 工作流审批 - 获取门店ID - 获取会员卡列表 这些功能被多个后台任务模块复用。 """ import logging import time import requests from typing import Dict, Any, List, Optional, Callable from tqdm import tqdm from app.api import API api_instance = API() logger = logging.getLogger('app') def update_jiandaoyun(data: Dict[str, Any], results: str): """ 更新简道云表单 将后台任务的执行结果更新到简道云表单中。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 results: 执行结果信息,将写入到表单的执行明细字段 Returns: Dict: 更新结果字典,{'msg': True} 表示成功,{'msg': False} 表示失败 """ # 定义简道云数据配置 jiandaoyun_data = { 'api_key': data['api_key'], 'entry_id': data['entry_id'], 'data_id': data['data_id'], "data": { '_widget_1731379774828': {"value": "已执行"}, # f6系统批量操作测试 是否执行成功 '_widget_1731381334870': {"value": results} # f6系统批量操作测试 执行明细 } } time.sleep(1) print(jiandaoyun_data) try: response = api_instance.entry_data_update(jiandaoyun_data) logger.info(f"简道云表单更新成功: {response}") return {'msg': True} except Exception as e: logger.error(f"简道云表单更新失败: {e}") return {'msg': False} def approve_workflow(data: Dict[str, Any]): """ 获取简道云当前流程节点并直接提交 获取简道云工作流的当前待处理任务,并自动提交到下一步。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 Returns: None 注意: 如果未找到待处理任务,函数会记录错误并返回,不会抛出异常 """ # 获取简道云当前流程列表 json = api_instance.workflow_instance_get(data) # 检查返回数据是否有效 if not json: logger.error("未获取到工作流实例信息") return # 安全地获取任务列表 tasks = json.get('tasks', []) if not tasks: logger.error("未找到待处理任务") return # 将JSON字符串转换为Python字典 username = '' instance_id = '' task_id = '' for task in tasks: if task.get('status') == 0: assignee = task.get('assignee', {}) username = assignee.get('username', '') instance_id = task.get('instance_id', '') task_id = task.get('task_id', '') if username and instance_id and task_id: break if not username or not instance_id or not task_id: logger.error("未找到有效的待处理任务信息") return task_data = { "username": username, "instance_id": instance_id, "task_id": task_id, } try: response = api_instance.workflow_task_approve(task_data) logger.info(f"简道云工作流任务提交成功: {response}") except Exception as e: logger.error(f"简道云工作流任务提交失败: {e}") def execute_failure_handler(data: Dict[str, Any]): """ 简道云失败流程通知 函数执行失败时调用,通过钉钉webhook通知到指定人员 """ now = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()) pay_load = { "api_key":"6694d3c4fcb69ca9a111a6c4", "entry_id":"6938e011b360a1132522a62a", "data": { "_widget_1765335060501": {"value": now}, # 失败时间 "_widget_1765335060502": {"value": data['failure_name']}, # 任务名称 "_widget_1765335060503": {"value": data['failure_details']} # 失败明细 } } api_instance.data_batch_create(pay_load) def get_operate_org_id(cookies: Dict[str, str]) -> Optional[str]: """ 获取操作门店ID 从F6系统获取第一个门店的组织ID,用于后续操作。 Args: cookies: 用户登录 F6 系统的 cookies 信息 Returns: Optional[str]: 门店ID,如果获取失败返回 None 注意: 如果未获取到门店信息或门店ID为空,会记录错误日志并返回 None """ org_url = "https://yunxiu.f6car.cn/hive/org/getPageOrgGroupMembers?currentPage=1&pageSize=10&name=" try: org_res = requests.get(url=org_url, cookies=cookies) org_data = org_res.json().get("data", {}) org_list = org_data.get("list", []) if not org_list or len(org_list) == 0: logger.error("未获取到门店信息") return None operate_org_id = org_list[0].get("orgId") if not operate_org_id: logger.error("门店ID为空") return None logger.info(f"获取门店ID成功: {operate_org_id}") return operate_org_id except Exception as e: logger.error(f"获取门店ID时发生错误: {e}") return None def get_card_list( cookies: Dict[str, str], operate_org_id: str, extract_func: Callable[[Dict], Optional[str]] = None ) -> List[str]: """ 获取会员卡列表 从F6系统获取指定门店的会员卡列表,支持自定义提取逻辑。 Args: cookies: 用户登录 F6 系统的 cookies 信息 operate_org_id: 门店ID extract_func: 自定义提取函数,用于从会员卡数据中提取ID 如果不提供,默认提取 idCustomer 字段 Returns: List[str]: 会员卡ID列表 注意: - 默认每页100条数据,会自动分页获取所有数据 - 每页请求间隔0.2秒,避免请求过快 """ card_list = [] try: # 获取第一页,确定总页数 card_url = f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}&pageSize=100&pageNo=1" card_res = requests.get(url=card_url, cookies=cookies) total_card = int(card_res.json().get("data", {}).get("total", 0)) if total_card == 0: logger.info("未找到会员卡数据") return card_list total_page = total_card // 100 + (total_card % 100 > 0) logger.info(f"会员卡总数: {total_card}, 总页数: {total_page}") # 定义默认提取函数(提取客户ID) if extract_func is None: def default_extract(card_item: Dict) -> Optional[str]: return card_item.get("idCustomer") extract_func = default_extract # 分页获取所有会员卡数据 for page in tqdm(range(1, total_page + 1), desc="查询会员卡"): card_url = (f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}" f"&pageSize=100&pageNo={page}") card_res = requests.get(url=card_url, cookies=cookies) card_data_list = card_res.json().get("data", {}).get("data", []) # 使用提取函数提取ID for card_item in card_data_list: extracted_id = extract_func(card_item) if extracted_id is not None: card_list.append(extracted_id) time.sleep(0.2) logger.info(f"获取会员卡列表成功,共 {len(card_list)} 条") return card_list except Exception as e: logger.error(f"获取会员卡列表时发生错误: {e}") return card_list