5cde7f852a
2.新增项目批量停用、材料批量修改功能
14 KiB
14 KiB
添加新任务类指南
本文档详细说明如何在项目中添加一个新的任务类,包含立即响应和后台执行功能。
目录
架构概述
项目采用立即响应 + 后台执行的架构模式:
- 立即响应函数:位于
app/module/f6_plugin_handlers.py,负责快速响应请求,立即返回"正在执行"消息 - 后台执行函数:位于
app/tasks/目录下,负责在后台线程中执行实际任务
工作流程
客户端请求
↓
立即响应函数(F6PluginHandlers)
↓
启动后台线程
↓
后台执行函数(tasks/*.py)
↓
更新简道云表单
↓
自动提交工作流
添加步骤
步骤 1: 创建后台任务文件
在 app/tasks/ 目录下创建新的任务文件,例如 app/tasks/your_task.py:
"""
你的任务相关后台任务模块
本模块包含你的任务相关的后台任务,包括:
- 任务描述1
- 任务描述2
这些任务在后台线程中执行,不会阻塞主请求。
执行完成后会更新简道云表单并自动提交工作流。
"""
import logging
import os
import requests
import pandas as pd
from typing import Dict, Any
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow
logger = logging.getLogger('app')
def your_task_background(data: Dict[str, Any], cookies: Dict[str, str] = None,
df: pd.DataFrame = None, save_path: str = None):
"""
你的任务后台执行函数
在后台线程中执行你的任务。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息(可选)
df: Excel 文件读取的内容,DataFrame 格式(可选)
save_path: Excel 文件保存的地址,执行完成后会删除此文件(可选)
Returns:
None
注意:
- 执行完成后会自动删除上传的文件(如果提供了save_path)
- 执行结果会更新到简道云表单
"""
try:
# TODO: 在这里实现具体的任务逻辑
results = []
# 示例:处理数据
if df is not None:
df = df.where(pd.notnull(df), None)
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理数据"):
# 实现具体的数据处理逻辑
result_item = {
'行号': index + 1,
'状态': '处理成功'
}
results.append(result_item)
else:
# 如果没有DataFrame,执行其他任务
results.append({'状态': '任务执行成功'})
# 删除文件(如果提供了save_path)
if save_path and os.path.exists(save_path):
os.remove(save_path)
logger.info(f'{save_path}已删除')
# 格式化结果
results_str = f'{results}' if results else '任务执行完成'
logger.info(f"任务执行结果: {results_str}")
# 调用api回写改掉 执行明细与执行状态
msg = update_jiandaoyun(data, results_str)
if msg.get('msg'):
approve_workflow(data)
logger.info('表单已自动提交至下一步')
except Exception as e:
error_msg = f'任务执行失败: {str(e)}'
logger.error(error_msg, exc_info=True)
msg = update_jiandaoyun(data, error_msg)
if msg.get('msg'):
approve_workflow(data)
步骤 2: 在任务导出文件中注册
在 app/tasks/__init__.py 中添加导入和导出:
# 你的任务
from app.tasks.your_task import your_task_background
__all__ = [
# ... 其他任务
# 你的任务
'your_task_background',
]
步骤 3: 添加立即响应函数
在 app/module/f6_plugin_handlers.py 中添加立即响应函数:
from app.tasks.your_task import your_task_background
class F6PluginHandlers:
# ... 其他方法
@staticmethod
def your_task(data: Dict[str, Any]) -> Dict[str, str]:
"""
你的任务
从简道云获取任务请求,读取 Excel 文件(如果需要),并在后台线程中执行任务。
立即返回"正在执行"的提示,实际执行在后台线程中完成。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
Dict[str, str]: 包含执行状态的字典
"""
entry_data = api_instance.entry_data_get(data=data)
print('执行 你的任务')
# 获取必要的参数(根据实际需求调整)
username = entry_data['data'].get('账号')
password = entry_data['data'].get('密码')
company_name = entry_data['data'].get('公司名称')
save_path = entry_data['data'].get('文件保存地址')
# 如果需要登录F6系统
cookies = None
if username and password and company_name:
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败', 'msg_details': '无法登录F6系统'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
# 如果需要读取Excel文件
df = None
if save_path:
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
except Exception as e:
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
# 启动后台线程执行任务
try:
thread = threading.Thread(target=your_task_background,
args=(data, cookies, df, save_path))
thread.start()
except Exception as e:
print(f'创建线程失败: {str(e)}')
return {'msg': '任务启动失败', 'msg_details': f'无法启动后台任务: {str(e)}'}
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
步骤 4: 注册操作到模块注册表
在 main.py 的 lifespan 函数中注册操作:
core_manager.register_action('your_task', f6_plugin_handlers.your_task, 'f6_plugin_module',
description='你的任务描述')
代码示例
完整示例:BI任务
以下是一个完整的示例,展示如何添加BI任务类:
1. 后台任务文件 (app/tasks/bi_tasks.py)
"""
BI相关后台任务模块
"""
import logging
import os
import pandas as pd
from typing import Dict, Any
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow
logger = logging.getLogger('app')
def bi_task_background(data: Dict[str, Any], cookies: Dict[str, str] = None,
df: pd.DataFrame = None, save_path: str = None):
"""BI任务后台执行函数"""
try:
results = []
if df is not None:
# 处理Excel数据
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理BI数据"):
results.append({'行号': index + 1, '状态': '处理成功'})
else:
results.append({'状态': 'BI任务执行成功'})
if save_path and os.path.exists(save_path):
os.remove(save_path)
msg = update_jiandaoyun(data, f'{results}')
if msg.get('msg'):
approve_workflow(data)
except Exception as e:
error_msg = f'BI任务执行失败: {str(e)}'
logger.error(error_msg, exc_info=True)
update_jiandaoyun(data, error_msg)
2. 立即响应函数 (app/module/f6_plugin_handlers.py)
@staticmethod
def bi_task(data: Dict[str, Any]) -> Dict[str, str]:
"""BI任务立即响应函数"""
entry_data = api_instance.entry_data_get(data=data)
# 获取参数
username = entry_data['data'].get('账号')
password = entry_data['data'].get('密码')
company_name = entry_data['data'].get('公司名称')
save_path = entry_data['data'].get('文件保存地址')
# 登录(如果需要)
cookies = None
if username and password and company_name:
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
# 读取文件(如果需要)
df = None
if save_path:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
# 启动后台线程
thread = threading.Thread(target=bi_task_background,
args=(data, cookies, df, save_path))
thread.start()
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
3. 注册操作 (main.py)
core_manager.register_action('bi_task', f6_plugin_handlers.bi_task, 'f6_plugin_module',
description='BI任务')
文件结构说明
关键文件位置
fastapi_app/
├── app/
│ ├── module/
│ │ └── f6_plugin_handlers.py # 立即响应处理器(重命名自 f6_plugin_module.py)
│ │
│ ├── tasks/
│ │ ├── __init__.py # 任务导出文件
│ │ ├── common.py # 通用任务函数
│ │ ├── bi_tasks.py # BI任务(示例)
│ │ ├── brand_tasks.py # 品牌任务
│ │ ├── customer_tasks.py # 客户任务
│ │ └── delete_tasks.py # 删除任务
│ │
│ └── api/
│ └── routes.py # API路由
│
└── main.py # 应用入口,注册所有操作
命名规范
- 立即响应函数:位于
F6PluginHandlers类中,使用小写下划线命名(如bi_task) - 后台执行函数:位于
app/tasks/目录,使用{task_name}_background命名(如bi_task_background) - 操作名称:在
main.py中注册时使用,通常与立即响应函数名相同(如'bi_task')
注意事项
1. 文件命名变更
重要:f6_plugin_module.py 已重命名为 f6_plugin_handlers.py,类名从 F6PluginModule 改为 F6PluginHandlers。
- 文件名更清晰地表达了其功能:处理立即响应的处理器
- 所有引用已更新,但
app.state.f6_plugin_module保持向后兼容
2. 参数传递
后台执行函数通常接收以下参数:
data: 必需,包含简道云表单信息cookies: 可选,F6系统登录凭证df: 可选,Excel文件数据(DataFrame)save_path: 可选,文件保存路径
3. 错误处理
- 立即响应函数:应捕获登录、文件读取等错误,立即返回错误信息
- 后台执行函数:应使用 try-except 包裹整个逻辑,确保错误能更新到简道云表单
4. 文件清理
如果任务处理了上传的文件,应在执行完成后删除:
if save_path and os.path.exists(save_path):
os.remove(save_path)
logger.info(f'{save_path}已删除')
5. 简道云表单更新
所有后台任务完成后都应:
- 调用
update_jiandaoyun(data, results_str)更新执行结果 - 如果更新成功,调用
approve_workflow(data)自动提交工作流
6. 日志记录
使用项目统一的日志记录器:
import logging
logger = logging.getLogger('app')
logger.info("信息日志")
logger.error("错误日志", exc_info=True) # exc_info=True 记录异常堆栈
7. 进度显示
对于批量处理任务,使用 tqdm 显示进度:
from tqdm import tqdm
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理数据"):
# 处理逻辑
快速检查清单
添加新任务类时,请确认:
- 创建了后台任务文件
app/tasks/{task_name}_tasks.py - 在
app/tasks/__init__.py中导出了后台任务函数 - 在
app/module/f6_plugin_handlers.py中添加了立即响应函数 - 在
main.py中注册了操作 - 实现了错误处理逻辑
- 添加了日志记录
- 实现了文件清理(如果处理了文件)
- 实现了简道云表单更新和工作流提交
常见问题
Q: 如何测试新添加的任务?
A: 启动应用后,通过API调用测试:
- 请求头设置
Action: your_task - 请求体包含简道云表单数据
Q: 任务执行失败怎么办?
A: 后台执行函数中的异常会被捕获,错误信息会更新到简道云表单的"执行明细"字段。
Q: 如何修改任务执行逻辑?
A: 只需修改 app/tasks/{task_name}_tasks.py 中的后台执行函数即可。
Q: 可以添加不需要登录的任务吗?
A: 可以,在立即响应函数中不调用 F6Module.login_in,cookies 参数传 None 即可。
相关文件
app/module/f6_plugin_handlers.py- 立即响应处理器app/tasks/common.py- 通用任务函数(update_jiandaoyun, approve_workflow)app/core/module_registry.py- 模块注册表main.py- 应用入口和操作注册
最后更新: 2025年
维护者: 数据组