diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000..191c0f5 --- /dev/null +++ b/app/api/__init__.py @@ -0,0 +1,840 @@ +""" +简道云 API 接口封装模块 + +本模块封装了简道云的所有 API 接口,包括: +- 表单数据操作(获取、创建、更新、删除) +- 表单字段获取 +- 工作流操作(获取实例、审批、转交) +- 文件操作(获取上传凭证、上传文件) + +特性: +- 支持失败重试机制(默认最多重试20次) +- 支持字段替换(将字段ID替换为标签名) +- 支持批量操作(批量创建、更新、删除) +- 支持数据类型转换(NumPy、Decimal等) +- 完善的错误处理和日志记录 +""" +import requests +import json +import time +import logging +from typing import Optional, List, Dict, Any +from decimal import Decimal +import numpy as np +from app.config import Config + +# 获取日志记录器 +logger = logging.getLogger('app') +error_logger = logging.getLogger('app.error') # 错误日志记录器 + + +class NpEncoder(json.JSONEncoder): + """ + NumPy 数据类型 JSON 编码器 + + 用于将 NumPy 数据类型转换为 JSON 可序列化的类型。 + 支持: + - np.integer -> int + - np.floating -> float + - np.ndarray -> list + """ + def default(self, obj): + if isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + else: + return super(NpEncoder, self).default(obj) + + +def replace_decimals(obj): + """ + 递归替换 Decimal 类型为 float + + 递归遍历数据结构,将所有 Decimal 类型替换为 float 类型, + 用于 JSON 序列化。 + + Args: + obj: 要处理的对象(可以是 dict、list 或其他类型) + + Returns: + 处理后的对象,所有 Decimal 类型已替换为 float + """ + if isinstance(obj, dict): + return {k: replace_decimals(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [replace_decimals(item) for item in obj] + elif isinstance(obj, Decimal): + return float(obj) + return obj + + +class API: + """ + 简道云 API 接口封装类 + + 提供简道云所有 API 接口的封装,包括表单操作、工作流操作、文件操作等。 + 所有方法都支持失败重试机制,提高可靠性。 + """ + + def entry_data_get(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict: + """ + 获取单条表单数据 + + 根据应用ID、表单ID和数据ID获取单条表单数据。 + + Args: + data: 包含应用ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 + replace: 是否替换字段(将字段ID替换为标签名),默认为False + max_retries: 最大重试次数,默认为20次 + + Returns: + Dict: 表单数据字典 + + Raises: + Exception: 如果重试max_retries次后仍然失败 + """ + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/get' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "app_id": data['api_key'], + "entry_id": data['entry_id'], + "data_id": data['data_id'] + }) + + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + data_get = res.json() + print(data_get) + + if replace: + data_get = self.field_replacement(data, data_get) + + return data_get + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + if retries <= max_retries: + time.sleep(0.1) + + if retries > max_retries: + error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。") + raise Exception(f"获取单条表单数据失败,已重试{max_retries}次") + + def entry_data_list(self, data: dict, replace: bool = True, max_retries: int = 20) -> Dict: + """ + 获取多条表单数据 + :param max_retries: 最大重试次数 + :param replace: 是否替换字段,默认为True(保持向后兼容) + :param data: 简道云插件发送过来的data,包含应用id、表单id等信息 + :return: 表单数据列表 + """ + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + all_data_batches = [] + last_data_id = None + exit_flag = False + + while True: + payload = json.dumps({ + "app_id": data['api_key'], + "entry_id": data['entry_id'], + "limit": 100, + "data_id": last_data_id + }) + + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + data_get = res.json() + + if data_get.get("data"): + all_data_batches.extend(data_get['data']) + last_data_id = data_get['data'][-1].get('_id') + print(f"已获取 {len(all_data_batches)} 条数据") + break + else: + if 'data' not in data_get or len(data_get['data']) == 0: + exit_flag = True + break + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(0.1) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(0.1) + + if retries > max_retries: + error_logger.error(f"任务 {last_data_id}组 连续{max_retries}次请求失败,放弃此次请求。") + all_data_batches.append(None) + + if exit_flag: + break + + final_data = { + 'data': all_data_batches + } + + logger.info(f"获取了{len(all_data_batches)}条数据") + + if replace: + print("进行了替换") + return self.field_replacement(data, final_data) + else: + return final_data + + @staticmethod + def entry_widget_list(data: dict, max_retries: int = 20) -> Optional[Dict[str, Any]]: + """ + 获取表单字段 + :param max_retries: 最大重试次数 + :param data: 简道云插件发送过来的data,包含应用id、表单id等信息 + :return: 表单字段信息 + """ + url = 'https://api.jiandaoyun.com/api/v5/app/entry/widget/list' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "app_id": data['api_key'], + "entry_id": data['entry_id'], + }) + + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + return res.json() + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + if retries <= max_retries: + time.sleep(0.1) + + if retries > max_retries: + error_logger.error(f"获取表单字段失败,已重试{max_retries}次") + return None + + def field_replacement(self, data: dict, data_get: dict) -> dict: + """ + 字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字 + :param data: 简道云插件发送过来的data,包含表单id、数据id、应用id + :param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据 + :return: 替换后的数据 + """ + widget_list = self.entry_widget_list(data) + + if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list): + raise ValueError("映射表没有接受到数据") + + name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']} + + def replace_keys(obj): + """递归替换字典中的键名""" + if isinstance(obj, dict): + new_dict = {} + for key, value in obj.items(): + new_key = name_to_label.get(key, key) + new_dict[new_key] = replace_keys(value) + return new_dict + elif isinstance(obj, list): + return [replace_keys(item) for item in obj] + else: + return obj + + data_get_copy = json.loads(json.dumps(data_get)) + + if 'data' in data_get_copy: + data_get_copy['data'] = replace_keys(data_get_copy['data']) + + return data_get_copy + + @staticmethod + def data_batch_create(data: dict, max_retries: int = 20) -> Optional[Dict]: + """ + 新建单条表单数据 + :param max_retries: 最大重试次数 + :param data: 应该包含应用id、表单id,以及新建的数据data['data'] + :return: 返回创建后简道云返回的信息 + """ + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "app_id": data['api_key'], + "entry_id": data['entry_id'], + "data": data['data'], + "is_start_workflow": data.get('is_start_workflow', "false"), + "is_start_trigger": data.get('is_start_trigger', "false"), + "transaction_id": data.get('transaction_id', "") + }) + + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + data_get = res.json() + if res.status_code == 200: + return data_get + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(3) + + if retries > max_retries: + error_logger.error(f"任务 {data.get('data')} 连续{max_retries}次请求失败,放弃此次请求。") + return None + + @staticmethod + def entry_data_batch_create( + data: dict, + chunk_size: int = 90, + max_retries: int = 20 + ) -> List[Optional[Dict]]: + """ + 新建多条数据 + :param max_retries: 最大重试次数 + :param data: 应包含数据id、表单id、以及需要新建的信息,新建信息应该是一个列表 + :param chunk_size: 简道云限制批量新建一次最多100条,这里默认值设置为90条一次 + :return: 返回请求后的结果 + """ + data = replace_decimals(data) + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_create' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + total_length = len(data['data_list']) + logger.info(f"多数据写入行数: {total_length}") + + num_chunks = (total_length + chunk_size - 1) // chunk_size + data_get_list = [] + + for i in range(num_chunks): + start_index = i * chunk_size + end_index = min(start_index + chunk_size, total_length) + payload = json.dumps({ + "app_id": data['api_key'], + "entry_id": data['entry_id'], + "data_list": data['data_list'][start_index:end_index], + "is_start_workflow": data.get('is_start_workflow', "false"), + "is_start_trigger": data.get('is_start_trigger', "false"), + }, cls=NpEncoder) + + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + data_get = res.json() + if data_get.get("status") == "success": + data_get_list.append(data_get) + break + else: + logger.warning(f"请求异常,将重新请求") + retries += 1 + time.sleep(3) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(0.1) + + if retries > max_retries: + error_logger.error( + f"任务 {data['data_list'][start_index:end_index]} 连续{max_retries}次请求失败,放弃此次请求。") + data_get_list.append(None) + + return data_get_list + + @staticmethod + def entry_data_update(data: dict, max_retries: int = 20) -> dict: + """ + 修改数据 + :param max_retries: 最大重试次数 + :param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息 + :return: 修改数据后简道云返回的结果 + """ + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "app_id": data['api_key'], + "entry_id": data['entry_id'], + "data_id": data['data_id'], + "data": data['data'] + }) + + data_get = None + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + data_get = res.json() + if res.status_code == 200: + break + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(10) + + if retries > max_retries: + error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。") + + return data_get + + @staticmethod + def entry_data_banch_update(data: dict, max_retries: int = 20, chunk_size: int = 90) -> List[dict]: + """ + 批量修改数据 + :param chunk_size: 批量修改块大小 + :param max_retries: 最大重试次数 + :param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息 + :return: 修改数据后简道云返回的结果列表 + """ + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_update' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + total_length = len(data['data_ids']) + logger.info(f"多数据修改行数: {total_length}") + + num_chunks = (total_length + chunk_size - 1) // chunk_size + data_get_list = [] + + for i in range(num_chunks): + start_index = i * chunk_size + end_index = min(start_index + chunk_size, total_length) + payload = { + "app_id": data['api_key'], + "entry_id": data['entry_id'], + "data_ids": data['data_ids'][start_index:end_index], + "data": data['data'] + } + + if "transaction_id" in data: + payload["transaction_id"] = data["transaction_id"] + payload = json.dumps(payload, cls=NpEncoder) + + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + data_get = res.json() + if res.status_code == 200: + data_get_list.append(data_get) + break + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(10) + + if retries > max_retries: + error_logger.error(f"任务 {data.get('data_ids')} 连续{max_retries}次请求失败,放弃此次请求。") + continue + + return data_get_list + + @staticmethod + def entry_data_delete(data: dict, max_retries: int = 20) -> dict: + """ + 删除单条数据 + :param data: 应包含应用ID、表单ID、数据ID + :param max_retries: 最大重试次数,默认20 + :return: 删除结果 + """ + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/delete' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "app_id": data['api_key'], + "entry_id": data['entry_id'], + "data_id": data['data_id'], + }) + + retries = 0 + delete_status = None + + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + delete_status = res.json() + + # 手动处理状态码 4001(数据不存在) + if delete_status == { + "code": 4001, + "msg": "Data does not exist." + }: + logger.info(f"返回结果: {delete_status}") + break + + res.raise_for_status() + + if res.status_code == 200: + break + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(10) + + if retries > max_retries: + error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。") + continue + + return delete_status + + @staticmethod + def entry_data_batch_delete( + data: dict, + chunk_size: int = 90, + max_retries: int = 20 + ) -> List[Optional[Dict]]: + """ + 批量删除数据 + :param data: 应包含应用ID、表单ID、数据ID列表 + :param chunk_size: 单次删除最大条数,默认90 + :param max_retries: 重试次数,默认20 + :return: 删除结果列表 + """ + data = replace_decimals(data) + url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_delete' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + total_length = len(data['data_ids']) + logger.info(f"多数据删除行数: {total_length}") + + num_chunks = (total_length + chunk_size - 1) // chunk_size + data_get_list = [] + + for i in range(num_chunks): + start_index = i * chunk_size + end_index = min(start_index + chunk_size, total_length) + payload = json.dumps({ + "app_id": data['api_key'], + "entry_id": data['entry_id'], + "data_ids": data['data_ids'][start_index:end_index], + }, cls=NpEncoder) + + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + data_get = res.json() + logger.info(f"{i}页 返回结果: {data_get}") + if data_get.get("status") == "success": + data_get_list.append(data_get) + break + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(0.1) + + if retries > max_retries: + error_logger.error( + f"批量删除任务第{i+1}批 连续{max_retries}次请求失败,放弃此次请求。") + data_get_list.append(None) + + return data_get_list + + @staticmethod + def workflow_instance_get(data: dict, max_retries: int = 20) -> dict: + """ + 查询实例流程信息 + :param max_retries: 最大重试次数 + :param data: 简道云插件发送过来的data,包含应用id + :return: 查询简道云流程实例信息返回的结果 + """ + url = 'https://api.jiandaoyun.com/api/v5/workflow/instance/get' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "instance_id": data['data_id'], + "tasks_type": 1 + }) + + data_get = None + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + data_get = res.json() + if res.status_code == 200: + break + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(0.1) + + if retries > max_retries: + error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。") + + return data_get + + @staticmethod + def workflow_task_approve(data: dict, max_retries: int = 20) -> dict: + """ + 流程待办提交 + :param max_retries: 最大重试次数 + :param data: 应包含username、instance_id(data_id)、task_id等信息 + :return: 返回简道云流程待办提交的结果 + """ + url = 'https://api.jiandaoyun.com/api/v1/workflow/task/approve' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "username": data["username"], + "instance_id": data["instance_id"], + "task_id": data['task_id'], + "comment": data.get("comment", "自动转交") + }) + + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + if res.status_code == 200: + return res.json() + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(3) + + if retries > max_retries: + error_logger.error(f"任务 {data.get('task_id')} 连续{max_retries}次请求失败,放弃此次请求。") + return {} + + @staticmethod + def workflow_task_hand_over(data: dict, max_retries: int = 10) -> Optional[dict]: + """ + 流程待办转交 + :param max_retries: 最大重试次数 + :param data: 应包含username、instance_id(data_id)、task_id、transfer_username等信息 + :return: 返回简道云流程待办转交的结果 + """ + url = 'https://api.jiandaoyun.com/api/v1/workflow/task/transfer' + + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "username": data["username"], + "instance_id": data["instance_id"], + "task_id": data['task_id'], + "transfer_username": data['transfer_username'], + "comment": data.get("comment", "转交") + }) + + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + if res.status_code == 200: + return res.json() + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(3) + + if retries > max_retries: + error_logger.error(f"任务转交失败,已重试{max_retries}次") + return None + + @staticmethod + def get_upload_token(data: dict, max_retries: int = 10) -> Optional[Dict[str, Any]]: + """ + 获取文件上传凭证 + :param max_retries: 最大重试次数 + :param data: 应包含应用ID、表单ID、事务ID + :return: 返回upload_url、upload_token + """ + url = 'https://api.jiandaoyun.com/api/v5/app/entry/file/get_upload_token' + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + 'Content-Type': 'application/json' + } + + payload = json.dumps({ + "app_id": data['api_key'], + "entry_id": data['entry_id'], + "transaction_id": data['transaction_id'], + }) + + retries = 0 + while retries <= max_retries: + try: + res = requests.post(url=url, data=payload, headers=headers, timeout=10) + res.raise_for_status() + res_j = res.json() + + # 检查 token_and_url_list 是否存在且不为空 + token_list = res_j.get('token_and_url_list', []) + if not token_list or len(token_list) == 0: + logger.warning(f"未获取到上传凭证列表,将重新请求") + retries += 1 + time.sleep(3) + continue + + token_item = token_list[0] + upload_url = token_item.get('url') + upload_token = token_item.get('token') + + if not upload_url or not upload_token: + logger.warning(f"上传凭证信息不完整,将重新请求") + retries += 1 + time.sleep(3) + continue + + logger.info(f"返回结果: {upload_url}, {upload_token}") + if res.status_code == 200: + return { + 'upload_url': upload_url, + 'upload_token': upload_token + } + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) + except (requests.exceptions.RequestException, KeyError, IndexError, TypeError) as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(3) + + if retries > max_retries: + error_logger.error(f"获取上传凭证失败,已重试{max_retries}次") + return None + + @staticmethod + def upload_file(data: dict, max_retries: int = 10) -> Optional[Any]: + """ + 上传文件 + :param max_retries: 最大重试次数 + :param data: 应包含上传文件路径、上传文件url、上传文件token + :return: 返回上传文件结果 + """ + url = data['upload_url'] + headers = { + 'Authorization': Config.JIANDAOYUN_API_TOKEN, + } + file_path = data['file_path'] + payload = { + "token": data['upload_token'], + } + + retries = 0 + result = None + + while retries <= max_retries: + try: + with open(file_path, 'rb') as f: + files = {"file": f} + res = requests.post(url=url, data=payload, headers=headers, files=files, timeout=10) + res.raise_for_status() + data_get = res.json() + logger.info(f"返回结果: {data_get}") + if res.status_code == 200: + result = data_get + break + else: + logger.warning(f"请求异常, 将重新请求") + retries += 1 + time.sleep(3) + except requests.exceptions.RequestException as e: + logger.warning(f"请求异常: {e}, 将重新请求") + retries += 1 + time.sleep(3) + + if retries > max_retries: + error_logger.error(f"上传文件失败,已重试{max_retries}次") + + return result diff --git a/doc/添加新任务类指南.md b/doc/添加新任务类指南.md new file mode 100644 index 0000000..f8c8a78 --- /dev/null +++ b/doc/添加新任务类指南.md @@ -0,0 +1,452 @@ +# 添加新任务类指南 + +本文档详细说明如何在项目中添加一个新的任务类,包含立即响应和后台执行功能。 + +## 目录 + +1. [架构概述](#架构概述) +2. [添加步骤](#添加步骤) +3. [代码示例](#代码示例) +4. [文件结构说明](#文件结构说明) +5. [注意事项](#注意事项) + +--- + +## 架构概述 + +项目采用**立即响应 + 后台执行**的架构模式: + +- **立即响应函数**:位于 `app/module/f6_plugin_handlers.py`,负责快速响应请求,立即返回"正在执行"消息 +- **后台执行函数**:位于 `app/tasks/` 目录下,负责在后台线程中执行实际任务 + +### 工作流程 + +``` +客户端请求 + ↓ +立即响应函数(F6PluginHandlers) + ↓ +启动后台线程 + ↓ +后台执行函数(tasks/*.py) + ↓ +更新简道云表单 + ↓ +自动提交工作流 +``` + +--- + +## 添加步骤 + +### 步骤 1: 创建后台任务文件 + +在 `app/tasks/` 目录下创建新的任务文件,例如 `app/tasks/your_task.py`: + +```python +""" +你的任务相关后台任务模块 + +本模块包含你的任务相关的后台任务,包括: +- 任务描述1 +- 任务描述2 + +这些任务在后台线程中执行,不会阻塞主请求。 +执行完成后会更新简道云表单并自动提交工作流。 +""" +import logging +import os +import requests +import pandas as pd +from typing import Dict, Any +from tqdm import tqdm +from app.tasks.common import update_jiandaoyun, approve_workflow + +logger = logging.getLogger('app') + + +def your_task_background(data: Dict[str, Any], cookies: Dict[str, str] = None, + df: pd.DataFrame = None, save_path: str = None): + """ + 你的任务后台执行函数 + + 在后台线程中执行你的任务。 + 执行完成后会更新简道云表单并自动提交工作流。 + + Args: + data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 + cookies: 用户登录 F6 系统的 cookies 信息(可选) + df: Excel 文件读取的内容,DataFrame 格式(可选) + save_path: Excel 文件保存的地址,执行完成后会删除此文件(可选) + + Returns: + None + + 注意: + - 执行完成后会自动删除上传的文件(如果提供了save_path) + - 执行结果会更新到简道云表单 + """ + try: + # TODO: 在这里实现具体的任务逻辑 + results = [] + + # 示例:处理数据 + if df is not None: + df = df.where(pd.notnull(df), None) + for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理数据"): + # 实现具体的数据处理逻辑 + result_item = { + '行号': index + 1, + '状态': '处理成功' + } + results.append(result_item) + else: + # 如果没有DataFrame,执行其他任务 + results.append({'状态': '任务执行成功'}) + + # 删除文件(如果提供了save_path) + if save_path and os.path.exists(save_path): + os.remove(save_path) + logger.info(f'{save_path}已删除') + + # 格式化结果 + results_str = f'{results}' if results else '任务执行完成' + logger.info(f"任务执行结果: {results_str}") + + # 调用api回写改掉 执行明细与执行状态 + msg = update_jiandaoyun(data, results_str) + + if msg.get('msg'): + approve_workflow(data) + logger.info('表单已自动提交至下一步') + + except Exception as e: + error_msg = f'任务执行失败: {str(e)}' + logger.error(error_msg, exc_info=True) + msg = update_jiandaoyun(data, error_msg) + if msg.get('msg'): + approve_workflow(data) +``` + +### 步骤 2: 在任务导出文件中注册 + +在 `app/tasks/__init__.py` 中添加导入和导出: + +```python +# 你的任务 +from app.tasks.your_task import your_task_background + +__all__ = [ + # ... 其他任务 + # 你的任务 + 'your_task_background', +] +``` + +### 步骤 3: 添加立即响应函数 + +在 `app/module/f6_plugin_handlers.py` 中添加立即响应函数: + +```python +from app.tasks.your_task import your_task_background + +class F6PluginHandlers: + # ... 其他方法 + + @staticmethod + def your_task(data: Dict[str, Any]) -> Dict[str, str]: + """ + 你的任务 + + 从简道云获取任务请求,读取 Excel 文件(如果需要),并在后台线程中执行任务。 + 立即返回"正在执行"的提示,实际执行在后台线程中完成。 + + Args: + data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 + + Returns: + Dict[str, str]: 包含执行状态的字典 + """ + entry_data = api_instance.entry_data_get(data=data) + print('执行 你的任务') + + # 获取必要的参数(根据实际需求调整) + username = entry_data['data'].get('账号') + password = entry_data['data'].get('密码') + company_name = entry_data['data'].get('公司名称') + save_path = entry_data['data'].get('文件保存地址') + + # 如果需要登录F6系统 + cookies = None + if username and password and company_name: + login_response = F6Module.login_in(username, password, company_name) + if login_response is None: + return {'msg': '登录失败', 'msg_details': '无法登录F6系统'} + cookies = requests.utils.dict_from_cookiejar(login_response.cookies) + + # 如果需要读取Excel文件 + df = None + if save_path: + try: + df = pd.read_excel(save_path, sheet_name=0, dtype='string') + except Exception as e: + return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'} + + # 启动后台线程执行任务 + try: + thread = threading.Thread(target=your_task_background, + args=(data, cookies, df, save_path)) + thread.start() + except Exception as e: + print(f'创建线程失败: {str(e)}') + return {'msg': '任务启动失败', 'msg_details': f'无法启动后台任务: {str(e)}'} + + return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'} +``` + +### 步骤 4: 注册操作到模块注册表 + +在 `main.py` 的 `lifespan` 函数中注册操作: + +```python +core_manager.register_action('your_task', f6_plugin_handlers.your_task, 'f6_plugin_module', + description='你的任务描述') +``` + +--- + +## 代码示例 + +### 完整示例:BI任务 + +以下是一个完整的示例,展示如何添加BI任务类: + +#### 1. 后台任务文件 (`app/tasks/bi_tasks.py`) + +```python +""" +BI相关后台任务模块 +""" +import logging +import os +import pandas as pd +from typing import Dict, Any +from tqdm import tqdm +from app.tasks.common import update_jiandaoyun, approve_workflow + +logger = logging.getLogger('app') + +def bi_task_background(data: Dict[str, Any], cookies: Dict[str, str] = None, + df: pd.DataFrame = None, save_path: str = None): + """BI任务后台执行函数""" + try: + results = [] + if df is not None: + # 处理Excel数据 + for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理BI数据"): + results.append({'行号': index + 1, '状态': '处理成功'}) + else: + results.append({'状态': 'BI任务执行成功'}) + + if save_path and os.path.exists(save_path): + os.remove(save_path) + + msg = update_jiandaoyun(data, f'{results}') + if msg.get('msg'): + approve_workflow(data) + except Exception as e: + error_msg = f'BI任务执行失败: {str(e)}' + logger.error(error_msg, exc_info=True) + update_jiandaoyun(data, error_msg) +``` + +#### 2. 立即响应函数 (`app/module/f6_plugin_handlers.py`) + +```python +@staticmethod +def bi_task(data: Dict[str, Any]) -> Dict[str, str]: + """BI任务立即响应函数""" + entry_data = api_instance.entry_data_get(data=data) + + # 获取参数 + username = entry_data['data'].get('账号') + password = entry_data['data'].get('密码') + company_name = entry_data['data'].get('公司名称') + save_path = entry_data['data'].get('文件保存地址') + + # 登录(如果需要) + cookies = None + if username and password and company_name: + login_response = F6Module.login_in(username, password, company_name) + if login_response is None: + return {'msg': '登录失败'} + cookies = requests.utils.dict_from_cookiejar(login_response.cookies) + + # 读取文件(如果需要) + df = None + if save_path: + df = pd.read_excel(save_path, sheet_name=0, dtype='string') + + # 启动后台线程 + thread = threading.Thread(target=bi_task_background, + args=(data, cookies, df, save_path)) + thread.start() + + return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'} +``` + +#### 3. 注册操作 (`main.py`) + +```python +core_manager.register_action('bi_task', f6_plugin_handlers.bi_task, 'f6_plugin_module', + description='BI任务') +``` + +--- + +## 文件结构说明 + +### 关键文件位置 + +``` +fastapi_app/ +├── app/ +│ ├── module/ +│ │ └── f6_plugin_handlers.py # 立即响应处理器(重命名自 f6_plugin_module.py) +│ │ +│ ├── tasks/ +│ │ ├── __init__.py # 任务导出文件 +│ │ ├── common.py # 通用任务函数 +│ │ ├── bi_tasks.py # BI任务(示例) +│ │ ├── brand_tasks.py # 品牌任务 +│ │ ├── customer_tasks.py # 客户任务 +│ │ └── delete_tasks.py # 删除任务 +│ │ +│ └── api/ +│ └── routes.py # API路由 +│ +└── main.py # 应用入口,注册所有操作 +``` + +### 命名规范 + +- **立即响应函数**:位于 `F6PluginHandlers` 类中,使用小写下划线命名(如 `bi_task`) +- **后台执行函数**:位于 `app/tasks/` 目录,使用 `{task_name}_background` 命名(如 `bi_task_background`) +- **操作名称**:在 `main.py` 中注册时使用,通常与立即响应函数名相同(如 `'bi_task'`) + +--- + +## 注意事项 + +### 1. 文件命名变更 + +**重要**:`f6_plugin_module.py` 已重命名为 `f6_plugin_handlers.py`,类名从 `F6PluginModule` 改为 `F6PluginHandlers`。 + +- 文件名更清晰地表达了其功能:处理立即响应的处理器 +- 所有引用已更新,但 `app.state.f6_plugin_module` 保持向后兼容 + +### 2. 参数传递 + +后台执行函数通常接收以下参数: + +- `data`: 必需,包含简道云表单信息 +- `cookies`: 可选,F6系统登录凭证 +- `df`: 可选,Excel文件数据(DataFrame) +- `save_path`: 可选,文件保存路径 + +### 3. 错误处理 + +- 立即响应函数:应捕获登录、文件读取等错误,立即返回错误信息 +- 后台执行函数:应使用 try-except 包裹整个逻辑,确保错误能更新到简道云表单 + +### 4. 文件清理 + +如果任务处理了上传的文件,应在执行完成后删除: + +```python +if save_path and os.path.exists(save_path): + os.remove(save_path) + logger.info(f'{save_path}已删除') +``` + +### 5. 简道云表单更新 + +所有后台任务完成后都应: + +1. 调用 `update_jiandaoyun(data, results_str)` 更新执行结果 +2. 如果更新成功,调用 `approve_workflow(data)` 自动提交工作流 + +### 6. 日志记录 + +使用项目统一的日志记录器: + +```python +import logging +logger = logging.getLogger('app') +logger.info("信息日志") +logger.error("错误日志", exc_info=True) # exc_info=True 记录异常堆栈 +``` + +### 7. 进度显示 + +对于批量处理任务,使用 `tqdm` 显示进度: + +```python +from tqdm import tqdm + +for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理数据"): + # 处理逻辑 +``` + +--- + +## 快速检查清单 + +添加新任务类时,请确认: + +- [ ] 创建了后台任务文件 `app/tasks/{task_name}_tasks.py` +- [ ] 在 `app/tasks/__init__.py` 中导出了后台任务函数 +- [ ] 在 `app/module/f6_plugin_handlers.py` 中添加了立即响应函数 +- [ ] 在 `main.py` 中注册了操作 +- [ ] 实现了错误处理逻辑 +- [ ] 添加了日志记录 +- [ ] 实现了文件清理(如果处理了文件) +- [ ] 实现了简道云表单更新和工作流提交 + +--- + +## 常见问题 + +### Q: 如何测试新添加的任务? + +A: 启动应用后,通过API调用测试: +- 请求头设置 `Action: your_task` +- 请求体包含简道云表单数据 + +### Q: 任务执行失败怎么办? + +A: 后台执行函数中的异常会被捕获,错误信息会更新到简道云表单的"执行明细"字段。 + +### Q: 如何修改任务执行逻辑? + +A: 只需修改 `app/tasks/{task_name}_tasks.py` 中的后台执行函数即可。 + +### Q: 可以添加不需要登录的任务吗? + +A: 可以,在立即响应函数中不调用 `F6Module.login_in`,`cookies` 参数传 `None` 即可。 + +--- + +## 相关文件 + +- `app/module/f6_plugin_handlers.py` - 立即响应处理器 +- `app/tasks/common.py` - 通用任务函数(update_jiandaoyun, approve_workflow) +- `app/core/module_registry.py` - 模块注册表 +- `main.py` - 应用入口和操作注册 + +--- + +**最后更新**: 2025年 + +**维护者**: 数据组 + diff --git a/doc/系统说明书.md b/doc/系统说明书.md new file mode 100644 index 0000000..1361008 --- /dev/null +++ b/doc/系统说明书.md @@ -0,0 +1,685 @@ +# 简道云 FastAPI 服务系统说明书 + +## 1. 系统概述 + +### 1.1 系统简介 + +简道云 FastAPI 服务是一个基于 FastAPI 框架开发的后端服务系统,主要用于处理简道云插件发送的业务请求,与 F6 汽车维修管理系统进行数据交互,实现数据同步、批量处理、信息管理等核心功能。 + +### 1.2 系统定位 + +- **服务对象**: 简道云插件前端 +- **集成系统**: F6 汽车维修管理系统、简道云平台 +- **主要功能**: 数据同步、批量操作、信息管理、工作流自动化 + +### 1.3 系统特点 + +- **高性能**: 基于 FastAPI 异步框架,支持高并发请求 +- **模块化**: 采用模块化设计,易于扩展和维护 +- **可靠性**: 完善的异常处理和重试机制 +- **自动化**: 支持后台任务自动执行和工作流自动提交 +- **可维护性**: 清晰的代码结构和完善的日志记录 + +## 2. 系统功能 + +### 2.1 F6系统集成功能 + +#### 2.1.1 登录认证 + +**功能描述**: 实现 F6 系统的登录认证,支持用户名密码登录和验证码识别。 + +**主要流程**: +1. 接收用户名、密码、公司名称 +2. 对密码进行 MD5 加密 +3. 发送登录请求到 F6 系统 +4. 如果触发验证码,自动识别并重试 +5. 选择指定公司并完成登录 +6. 返回登录结果和 Cookie 信息 + +**技术实现**: +- 使用 `requests` 库发送 HTTP 请求 +- 使用 `pytesseract` 进行验证码 OCR 识别 +- 使用 `PIL` 进行图像预处理(对比度、亮度调整) + +#### 2.1.2 公司信息获取 + +**功能描述**: 获取 F6 系统中用户可访问的公司列表,并保存到简道云表单。 + +**主要流程**: +1. 使用用户名密码登录 F6 系统 +2. 获取公司列表(单店或多店) +3. 将公司信息批量写入简道云表单 +4. 返回时间戳用于后续查询 + +**输出格式**: +- 单店: 返回门店名称 +- 多店: 返回所有公司名称列表 + +#### 2.1.3 门店信息获取 + +**功能描述**: 获取指定公司下的门店列表及统计数据。 + +**主要流程**: +1. 登录 F6 系统并选择公司 +2. 获取门店列表 +3. 统计客户车辆数量和客户数量 +4. 将门店信息批量写入简道云表单 +5. 返回时间戳和统计数据 + +**输出内容**: +- 门店名称列表 +- 客户车辆总数 +- 客户总数 + +#### 2.1.4 保持连接 + +**功能描述**: 心跳检测功能,用于保持连接活跃。 + +**实现**: 直接返回接收到的数据,不做任何处理。 + +### 2.2 文件处理功能 + +#### 2.2.1 文件上传和下载 + +**功能描述**: 处理简道云插件上传的文件,下载并保存到本地。 + +**主要流程**: +1. 从简道云获取文件 URL +2. 解析文件名和扩展名 +3. 根据时间戳生成唯一文件名 +4. 下载文件到 `下载文件/` 目录 +5. 返回文件保存路径 + +**文件命名规则**: `原文件名_时间戳.扩展名` + +#### 2.2.2 文件校验 + +**功能描述**: 校验上传的 Excel 文件格式是否符合要求。 + +**支持的操作类型**: +- `create_brand`: 校验第一列是否为"品牌" +- `modify_customer_info`: 校验第一列是否为"客户手机号" +- `delete_cars`: 暂不校验 + +**校验流程**: +1. 读取 Excel 文件第一列 +2. 检查表头是否符合要求 +3. 返回校验结果(成功/失败) + +### 2.3 数据管理功能 + +#### 2.3.1 品牌批量创建 + +**功能描述**: 从 Excel 文件读取品牌名称,批量创建到 F6 系统。 + +**主要流程**: +1. 从简道云获取任务数据(账号、密码、公司名称、文件路径) +2. 登录 F6 系统 +3. 读取 Excel 文件(第一列为品牌名称) +4. 在后台线程中批量创建品牌 +5. 立即返回"正在执行"提示 +6. 后台任务完成后更新简道云表单并自动提交工作流 + +**后台任务处理**: +- 遍历 Excel 文件中的每一行 +- 调用 F6 API 创建品牌 +- 记录成功和失败的结果 +- 执行完成后删除上传的文件 +- 更新简道云表单的执行明细 +- 自动提交工作流到下一步 + +#### 2.3.2 客户信息修改 + +**功能描述**: 从 Excel 文件读取客户修改信息,批量修改 F6 系统中的客户信息。 + +**主要流程**: +1. 从简道云获取任务数据 +2. 登录 F6 系统 +3. 读取 Excel 文件(包含客户手机号、修改字段等) +4. 获取所有客户列表 +5. 获取员工列表(用于匹配专属运营顾问) +6. 在后台线程中批量修改客户信息 +7. 立即返回"正在执行中"提示 + +**Excel 文件格式**: +- 第一列: 客户手机号(必需,用于匹配) +- 其他列: 需要修改的字段(客户姓名、客户类型、客户来源、单位名称、专属运营顾问、客户备注等) + +**匹配逻辑**: +- 根据客户手机号匹配 F6 系统中的客户 +- 获取客户原始信息 +- 合并 Excel 中的修改字段 +- 解析地址信息(省市区) +- 匹配专属运营顾问的用户ID + +#### 2.3.3 客户信息删除 + +**功能描述**: 批量删除 F6 系统中的客户信息。 + +**主要流程**: +1. 从简道云获取任务数据 +2. 登录 F6 系统 +3. 获取所有客户列表 +4. 获取会员卡列表(提取客户ID) +5. 在后台线程中批量删除客户 +6. 立即返回"正在执行中"提示 + +**删除规则**: +- 跳过有最近消费时间的客户 +- 跳过有会员卡的客户 +- 8-20点之间每3.5秒删除一条,其余时间每1.5秒删除一条 +- 记录成功和失败次数 + +#### 2.3.4 车辆信息删除 + +**功能描述**: 批量删除 F6 系统中的客户车辆信息。 + +**主要流程**: +1. 从简道云获取任务数据 +2. 登录 F6 系统 +3. 分页获取所有车辆列表 +4. 获取会员卡列表(提取车辆ID) +5. 在后台线程中批量删除车辆 +6. 立即返回"正在执行中"提示 + +**删除规则**: +- 跳过有最近消费时间的客户车辆 +- 跳过有会员卡的车辆 +- 8-20点之间每3秒删除一条,其余时间每1秒删除一条 +- 记录成功和失败次数 + +#### 2.3.5 历史记录删除 + +**功能描述**: 删除指定门店的历史维修记录。 + +**主要流程**: +1. 从简道云获取任务数据(账号、密码、公司名称、门店名称) +2. 登录 F6 系统 +3. 获取门店列表并匹配门店ID +4. 在后台线程中删除历史维修记录 +5. 立即返回"正在执行中"提示 + +**删除方式**: 调用 F6 系统的删除接口,删除整个门店的历史维修记录。 + +### 2.4 BI任务功能 + +**功能描述**: 执行 BI 相关的数据处理和报表生成任务。 + +**当前状态**: 框架已搭建,具体业务逻辑待实现。 + +**预留接口**: +- 支持从 Excel 文件读取数据 +- 支持 F6 系统登录(如需要) +- 支持后台线程执行 +- 支持结果回写到简道云 + +### 2.5 工作流自动化 + +**功能描述**: 自动获取简道云工作流的待处理任务并提交到下一步。 + +**主要流程**: +1. 获取工作流实例信息 +2. 查找状态为待处理(status=0)的任务 +3. 提取任务信息(username、instance_id、task_id) +4. 调用审批接口自动提交 +5. 记录执行结果 + +**应用场景**: 后台任务执行完成后,自动将简道云表单提交到工作流下一步,无需人工干预。 + +## 3. 技术架构 + +### 3.1 系统架构 + +``` +┌─────────────────┐ +│ 简道云插件前端 │ +└────────┬────────┘ + │ HTTP/JSON + ▼ +┌─────────────────┐ +│ FastAPI 服务 │ +│ ┌───────────┐ │ +│ │ 路由层 │ │ +│ └─────┬─────┘ │ +│ │ │ +│ ┌─────▼─────┐ │ +│ │ 业务模块 │ │ +│ │ - F6Module│ │ +│ │ - Plugin │ │ +│ └─────┬─────┘ │ +│ │ │ +│ ┌─────▼─────┐ │ +│ │ 任务队列 │ │ +│ └─────┬─────┘ │ +│ │ │ +│ ┌─────▼─────┐ │ +│ │ 后台任务 │ │ +│ └───────────┘ │ +└────────┬────────┘ + │ + ┌────┴────┐ + ▼ ▼ +┌────────┐ ┌────────┐ +│ F6系统 │ │ 简道云 │ +└────────┘ └────────┘ +``` + +### 3.2 核心组件 + +#### 3.2.1 应用入口 (main.py) + +**职责**: +- 应用初始化和生命周期管理 +- 模块注册和路由配置 +- 异常处理器配置 +- 中间件配置(CORS) + +**关键功能**: +- `lifespan`: 管理应用启动和关闭 +- 全局异常处理 +- 模块实例化并注册到注册表 + +#### 3.2.2 路由层 (app/api/routes.py) + +**职责**: +- 定义 API 端点 +- 请求验证和参数解析 +- 调用业务模块处理请求 +- 返回响应 + +**主要端点**: +- `GET /health`: 健康检查 +- `POST /webhook`: 业务请求处理 + +#### 3.2.3 业务模块层 + +**F6Module** (`app/module/module.py`): +- F6 系统登录和认证 +- 公司信息获取 +- 门店信息获取 +- 保持连接 + +**F6PluginModule** (`app/module/f6_plugin_module.py`): +- 文件上传和校验 +- 品牌批量创建 +- 客户信息管理(修改、删除) +- 车辆信息删除 +- 历史记录删除 +- BI任务 + +**OtherPluginModule** (`app/module/other_module.py`): +- 其他插件功能(如短信签名状态) + +#### 3.2.4 任务处理层 + +**任务队列** (`app/utils/app_tools.py`): +- 使用 `Queue` 实现任务队列 +- 后台线程处理任务 +- 支持同步等待结果 + +**后台任务** (`app/tasks/`): +- `common.py`: 通用任务函数(更新简道云、工作流审批等) +- `brand_tasks.py`: 品牌相关任务 +- `customer_tasks.py`: 客户相关任务 +- `delete_tasks.py`: 删除相关任务 +- `bi_tasks.py`: BI相关任务 + +#### 3.2.5 核心工具层 + +**模块注册表** (`app/core/module_registry.py`): +- 统一管理所有业务操作 +- 支持操作查询和路由 +- 存储操作元数据(描述、模块名等) + +**应用工具** (`app/utils/app_tools.py`): +- 日志记录器配置(支持轮转) +- 任务队列管理 +- 请求头解码工具 +- 后台调度器 + +**简道云API封装** (`app/api/__init__.py`): +- 封装简道云所有API接口 +- 支持失败重试机制 +- 字段ID到标签名的替换 +- 批量操作支持 + +### 3.3 数据流 + +#### 3.3.1 同步任务流程 + +``` +简道云插件 → FastAPI路由 → 业务模块 → 任务队列 → 处理线程 → 返回结果 → 简道云插件 +``` + +#### 3.3.2 后台任务流程 + +``` +简道云插件 → FastAPI路由 → 业务模块 → 启动后台线程 → 立即返回 + ↓ + 后台任务执行 + ↓ + 更新简道云表单 + ↓ + 自动提交工作流 +``` + +### 3.4 技术选型 + +| 技术 | 版本 | 用途 | +|------|------|------| +| FastAPI | 0.121.0 | Web框架 | +| Uvicorn | 0.38.0 | ASGI服务器 | +| Pydantic | - | 数据验证 | +| Requests | 2.32.5 | HTTP请求 | +| Pandas | 2.3.3 | Excel处理 | +| APScheduler | 3.11.1 | 任务调度 | +| Pillow | 12.0.0 | 图像处理 | +| Pytesseract | 0.3.13 | OCR识别 | + +## 4. 部署运维 + +### 4.1 环境要求 + +- **操作系统**: Windows/Linux/macOS +- **Python版本**: 3.8+ +- **内存**: 建议 2GB+ +- **磁盘**: 根据文件存储需求,建议 10GB+ + +### 4.2 安装步骤 + +1. **克隆代码**: + ```bash + git clone + cd fastapi_app + ``` + +2. **安装依赖**: + ```bash + pip install -r requirements.txt + ``` + +3. **配置环境**: + - 编辑 `app/config.py`,配置 API Token + - 确保目录权限正确(logs、下载文件、模板文件) + +4. **启动服务**: + ```bash + python main.py + # 或 + uvicorn main:app --host 0.0.0.0 --port 5003 + ``` + +### 4.3 生产环境部署 + +#### 4.3.1 使用 Gunicorn + Uvicorn Workers + +```bash +pip install gunicorn +gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:5003 +``` + +#### 4.3.2 使用 systemd 管理服务(Linux) + +创建服务文件 `/etc/systemd/system/fastapi-app.service`: + +```ini +[Unit] +Description=简道云 FastAPI 服务 +After=network.target + +[Service] +Type=simple +User=your_user +WorkingDirectory=/path/to/fastapi_app +Environment="PATH=/path/to/venv/bin" +ExecStart=/path/to/venv/bin/uvicorn main:app --host 0.0.0.0 --port 5003 +Restart=always + +[Install] +WantedBy=multi-user.target +``` + +启动服务: +```bash +sudo systemctl enable fastapi-app +sudo systemctl start fastapi-app +``` + +#### 4.3.3 使用 Nginx 反向代理 + +Nginx 配置示例: + +```nginx +server { + listen 80; + server_name your_domain.com; + + location / { + proxy_pass http://127.0.0.1:5003; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } +} +``` + +### 4.4 监控和日志 + +#### 4.4.1 日志管理 + +- **日志位置**: `logs/简道云.log` +- **日志轮转**: 单文件最大 5MB,保留 5 个备份 +- **日志级别**: INFO +- **日志格式**: `时间戳 级别:模块名:消息` + +#### 4.4.2 健康检查 + +定期访问 `/health` 端点检查服务状态: + +```bash +curl http://localhost:5003/health +``` + +#### 4.4.3 性能监控 + +建议使用以下工具监控服务性能: +- **APM工具**: New Relic、Datadog 等 +- **日志分析**: ELK Stack、Loki 等 +- **指标监控**: Prometheus + Grafana + +### 4.5 备份和恢复 + +#### 4.5.1 数据备份 + +需要备份的内容: +- 配置文件 (`app/config.py`) +- 日志文件 (`logs/`) +- 下载的文件 (`下载文件/`) +- 模板文件 (`模板文件/`) + +#### 4.5.2 恢复步骤 + +1. 恢复代码和配置文件 +2. 恢复数据文件 +3. 重新安装依赖 +4. 重启服务 + +## 5. 安全说明 + +### 5.1 认证和授权 + +- **API Token**: 使用简道云 API Token 进行认证 +- **F6系统**: 使用用户名密码登录,密码进行 MD5 加密 + +### 5.2 数据安全 + +- **敏感信息**: API Token 建议使用环境变量管理 +- **密码处理**: 密码在传输前进行 MD5 加密 +- **文件存储**: 上传的文件在处理完成后自动删除 + +### 5.3 网络安全 + +- **CORS配置**: 生产环境建议限制允许的来源 +- **HTTPS**: 建议使用 HTTPS 加密传输 +- **防火墙**: 限制服务端口访问 + +### 5.4 安全建议 + +1. 定期更新依赖包,修复安全漏洞 +2. 使用强密码策略 +3. 定期审查日志,发现异常访问 +4. 限制文件上传大小和类型 +5. 实施请求频率限制 + +## 6. 故障排查 + +### 6.1 常见问题 + +#### 问题1: 服务启动失败 + +**可能原因**: +- 端口被占用 +- 依赖包未安装 +- 配置文件错误 + +**解决方法**: +- 检查端口占用: `netstat -ano | findstr 5003` +- 重新安装依赖: `pip install -r requirements.txt` +- 检查配置文件格式 + +#### 问题2: 登录失败 + +**可能原因**: +- 用户名密码错误 +- 公司名称不正确 +- 验证码识别失败 +- F6系统服务异常 + +**解决方法**: +- 验证用户名密码和公司名称 +- 检查 Tesseract OCR 是否正确安装 +- 查看日志获取详细错误信息 + +#### 问题3: 文件上传失败 + +**可能原因**: +- 文件格式不正确 +- 文件路径不存在 +- 磁盘空间不足 +- 网络连接问题 + +**解决方法**: +- 检查文件格式是否符合要求 +- 确保目录存在且有写权限 +- 检查磁盘空间 +- 查看网络连接状态 + +#### 问题4: 后台任务未执行 + +**可能原因**: +- 线程启动失败 +- 任务执行异常 +- 简道云API调用失败 + +**解决方法**: +- 查看日志中的错误信息 +- 检查简道云API Token是否有效 +- 验证任务数据格式是否正确 + +### 6.2 日志分析 + +查看日志文件: +```bash +tail -f logs/简道云.log +``` + +搜索错误: +```bash +grep -i error logs/简道云.log +``` + +### 6.3 性能优化 + +1. **并发处理**: 调整 Uvicorn workers 数量 +2. **数据库连接池**: 如使用数据库,配置连接池 +3. **缓存**: 对频繁访问的数据使用缓存 +4. **异步处理**: 更多使用异步函数提高性能 + +## 7. 扩展开发 + +### 7.1 添加新功能模块 + +1. 在 `app/module/` 下创建新模块文件 +2. 实现业务逻辑方法 +3. 在 `main.py` 中注册模块和操作 +4. 在 `app/schemas.py` 中添加数据模型(如需要) + +### 7.2 添加新的API端点 + +1. 在 `app/api/routes.py` 中添加路由 +2. 使用依赖注入获取所需服务 +3. 实现业务逻辑 +4. 返回响应 + +### 7.3 添加新的后台任务 + +1. 在 `app/tasks/` 下创建任务文件 +2. 实现后台任务函数 +3. 在业务模块中调用,启动线程 +4. 使用 `update_jiandaoyun` 更新结果 +5. 使用 `approve_workflow` 自动提交工作流 + +## 8. 附录 + +### 8.1 API接口清单 + +| 端点 | 方法 | 描述 | +|------|------|------| +| `/health` | GET | 健康检查 | +| `/webhook` | POST | 业务请求处理 | + +### 8.2 操作类型清单 + +| 操作名 | 模块 | 类型 | +|--------|------|------| +| `login_in` | F6Module | 同步 | +| `get_company_information` | F6Module | 同步 | +| `get_store_information` | F6Module | 同步 | +| `keep_alive` | F6Module | 同步 | +| `check_file` | F6PluginModule | 同步 | +| `create_brand` | F6PluginModule | 后台 | +| `delete_history` | F6PluginModule | 后台 | +| `delete_customer` | F6PluginModule | 后台 | +| `delete_cars` | F6PluginModule | 后台 | +| `modify_customer_info` | F6PluginModule | 后台 | +| `bi_task` | F6PluginModule | 后台 | +| `sms_signature_status` | OtherPluginModule | 同步 | + +### 8.3 配置文件说明 + +**app/config.py**: +- `BASE_DIR`: 项目根目录 +- `SAVE_DIRECTORY`: 下载文件目录 +- `MODE_DIRECTORY`: 模板文件目录 +- `LOGS_DIRECTORY`: 日志目录 +- `LOG_FILE`: 日志文件路径 +- `JIANDAOYUN_API_TOKEN`: 简道云API Token + +### 8.4 目录结构说明 + +- `logs/`: 日志文件存储 +- `下载文件/`: 从简道云下载的文件临时存储 +- `模板文件/`: 模板文件存储 +- `app/`: 应用代码目录 +- `app/api/`: API相关代码 +- `app/core/`: 核心功能代码 +- `app/module/`: 业务模块代码 +- `app/tasks/`: 后台任务代码 +- `app/utils/`: 工具类代码 + +--- + +**文档版本**: 2.0.0 +**最后更新**: 2025年 +**维护团队**: 数据组 +