1.客户信息修改,将硬编码改为动态取值

2.新增项目批量停用、材料批量修改功能
This commit is contained in:
2026-01-08 10:19:16 +08:00
parent 3938c820b5
commit 5cde7f852a
4 changed files with 1985 additions and 0 deletions
+8
View File
@@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
+840
View File
@@ -0,0 +1,840 @@
"""
简道云 API 接口封装模块
本模块封装了简道云的所有 API 接口,包括:
- 表单数据操作(获取、创建、更新、删除)
- 表单字段获取
- 工作流操作(获取实例、审批、转交)
- 文件操作(获取上传凭证、上传文件)
特性:
- 支持失败重试机制(默认最多重试20次)
- 支持字段替换(将字段ID替换为标签名)
- 支持批量操作(批量创建、更新、删除)
- 支持数据类型转换(NumPy、Decimal等)
- 完善的错误处理和日志记录
"""
import requests
import json
import time
import logging
from typing import Optional, List, Dict, Any
from decimal import Decimal
import numpy as np
from app.config import Config
# 获取日志记录器
logger = logging.getLogger('app')
error_logger = logging.getLogger('app.error') # 错误日志记录器
class NpEncoder(json.JSONEncoder):
"""
NumPy 数据类型 JSON 编码器
用于将 NumPy 数据类型转换为 JSON 可序列化的类型。
支持:
- np.integer -> int
- np.floating -> float
- np.ndarray -> list
"""
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
def replace_decimals(obj):
"""
递归替换 Decimal 类型为 float
递归遍历数据结构,将所有 Decimal 类型替换为 float 类型,
用于 JSON 序列化。
Args:
obj: 要处理的对象(可以是 dict、list 或其他类型)
Returns:
处理后的对象,所有 Decimal 类型已替换为 float
"""
if isinstance(obj, dict):
return {k: replace_decimals(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [replace_decimals(item) for item in obj]
elif isinstance(obj, Decimal):
return float(obj)
return obj
class API:
"""
简道云 API 接口封装类
提供简道云所有 API 接口的封装,包括表单操作、工作流操作、文件操作等。
所有方法都支持失败重试机制,提高可靠性。
"""
def entry_data_get(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict:
"""
获取单条表单数据
根据应用ID、表单ID和数据ID获取单条表单数据。
Args:
data: 包含应用ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
replace: 是否替换字段(将字段ID替换为标签名),默认为False
max_retries: 最大重试次数,默认为20次
Returns:
Dict: 表单数据字典
Raises:
Exception: 如果重试max_retries次后仍然失败
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/get'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_id": data['data_id']
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
print(data_get)
if replace:
data_get = self.field_replacement(data, data_get)
return data_get
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
if retries <= max_retries:
time.sleep(0.1)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。")
raise Exception(f"获取单条表单数据失败,已重试{max_retries}")
def entry_data_list(self, data: dict, replace: bool = True, max_retries: int = 20) -> Dict:
"""
获取多条表单数据
:param max_retries: 最大重试次数
:param replace: 是否替换字段,默认为True(保持向后兼容)
:param data: 简道云插件发送过来的data,包含应用id、表单id等信息
:return: 表单数据列表
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
all_data_batches = []
last_data_id = None
exit_flag = False
while True:
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"limit": 100,
"data_id": last_data_id
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if data_get.get("data"):
all_data_batches.extend(data_get['data'])
last_data_id = data_get['data'][-1].get('_id')
print(f"已获取 {len(all_data_batches)} 条数据")
break
else:
if 'data' not in data_get or len(data_get['data']) == 0:
exit_flag = True
break
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(0.1)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1)
if retries > max_retries:
error_logger.error(f"任务 {last_data_id}组 连续{max_retries}次请求失败,放弃此次请求。")
all_data_batches.append(None)
if exit_flag:
break
final_data = {
'data': all_data_batches
}
logger.info(f"获取了{len(all_data_batches)}条数据")
if replace:
print("进行了替换")
return self.field_replacement(data, final_data)
else:
return final_data
@staticmethod
def entry_widget_list(data: dict, max_retries: int = 20) -> Optional[Dict[str, Any]]:
"""
获取表单字段
:param max_retries: 最大重试次数
:param data: 简道云插件发送过来的data,包含应用id、表单id等信息
:return: 表单字段信息
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/widget/list'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
return res.json()
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
if retries <= max_retries:
time.sleep(0.1)
if retries > max_retries:
error_logger.error(f"获取表单字段失败,已重试{max_retries}")
return None
def field_replacement(self, data: dict, data_get: dict) -> dict:
"""
字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字
:param data: 简道云插件发送过来的data,包含表单id、数据id、应用id
:param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据
:return: 替换后的数据
"""
widget_list = self.entry_widget_list(data)
if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list):
raise ValueError("映射表没有接受到数据")
name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']}
def replace_keys(obj):
"""递归替换字典中的键名"""
if isinstance(obj, dict):
new_dict = {}
for key, value in obj.items():
new_key = name_to_label.get(key, key)
new_dict[new_key] = replace_keys(value)
return new_dict
elif isinstance(obj, list):
return [replace_keys(item) for item in obj]
else:
return obj
data_get_copy = json.loads(json.dumps(data_get))
if 'data' in data_get_copy:
data_get_copy['data'] = replace_keys(data_get_copy['data'])
return data_get_copy
@staticmethod
def data_batch_create(data: dict, max_retries: int = 20) -> Optional[Dict]:
"""
新建单条表单数据
:param max_retries: 最大重试次数
:param data: 应该包含应用id、表单id,以及新建的数据data['data']
:return: 返回创建后简道云返回的信息
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data": data['data'],
"is_start_workflow": data.get('is_start_workflow', "false"),
"is_start_trigger": data.get('is_start_trigger', "false"),
"transaction_id": data.get('transaction_id', "")
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if res.status_code == 200:
return data_get
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data')} 连续{max_retries}次请求失败,放弃此次请求。")
return None
@staticmethod
def entry_data_batch_create(
data: dict,
chunk_size: int = 90,
max_retries: int = 20
) -> List[Optional[Dict]]:
"""
新建多条数据
:param max_retries: 最大重试次数
:param data: 应包含数据id、表单id、以及需要新建的信息,新建信息应该是一个列表
:param chunk_size: 简道云限制批量新建一次最多100条,这里默认值设置为90条一次
:return: 返回请求后的结果
"""
data = replace_decimals(data)
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_create'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
total_length = len(data['data_list'])
logger.info(f"多数据写入行数: {total_length}")
num_chunks = (total_length + chunk_size - 1) // chunk_size
data_get_list = []
for i in range(num_chunks):
start_index = i * chunk_size
end_index = min(start_index + chunk_size, total_length)
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_list": data['data_list'][start_index:end_index],
"is_start_workflow": data.get('is_start_workflow', "false"),
"is_start_trigger": data.get('is_start_trigger', "false"),
}, cls=NpEncoder)
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if data_get.get("status") == "success":
data_get_list.append(data_get)
break
else:
logger.warning(f"请求异常,将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1)
if retries > max_retries:
error_logger.error(
f"任务 {data['data_list'][start_index:end_index]} 连续{max_retries}次请求失败,放弃此次请求。")
data_get_list.append(None)
return data_get_list
@staticmethod
def entry_data_update(data: dict, max_retries: int = 20) -> dict:
"""
修改数据
:param max_retries: 最大重试次数
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 修改数据后简道云返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_id": data['data_id'],
"data": data['data']
})
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if res.status_code == 200:
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。")
return data_get
@staticmethod
def entry_data_banch_update(data: dict, max_retries: int = 20, chunk_size: int = 90) -> List[dict]:
"""
批量修改数据
:param chunk_size: 批量修改块大小
:param max_retries: 最大重试次数
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 修改数据后简道云返回的结果列表
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_update'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
total_length = len(data['data_ids'])
logger.info(f"多数据修改行数: {total_length}")
num_chunks = (total_length + chunk_size - 1) // chunk_size
data_get_list = []
for i in range(num_chunks):
start_index = i * chunk_size
end_index = min(start_index + chunk_size, total_length)
payload = {
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_ids": data['data_ids'][start_index:end_index],
"data": data['data']
}
if "transaction_id" in data:
payload["transaction_id"] = data["transaction_id"]
payload = json.dumps(payload, cls=NpEncoder)
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if res.status_code == 200:
data_get_list.append(data_get)
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data_ids')} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return data_get_list
@staticmethod
def entry_data_delete(data: dict, max_retries: int = 20) -> dict:
"""
删除单条数据
:param data: 应包含应用ID、表单ID、数据ID
:param max_retries: 最大重试次数,默认20
:return: 删除结果
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/delete'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_id": data['data_id'],
})
retries = 0
delete_status = None
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
delete_status = res.json()
# 手动处理状态码 4001(数据不存在)
if delete_status == {
"code": 4001,
"msg": "Data does not exist."
}:
logger.info(f"返回结果: {delete_status}")
break
res.raise_for_status()
if res.status_code == 200:
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return delete_status
@staticmethod
def entry_data_batch_delete(
data: dict,
chunk_size: int = 90,
max_retries: int = 20
) -> List[Optional[Dict]]:
"""
批量删除数据
:param data: 应包含应用ID、表单ID、数据ID列表
:param chunk_size: 单次删除最大条数,默认90
:param max_retries: 重试次数,默认20
:return: 删除结果列表
"""
data = replace_decimals(data)
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_delete'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
total_length = len(data['data_ids'])
logger.info(f"多数据删除行数: {total_length}")
num_chunks = (total_length + chunk_size - 1) // chunk_size
data_get_list = []
for i in range(num_chunks):
start_index = i * chunk_size
end_index = min(start_index + chunk_size, total_length)
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_ids": data['data_ids'][start_index:end_index],
}, cls=NpEncoder)
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
logger.info(f"{i}页 返回结果: {data_get}")
if data_get.get("status") == "success":
data_get_list.append(data_get)
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1)
if retries > max_retries:
error_logger.error(
f"批量删除任务第{i+1}批 连续{max_retries}次请求失败,放弃此次请求。")
data_get_list.append(None)
return data_get_list
@staticmethod
def workflow_instance_get(data: dict, max_retries: int = 20) -> dict:
"""
查询实例流程信息
:param max_retries: 最大重试次数
:param data: 简道云插件发送过来的data,包含应用id
:return: 查询简道云流程实例信息返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/workflow/instance/get'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"instance_id": data['data_id'],
"tasks_type": 1
})
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if res.status_code == 200:
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。")
return data_get
@staticmethod
def workflow_task_approve(data: dict, max_retries: int = 20) -> dict:
"""
流程待办提交
:param max_retries: 最大重试次数
:param data: 应包含username、instance_id(data_id)、task_id等信息
:return: 返回简道云流程待办提交的结果
"""
url = 'https://api.jiandaoyun.com/api/v1/workflow/task/approve'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"username": data["username"],
"instance_id": data["instance_id"],
"task_id": data['task_id'],
"comment": data.get("comment", "自动转交")
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
if res.status_code == 200:
return res.json()
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3)
if retries > max_retries:
error_logger.error(f"任务 {data.get('task_id')} 连续{max_retries}次请求失败,放弃此次请求。")
return {}
@staticmethod
def workflow_task_hand_over(data: dict, max_retries: int = 10) -> Optional[dict]:
"""
流程待办转交
:param max_retries: 最大重试次数
:param data: 应包含username、instance_id(data_id)、task_id、transfer_username等信息
:return: 返回简道云流程待办转交的结果
"""
url = 'https://api.jiandaoyun.com/api/v1/workflow/task/transfer'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"username": data["username"],
"instance_id": data["instance_id"],
"task_id": data['task_id'],
"transfer_username": data['transfer_username'],
"comment": data.get("comment", "转交")
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
if res.status_code == 200:
return res.json()
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3)
if retries > max_retries:
error_logger.error(f"任务转交失败,已重试{max_retries}")
return None
@staticmethod
def get_upload_token(data: dict, max_retries: int = 10) -> Optional[Dict[str, Any]]:
"""
获取文件上传凭证
:param max_retries: 最大重试次数
:param data: 应包含应用ID、表单ID、事务ID
:return: 返回upload_url、upload_token
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/file/get_upload_token'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"transaction_id": data['transaction_id'],
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
res_j = res.json()
# 检查 token_and_url_list 是否存在且不为空
token_list = res_j.get('token_and_url_list', [])
if not token_list or len(token_list) == 0:
logger.warning(f"未获取到上传凭证列表,将重新请求")
retries += 1
time.sleep(3)
continue
token_item = token_list[0]
upload_url = token_item.get('url')
upload_token = token_item.get('token')
if not upload_url or not upload_token:
logger.warning(f"上传凭证信息不完整,将重新请求")
retries += 1
time.sleep(3)
continue
logger.info(f"返回结果: {upload_url}, {upload_token}")
if res.status_code == 200:
return {
'upload_url': upload_url,
'upload_token': upload_token
}
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except (requests.exceptions.RequestException, KeyError, IndexError, TypeError) as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3)
if retries > max_retries:
error_logger.error(f"获取上传凭证失败,已重试{max_retries}")
return None
@staticmethod
def upload_file(data: dict, max_retries: int = 10) -> Optional[Any]:
"""
上传文件
:param max_retries: 最大重试次数
:param data: 应包含上传文件路径、上传文件url、上传文件token
:return: 返回上传文件结果
"""
url = data['upload_url']
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
}
file_path = data['file_path']
payload = {
"token": data['upload_token'],
}
retries = 0
result = None
while retries <= max_retries:
try:
with open(file_path, 'rb') as f:
files = {"file": f}
res = requests.post(url=url, data=payload, headers=headers, files=files, timeout=10)
res.raise_for_status()
data_get = res.json()
logger.info(f"返回结果: {data_get}")
if res.status_code == 200:
result = data_get
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3)
if retries > max_retries:
error_logger.error(f"上传文件失败,已重试{max_retries}")
return result
+452
View File
@@ -0,0 +1,452 @@
# 添加新任务类指南
本文档详细说明如何在项目中添加一个新的任务类,包含立即响应和后台执行功能。
## 目录
1. [架构概述](#架构概述)
2. [添加步骤](#添加步骤)
3. [代码示例](#代码示例)
4. [文件结构说明](#文件结构说明)
5. [注意事项](#注意事项)
---
## 架构概述
项目采用**立即响应 + 后台执行**的架构模式:
- **立即响应函数**:位于 `app/module/f6_plugin_handlers.py`,负责快速响应请求,立即返回"正在执行"消息
- **后台执行函数**:位于 `app/tasks/` 目录下,负责在后台线程中执行实际任务
### 工作流程
```
客户端请求
立即响应函数(F6PluginHandlers
启动后台线程
后台执行函数(tasks/*.py
更新简道云表单
自动提交工作流
```
---
## 添加步骤
### 步骤 1: 创建后台任务文件
`app/tasks/` 目录下创建新的任务文件,例如 `app/tasks/your_task.py`
```python
"""
你的任务相关后台任务模块
本模块包含你的任务相关的后台任务,包括:
- 任务描述1
- 任务描述2
这些任务在后台线程中执行,不会阻塞主请求。
执行完成后会更新简道云表单并自动提交工作流。
"""
import logging
import os
import requests
import pandas as pd
from typing import Dict, Any
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow
logger = logging.getLogger('app')
def your_task_background(data: Dict[str, Any], cookies: Dict[str, str] = None,
df: pd.DataFrame = None, save_path: str = None):
"""
你的任务后台执行函数
在后台线程中执行你的任务。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息(可选)
df: Excel 文件读取的内容,DataFrame 格式(可选)
save_path: Excel 文件保存的地址,执行完成后会删除此文件(可选)
Returns:
None
注意:
- 执行完成后会自动删除上传的文件(如果提供了save_path)
- 执行结果会更新到简道云表单
"""
try:
# TODO: 在这里实现具体的任务逻辑
results = []
# 示例:处理数据
if df is not None:
df = df.where(pd.notnull(df), None)
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理数据"):
# 实现具体的数据处理逻辑
result_item = {
'行号': index + 1,
'状态': '处理成功'
}
results.append(result_item)
else:
# 如果没有DataFrame,执行其他任务
results.append({'状态': '任务执行成功'})
# 删除文件(如果提供了save_path)
if save_path and os.path.exists(save_path):
os.remove(save_path)
logger.info(f'{save_path}已删除')
# 格式化结果
results_str = f'{results}' if results else '任务执行完成'
logger.info(f"任务执行结果: {results_str}")
# 调用api回写改掉 执行明细与执行状态
msg = update_jiandaoyun(data, results_str)
if msg.get('msg'):
approve_workflow(data)
logger.info('表单已自动提交至下一步')
except Exception as e:
error_msg = f'任务执行失败: {str(e)}'
logger.error(error_msg, exc_info=True)
msg = update_jiandaoyun(data, error_msg)
if msg.get('msg'):
approve_workflow(data)
```
### 步骤 2: 在任务导出文件中注册
`app/tasks/__init__.py` 中添加导入和导出:
```python
# 你的任务
from app.tasks.your_task import your_task_background
__all__ = [
# ... 其他任务
# 你的任务
'your_task_background',
]
```
### 步骤 3: 添加立即响应函数
`app/module/f6_plugin_handlers.py` 中添加立即响应函数:
```python
from app.tasks.your_task import your_task_background
class F6PluginHandlers:
# ... 其他方法
@staticmethod
def your_task(data: Dict[str, Any]) -> Dict[str, str]:
"""
你的任务
从简道云获取任务请求,读取 Excel 文件(如果需要),并在后台线程中执行任务。
立即返回"正在执行"的提示,实际执行在后台线程中完成。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
Dict[str, str]: 包含执行状态的字典
"""
entry_data = api_instance.entry_data_get(data=data)
print('执行 你的任务')
# 获取必要的参数(根据实际需求调整)
username = entry_data['data'].get('账号')
password = entry_data['data'].get('密码')
company_name = entry_data['data'].get('公司名称')
save_path = entry_data['data'].get('文件保存地址')
# 如果需要登录F6系统
cookies = None
if username and password and company_name:
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败', 'msg_details': '无法登录F6系统'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
# 如果需要读取Excel文件
df = None
if save_path:
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
except Exception as e:
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
# 启动后台线程执行任务
try:
thread = threading.Thread(target=your_task_background,
args=(data, cookies, df, save_path))
thread.start()
except Exception as e:
print(f'创建线程失败: {str(e)}')
return {'msg': '任务启动失败', 'msg_details': f'无法启动后台任务: {str(e)}'}
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
```
### 步骤 4: 注册操作到模块注册表
`main.py``lifespan` 函数中注册操作:
```python
core_manager.register_action('your_task', f6_plugin_handlers.your_task, 'f6_plugin_module',
description='你的任务描述')
```
---
## 代码示例
### 完整示例:BI任务
以下是一个完整的示例,展示如何添加BI任务类:
#### 1. 后台任务文件 (`app/tasks/bi_tasks.py`)
```python
"""
BI相关后台任务模块
"""
import logging
import os
import pandas as pd
from typing import Dict, Any
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow
logger = logging.getLogger('app')
def bi_task_background(data: Dict[str, Any], cookies: Dict[str, str] = None,
df: pd.DataFrame = None, save_path: str = None):
"""BI任务后台执行函数"""
try:
results = []
if df is not None:
# 处理Excel数据
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理BI数据"):
results.append({'行号': index + 1, '状态': '处理成功'})
else:
results.append({'状态': 'BI任务执行成功'})
if save_path and os.path.exists(save_path):
os.remove(save_path)
msg = update_jiandaoyun(data, f'{results}')
if msg.get('msg'):
approve_workflow(data)
except Exception as e:
error_msg = f'BI任务执行失败: {str(e)}'
logger.error(error_msg, exc_info=True)
update_jiandaoyun(data, error_msg)
```
#### 2. 立即响应函数 (`app/module/f6_plugin_handlers.py`)
```python
@staticmethod
def bi_task(data: Dict[str, Any]) -> Dict[str, str]:
"""BI任务立即响应函数"""
entry_data = api_instance.entry_data_get(data=data)
# 获取参数
username = entry_data['data'].get('账号')
password = entry_data['data'].get('密码')
company_name = entry_data['data'].get('公司名称')
save_path = entry_data['data'].get('文件保存地址')
# 登录(如果需要)
cookies = None
if username and password and company_name:
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
# 读取文件(如果需要)
df = None
if save_path:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
# 启动后台线程
thread = threading.Thread(target=bi_task_background,
args=(data, cookies, df, save_path))
thread.start()
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
```
#### 3. 注册操作 (`main.py`)
```python
core_manager.register_action('bi_task', f6_plugin_handlers.bi_task, 'f6_plugin_module',
description='BI任务')
```
---
## 文件结构说明
### 关键文件位置
```
fastapi_app/
├── app/
│ ├── module/
│ │ └── f6_plugin_handlers.py # 立即响应处理器(重命名自 f6_plugin_module.py
│ │
│ ├── tasks/
│ │ ├── __init__.py # 任务导出文件
│ │ ├── common.py # 通用任务函数
│ │ ├── bi_tasks.py # BI任务(示例)
│ │ ├── brand_tasks.py # 品牌任务
│ │ ├── customer_tasks.py # 客户任务
│ │ └── delete_tasks.py # 删除任务
│ │
│ └── api/
│ └── routes.py # API路由
└── main.py # 应用入口,注册所有操作
```
### 命名规范
- **立即响应函数**:位于 `F6PluginHandlers` 类中,使用小写下划线命名(如 `bi_task`
- **后台执行函数**:位于 `app/tasks/` 目录,使用 `{task_name}_background` 命名(如 `bi_task_background`
- **操作名称**:在 `main.py` 中注册时使用,通常与立即响应函数名相同(如 `'bi_task'`
---
## 注意事项
### 1. 文件命名变更
**重要**`f6_plugin_module.py` 已重命名为 `f6_plugin_handlers.py`,类名从 `F6PluginModule` 改为 `F6PluginHandlers`
- 文件名更清晰地表达了其功能:处理立即响应的处理器
- 所有引用已更新,但 `app.state.f6_plugin_module` 保持向后兼容
### 2. 参数传递
后台执行函数通常接收以下参数:
- `data`: 必需,包含简道云表单信息
- `cookies`: 可选,F6系统登录凭证
- `df`: 可选,Excel文件数据(DataFrame
- `save_path`: 可选,文件保存路径
### 3. 错误处理
- 立即响应函数:应捕获登录、文件读取等错误,立即返回错误信息
- 后台执行函数:应使用 try-except 包裹整个逻辑,确保错误能更新到简道云表单
### 4. 文件清理
如果任务处理了上传的文件,应在执行完成后删除:
```python
if save_path and os.path.exists(save_path):
os.remove(save_path)
logger.info(f'{save_path}已删除')
```
### 5. 简道云表单更新
所有后台任务完成后都应:
1. 调用 `update_jiandaoyun(data, results_str)` 更新执行结果
2. 如果更新成功,调用 `approve_workflow(data)` 自动提交工作流
### 6. 日志记录
使用项目统一的日志记录器:
```python
import logging
logger = logging.getLogger('app')
logger.info("信息日志")
logger.error("错误日志", exc_info=True) # exc_info=True 记录异常堆栈
```
### 7. 进度显示
对于批量处理任务,使用 `tqdm` 显示进度:
```python
from tqdm import tqdm
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理数据"):
# 处理逻辑
```
---
## 快速检查清单
添加新任务类时,请确认:
- [ ] 创建了后台任务文件 `app/tasks/{task_name}_tasks.py`
- [ ]`app/tasks/__init__.py` 中导出了后台任务函数
- [ ]`app/module/f6_plugin_handlers.py` 中添加了立即响应函数
- [ ]`main.py` 中注册了操作
- [ ] 实现了错误处理逻辑
- [ ] 添加了日志记录
- [ ] 实现了文件清理(如果处理了文件)
- [ ] 实现了简道云表单更新和工作流提交
---
## 常见问题
### Q: 如何测试新添加的任务?
A: 启动应用后,通过API调用测试:
- 请求头设置 `Action: your_task`
- 请求体包含简道云表单数据
### Q: 任务执行失败怎么办?
A: 后台执行函数中的异常会被捕获,错误信息会更新到简道云表单的"执行明细"字段。
### Q: 如何修改任务执行逻辑?
A: 只需修改 `app/tasks/{task_name}_tasks.py` 中的后台执行函数即可。
### Q: 可以添加不需要登录的任务吗?
A: 可以,在立即响应函数中不调用 `F6Module.login_in``cookies` 参数传 `None` 即可。
---
## 相关文件
- `app/module/f6_plugin_handlers.py` - 立即响应处理器
- `app/tasks/common.py` - 通用任务函数(update_jiandaoyun, approve_workflow
- `app/core/module_registry.py` - 模块注册表
- `main.py` - 应用入口和操作注册
---
**最后更新**: 2025年
**维护者**: 数据组
+685
View File
@@ -0,0 +1,685 @@
# 简道云 FastAPI 服务系统说明书
## 1. 系统概述
### 1.1 系统简介
简道云 FastAPI 服务是一个基于 FastAPI 框架开发的后端服务系统,主要用于处理简道云插件发送的业务请求,与 F6 汽车维修管理系统进行数据交互,实现数据同步、批量处理、信息管理等核心功能。
### 1.2 系统定位
- **服务对象**: 简道云插件前端
- **集成系统**: F6 汽车维修管理系统、简道云平台
- **主要功能**: 数据同步、批量操作、信息管理、工作流自动化
### 1.3 系统特点
- **高性能**: 基于 FastAPI 异步框架,支持高并发请求
- **模块化**: 采用模块化设计,易于扩展和维护
- **可靠性**: 完善的异常处理和重试机制
- **自动化**: 支持后台任务自动执行和工作流自动提交
- **可维护性**: 清晰的代码结构和完善的日志记录
## 2. 系统功能
### 2.1 F6系统集成功能
#### 2.1.1 登录认证
**功能描述**: 实现 F6 系统的登录认证,支持用户名密码登录和验证码识别。
**主要流程**:
1. 接收用户名、密码、公司名称
2. 对密码进行 MD5 加密
3. 发送登录请求到 F6 系统
4. 如果触发验证码,自动识别并重试
5. 选择指定公司并完成登录
6. 返回登录结果和 Cookie 信息
**技术实现**:
- 使用 `requests` 库发送 HTTP 请求
- 使用 `pytesseract` 进行验证码 OCR 识别
- 使用 `PIL` 进行图像预处理(对比度、亮度调整)
#### 2.1.2 公司信息获取
**功能描述**: 获取 F6 系统中用户可访问的公司列表,并保存到简道云表单。
**主要流程**:
1. 使用用户名密码登录 F6 系统
2. 获取公司列表(单店或多店)
3. 将公司信息批量写入简道云表单
4. 返回时间戳用于后续查询
**输出格式**:
- 单店: 返回门店名称
- 多店: 返回所有公司名称列表
#### 2.1.3 门店信息获取
**功能描述**: 获取指定公司下的门店列表及统计数据。
**主要流程**:
1. 登录 F6 系统并选择公司
2. 获取门店列表
3. 统计客户车辆数量和客户数量
4. 将门店信息批量写入简道云表单
5. 返回时间戳和统计数据
**输出内容**:
- 门店名称列表
- 客户车辆总数
- 客户总数
#### 2.1.4 保持连接
**功能描述**: 心跳检测功能,用于保持连接活跃。
**实现**: 直接返回接收到的数据,不做任何处理。
### 2.2 文件处理功能
#### 2.2.1 文件上传和下载
**功能描述**: 处理简道云插件上传的文件,下载并保存到本地。
**主要流程**:
1. 从简道云获取文件 URL
2. 解析文件名和扩展名
3. 根据时间戳生成唯一文件名
4. 下载文件到 `下载文件/` 目录
5. 返回文件保存路径
**文件命名规则**: `原文件名_时间戳.扩展名`
#### 2.2.2 文件校验
**功能描述**: 校验上传的 Excel 文件格式是否符合要求。
**支持的操作类型**:
- `create_brand`: 校验第一列是否为"品牌"
- `modify_customer_info`: 校验第一列是否为"客户手机号"
- `delete_cars`: 暂不校验
**校验流程**:
1. 读取 Excel 文件第一列
2. 检查表头是否符合要求
3. 返回校验结果(成功/失败)
### 2.3 数据管理功能
#### 2.3.1 品牌批量创建
**功能描述**: 从 Excel 文件读取品牌名称,批量创建到 F6 系统。
**主要流程**:
1. 从简道云获取任务数据(账号、密码、公司名称、文件路径)
2. 登录 F6 系统
3. 读取 Excel 文件(第一列为品牌名称)
4. 在后台线程中批量创建品牌
5. 立即返回"正在执行"提示
6. 后台任务完成后更新简道云表单并自动提交工作流
**后台任务处理**:
- 遍历 Excel 文件中的每一行
- 调用 F6 API 创建品牌
- 记录成功和失败的结果
- 执行完成后删除上传的文件
- 更新简道云表单的执行明细
- 自动提交工作流到下一步
#### 2.3.2 客户信息修改
**功能描述**: 从 Excel 文件读取客户修改信息,批量修改 F6 系统中的客户信息。
**主要流程**:
1. 从简道云获取任务数据
2. 登录 F6 系统
3. 读取 Excel 文件(包含客户手机号、修改字段等)
4. 获取所有客户列表
5. 获取员工列表(用于匹配专属运营顾问)
6. 在后台线程中批量修改客户信息
7. 立即返回"正在执行中"提示
**Excel 文件格式**:
- 第一列: 客户手机号(必需,用于匹配)
- 其他列: 需要修改的字段(客户姓名、客户类型、客户来源、单位名称、专属运营顾问、客户备注等)
**匹配逻辑**:
- 根据客户手机号匹配 F6 系统中的客户
- 获取客户原始信息
- 合并 Excel 中的修改字段
- 解析地址信息(省市区)
- 匹配专属运营顾问的用户ID
#### 2.3.3 客户信息删除
**功能描述**: 批量删除 F6 系统中的客户信息。
**主要流程**:
1. 从简道云获取任务数据
2. 登录 F6 系统
3. 获取所有客户列表
4. 获取会员卡列表(提取客户ID
5. 在后台线程中批量删除客户
6. 立即返回"正在执行中"提示
**删除规则**:
- 跳过有最近消费时间的客户
- 跳过有会员卡的客户
- 8-20点之间每3.5秒删除一条,其余时间每1.5秒删除一条
- 记录成功和失败次数
#### 2.3.4 车辆信息删除
**功能描述**: 批量删除 F6 系统中的客户车辆信息。
**主要流程**:
1. 从简道云获取任务数据
2. 登录 F6 系统
3. 分页获取所有车辆列表
4. 获取会员卡列表(提取车辆ID
5. 在后台线程中批量删除车辆
6. 立即返回"正在执行中"提示
**删除规则**:
- 跳过有最近消费时间的客户车辆
- 跳过有会员卡的车辆
- 8-20点之间每3秒删除一条,其余时间每1秒删除一条
- 记录成功和失败次数
#### 2.3.5 历史记录删除
**功能描述**: 删除指定门店的历史维修记录。
**主要流程**:
1. 从简道云获取任务数据(账号、密码、公司名称、门店名称)
2. 登录 F6 系统
3. 获取门店列表并匹配门店ID
4. 在后台线程中删除历史维修记录
5. 立即返回"正在执行中"提示
**删除方式**: 调用 F6 系统的删除接口,删除整个门店的历史维修记录。
### 2.4 BI任务功能
**功能描述**: 执行 BI 相关的数据处理和报表生成任务。
**当前状态**: 框架已搭建,具体业务逻辑待实现。
**预留接口**:
- 支持从 Excel 文件读取数据
- 支持 F6 系统登录(如需要)
- 支持后台线程执行
- 支持结果回写到简道云
### 2.5 工作流自动化
**功能描述**: 自动获取简道云工作流的待处理任务并提交到下一步。
**主要流程**:
1. 获取工作流实例信息
2. 查找状态为待处理(status=0)的任务
3. 提取任务信息(username、instance_id、task_id
4. 调用审批接口自动提交
5. 记录执行结果
**应用场景**: 后台任务执行完成后,自动将简道云表单提交到工作流下一步,无需人工干预。
## 3. 技术架构
### 3.1 系统架构
```
┌─────────────────┐
│ 简道云插件前端 │
└────────┬────────┘
│ HTTP/JSON
┌─────────────────┐
│ FastAPI 服务 │
│ ┌───────────┐ │
│ │ 路由层 │ │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ 业务模块 │ │
│ │ - F6Module│ │
│ │ - Plugin │ │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ 任务队列 │ │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ 后台任务 │ │
│ └───────────┘ │
└────────┬────────┘
┌────┴────┐
▼ ▼
┌────────┐ ┌────────┐
│ F6系统 │ │ 简道云 │
└────────┘ └────────┘
```
### 3.2 核心组件
#### 3.2.1 应用入口 (main.py)
**职责**:
- 应用初始化和生命周期管理
- 模块注册和路由配置
- 异常处理器配置
- 中间件配置(CORS
**关键功能**:
- `lifespan`: 管理应用启动和关闭
- 全局异常处理
- 模块实例化并注册到注册表
#### 3.2.2 路由层 (app/api/routes.py)
**职责**:
- 定义 API 端点
- 请求验证和参数解析
- 调用业务模块处理请求
- 返回响应
**主要端点**:
- `GET /health`: 健康检查
- `POST /webhook`: 业务请求处理
#### 3.2.3 业务模块层
**F6Module** (`app/module/module.py`):
- F6 系统登录和认证
- 公司信息获取
- 门店信息获取
- 保持连接
**F6PluginModule** (`app/module/f6_plugin_module.py`):
- 文件上传和校验
- 品牌批量创建
- 客户信息管理(修改、删除)
- 车辆信息删除
- 历史记录删除
- BI任务
**OtherPluginModule** (`app/module/other_module.py`):
- 其他插件功能(如短信签名状态)
#### 3.2.4 任务处理层
**任务队列** (`app/utils/app_tools.py`):
- 使用 `Queue` 实现任务队列
- 后台线程处理任务
- 支持同步等待结果
**后台任务** (`app/tasks/`):
- `common.py`: 通用任务函数(更新简道云、工作流审批等)
- `brand_tasks.py`: 品牌相关任务
- `customer_tasks.py`: 客户相关任务
- `delete_tasks.py`: 删除相关任务
- `bi_tasks.py`: BI相关任务
#### 3.2.5 核心工具层
**模块注册表** (`app/core/module_registry.py`):
- 统一管理所有业务操作
- 支持操作查询和路由
- 存储操作元数据(描述、模块名等)
**应用工具** (`app/utils/app_tools.py`):
- 日志记录器配置(支持轮转)
- 任务队列管理
- 请求头解码工具
- 后台调度器
**简道云API封装** (`app/api/__init__.py`):
- 封装简道云所有API接口
- 支持失败重试机制
- 字段ID到标签名的替换
- 批量操作支持
### 3.3 数据流
#### 3.3.1 同步任务流程
```
简道云插件 → FastAPI路由 → 业务模块 → 任务队列 → 处理线程 → 返回结果 → 简道云插件
```
#### 3.3.2 后台任务流程
```
简道云插件 → FastAPI路由 → 业务模块 → 启动后台线程 → 立即返回
后台任务执行
更新简道云表单
自动提交工作流
```
### 3.4 技术选型
| 技术 | 版本 | 用途 |
|------|------|------|
| FastAPI | 0.121.0 | Web框架 |
| Uvicorn | 0.38.0 | ASGI服务器 |
| Pydantic | - | 数据验证 |
| Requests | 2.32.5 | HTTP请求 |
| Pandas | 2.3.3 | Excel处理 |
| APScheduler | 3.11.1 | 任务调度 |
| Pillow | 12.0.0 | 图像处理 |
| Pytesseract | 0.3.13 | OCR识别 |
## 4. 部署运维
### 4.1 环境要求
- **操作系统**: Windows/Linux/macOS
- **Python版本**: 3.8+
- **内存**: 建议 2GB+
- **磁盘**: 根据文件存储需求,建议 10GB+
### 4.2 安装步骤
1. **克隆代码**:
```bash
git clone <repository_url>
cd fastapi_app
```
2. **安装依赖**:
```bash
pip install -r requirements.txt
```
3. **配置环境**:
- 编辑 `app/config.py`,配置 API Token
- 确保目录权限正确(logs、下载文件、模板文件)
4. **启动服务**:
```bash
python main.py
# 或
uvicorn main:app --host 0.0.0.0 --port 5003
```
### 4.3 生产环境部署
#### 4.3.1 使用 Gunicorn + Uvicorn Workers
```bash
pip install gunicorn
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:5003
```
#### 4.3.2 使用 systemd 管理服务(Linux
创建服务文件 `/etc/systemd/system/fastapi-app.service`:
```ini
[Unit]
Description=简道云 FastAPI 服务
After=network.target
[Service]
Type=simple
User=your_user
WorkingDirectory=/path/to/fastapi_app
Environment="PATH=/path/to/venv/bin"
ExecStart=/path/to/venv/bin/uvicorn main:app --host 0.0.0.0 --port 5003
Restart=always
[Install]
WantedBy=multi-user.target
```
启动服务:
```bash
sudo systemctl enable fastapi-app
sudo systemctl start fastapi-app
```
#### 4.3.3 使用 Nginx 反向代理
Nginx 配置示例:
```nginx
server {
listen 80;
server_name your_domain.com;
location / {
proxy_pass http://127.0.0.1:5003;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
```
### 4.4 监控和日志
#### 4.4.1 日志管理
- **日志位置**: `logs/简道云.log`
- **日志轮转**: 单文件最大 5MB,保留 5 个备份
- **日志级别**: INFO
- **日志格式**: `时间戳 级别:模块名:消息`
#### 4.4.2 健康检查
定期访问 `/health` 端点检查服务状态:
```bash
curl http://localhost:5003/health
```
#### 4.4.3 性能监控
建议使用以下工具监控服务性能:
- **APM工具**: New Relic、Datadog 等
- **日志分析**: ELK Stack、Loki 等
- **指标监控**: Prometheus + Grafana
### 4.5 备份和恢复
#### 4.5.1 数据备份
需要备份的内容:
- 配置文件 (`app/config.py`)
- 日志文件 (`logs/`)
- 下载的文件 (`下载文件/`)
- 模板文件 (`模板文件/`)
#### 4.5.2 恢复步骤
1. 恢复代码和配置文件
2. 恢复数据文件
3. 重新安装依赖
4. 重启服务
## 5. 安全说明
### 5.1 认证和授权
- **API Token**: 使用简道云 API Token 进行认证
- **F6系统**: 使用用户名密码登录,密码进行 MD5 加密
### 5.2 数据安全
- **敏感信息**: API Token 建议使用环境变量管理
- **密码处理**: 密码在传输前进行 MD5 加密
- **文件存储**: 上传的文件在处理完成后自动删除
### 5.3 网络安全
- **CORS配置**: 生产环境建议限制允许的来源
- **HTTPS**: 建议使用 HTTPS 加密传输
- **防火墙**: 限制服务端口访问
### 5.4 安全建议
1. 定期更新依赖包,修复安全漏洞
2. 使用强密码策略
3. 定期审查日志,发现异常访问
4. 限制文件上传大小和类型
5. 实施请求频率限制
## 6. 故障排查
### 6.1 常见问题
#### 问题1: 服务启动失败
**可能原因**:
- 端口被占用
- 依赖包未安装
- 配置文件错误
**解决方法**:
- 检查端口占用: `netstat -ano | findstr 5003`
- 重新安装依赖: `pip install -r requirements.txt`
- 检查配置文件格式
#### 问题2: 登录失败
**可能原因**:
- 用户名密码错误
- 公司名称不正确
- 验证码识别失败
- F6系统服务异常
**解决方法**:
- 验证用户名密码和公司名称
- 检查 Tesseract OCR 是否正确安装
- 查看日志获取详细错误信息
#### 问题3: 文件上传失败
**可能原因**:
- 文件格式不正确
- 文件路径不存在
- 磁盘空间不足
- 网络连接问题
**解决方法**:
- 检查文件格式是否符合要求
- 确保目录存在且有写权限
- 检查磁盘空间
- 查看网络连接状态
#### 问题4: 后台任务未执行
**可能原因**:
- 线程启动失败
- 任务执行异常
- 简道云API调用失败
**解决方法**:
- 查看日志中的错误信息
- 检查简道云API Token是否有效
- 验证任务数据格式是否正确
### 6.2 日志分析
查看日志文件:
```bash
tail -f logs/简道云.log
```
搜索错误:
```bash
grep -i error logs/简道云.log
```
### 6.3 性能优化
1. **并发处理**: 调整 Uvicorn workers 数量
2. **数据库连接池**: 如使用数据库,配置连接池
3. **缓存**: 对频繁访问的数据使用缓存
4. **异步处理**: 更多使用异步函数提高性能
## 7. 扩展开发
### 7.1 添加新功能模块
1. 在 `app/module/` 下创建新模块文件
2. 实现业务逻辑方法
3. 在 `main.py` 中注册模块和操作
4. 在 `app/schemas.py` 中添加数据模型(如需要)
### 7.2 添加新的API端点
1. 在 `app/api/routes.py` 中添加路由
2. 使用依赖注入获取所需服务
3. 实现业务逻辑
4. 返回响应
### 7.3 添加新的后台任务
1. 在 `app/tasks/` 下创建任务文件
2. 实现后台任务函数
3. 在业务模块中调用,启动线程
4. 使用 `update_jiandaoyun` 更新结果
5. 使用 `approve_workflow` 自动提交工作流
## 8. 附录
### 8.1 API接口清单
| 端点 | 方法 | 描述 |
|------|------|------|
| `/health` | GET | 健康检查 |
| `/webhook` | POST | 业务请求处理 |
### 8.2 操作类型清单
| 操作名 | 模块 | 类型 |
|--------|------|------|
| `login_in` | F6Module | 同步 |
| `get_company_information` | F6Module | 同步 |
| `get_store_information` | F6Module | 同步 |
| `keep_alive` | F6Module | 同步 |
| `check_file` | F6PluginModule | 同步 |
| `create_brand` | F6PluginModule | 后台 |
| `delete_history` | F6PluginModule | 后台 |
| `delete_customer` | F6PluginModule | 后台 |
| `delete_cars` | F6PluginModule | 后台 |
| `modify_customer_info` | F6PluginModule | 后台 |
| `bi_task` | F6PluginModule | 后台 |
| `sms_signature_status` | OtherPluginModule | 同步 |
### 8.3 配置文件说明
**app/config.py**:
- `BASE_DIR`: 项目根目录
- `SAVE_DIRECTORY`: 下载文件目录
- `MODE_DIRECTORY`: 模板文件目录
- `LOGS_DIRECTORY`: 日志目录
- `LOG_FILE`: 日志文件路径
- `JIANDAOYUN_API_TOKEN`: 简道云API Token
### 8.4 目录结构说明
- `logs/`: 日志文件存储
- `下载文件/`: 从简道云下载的文件临时存储
- `模板文件/`: 模板文件存储
- `app/`: 应用代码目录
- `app/api/`: API相关代码
- `app/core/`: 核心功能代码
- `app/module/`: 业务模块代码
- `app/tasks/`: 后台任务代码
- `app/utils/`: 工具类代码
---
**文档版本**: 2.0.0
**最后更新**: 2025年
**维护团队**: 数据组