Files
jdy_fastapi/doc/添加新任务类指南.md
T
panda 5cde7f852a 1.客户信息修改,将硬编码改为动态取值
2.新增项目批量停用、材料批量修改功能
2026-01-08 10:19:16 +08:00

453 lines
14 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 添加新任务类指南
本文档详细说明如何在项目中添加一个新的任务类,包含立即响应和后台执行功能。
## 目录
1. [架构概述](#架构概述)
2. [添加步骤](#添加步骤)
3. [代码示例](#代码示例)
4. [文件结构说明](#文件结构说明)
5. [注意事项](#注意事项)
---
## 架构概述
项目采用**立即响应 + 后台执行**的架构模式:
- **立即响应函数**:位于 `app/module/f6_plugin_handlers.py`,负责快速响应请求,立即返回"正在执行"消息
- **后台执行函数**:位于 `app/tasks/` 目录下,负责在后台线程中执行实际任务
### 工作流程
```
客户端请求
立即响应函数(F6PluginHandlers
启动后台线程
后台执行函数(tasks/*.py
更新简道云表单
自动提交工作流
```
---
## 添加步骤
### 步骤 1: 创建后台任务文件
`app/tasks/` 目录下创建新的任务文件,例如 `app/tasks/your_task.py`
```python
"""
你的任务相关后台任务模块
本模块包含你的任务相关的后台任务,包括:
- 任务描述1
- 任务描述2
这些任务在后台线程中执行,不会阻塞主请求。
执行完成后会更新简道云表单并自动提交工作流。
"""
import logging
import os
import requests
import pandas as pd
from typing import Dict, Any
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow
logger = logging.getLogger('app')
def your_task_background(data: Dict[str, Any], cookies: Dict[str, str] = None,
df: pd.DataFrame = None, save_path: str = None):
"""
你的任务后台执行函数
在后台线程中执行你的任务。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息(可选)
df: Excel 文件读取的内容,DataFrame 格式(可选)
save_path: Excel 文件保存的地址,执行完成后会删除此文件(可选)
Returns:
None
注意:
- 执行完成后会自动删除上传的文件(如果提供了save_path)
- 执行结果会更新到简道云表单
"""
try:
# TODO: 在这里实现具体的任务逻辑
results = []
# 示例:处理数据
if df is not None:
df = df.where(pd.notnull(df), None)
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理数据"):
# 实现具体的数据处理逻辑
result_item = {
'行号': index + 1,
'状态': '处理成功'
}
results.append(result_item)
else:
# 如果没有DataFrame,执行其他任务
results.append({'状态': '任务执行成功'})
# 删除文件(如果提供了save_path)
if save_path and os.path.exists(save_path):
os.remove(save_path)
logger.info(f'{save_path}已删除')
# 格式化结果
results_str = f'{results}' if results else '任务执行完成'
logger.info(f"任务执行结果: {results_str}")
# 调用api回写改掉 执行明细与执行状态
msg = update_jiandaoyun(data, results_str)
if msg.get('msg'):
approve_workflow(data)
logger.info('表单已自动提交至下一步')
except Exception as e:
error_msg = f'任务执行失败: {str(e)}'
logger.error(error_msg, exc_info=True)
msg = update_jiandaoyun(data, error_msg)
if msg.get('msg'):
approve_workflow(data)
```
### 步骤 2: 在任务导出文件中注册
`app/tasks/__init__.py` 中添加导入和导出:
```python
# 你的任务
from app.tasks.your_task import your_task_background
__all__ = [
# ... 其他任务
# 你的任务
'your_task_background',
]
```
### 步骤 3: 添加立即响应函数
`app/module/f6_plugin_handlers.py` 中添加立即响应函数:
```python
from app.tasks.your_task import your_task_background
class F6PluginHandlers:
# ... 其他方法
@staticmethod
def your_task(data: Dict[str, Any]) -> Dict[str, str]:
"""
你的任务
从简道云获取任务请求,读取 Excel 文件(如果需要),并在后台线程中执行任务。
立即返回"正在执行"的提示,实际执行在后台线程中完成。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
Dict[str, str]: 包含执行状态的字典
"""
entry_data = api_instance.entry_data_get(data=data)
print('执行 你的任务')
# 获取必要的参数(根据实际需求调整)
username = entry_data['data'].get('账号')
password = entry_data['data'].get('密码')
company_name = entry_data['data'].get('公司名称')
save_path = entry_data['data'].get('文件保存地址')
# 如果需要登录F6系统
cookies = None
if username and password and company_name:
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败', 'msg_details': '无法登录F6系统'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
# 如果需要读取Excel文件
df = None
if save_path:
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
except Exception as e:
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
# 启动后台线程执行任务
try:
thread = threading.Thread(target=your_task_background,
args=(data, cookies, df, save_path))
thread.start()
except Exception as e:
print(f'创建线程失败: {str(e)}')
return {'msg': '任务启动失败', 'msg_details': f'无法启动后台任务: {str(e)}'}
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
```
### 步骤 4: 注册操作到模块注册表
`main.py``lifespan` 函数中注册操作:
```python
core_manager.register_action('your_task', f6_plugin_handlers.your_task, 'f6_plugin_module',
description='你的任务描述')
```
---
## 代码示例
### 完整示例:BI任务
以下是一个完整的示例,展示如何添加BI任务类:
#### 1. 后台任务文件 (`app/tasks/bi_tasks.py`)
```python
"""
BI相关后台任务模块
"""
import logging
import os
import pandas as pd
from typing import Dict, Any
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow
logger = logging.getLogger('app')
def bi_task_background(data: Dict[str, Any], cookies: Dict[str, str] = None,
df: pd.DataFrame = None, save_path: str = None):
"""BI任务后台执行函数"""
try:
results = []
if df is not None:
# 处理Excel数据
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理BI数据"):
results.append({'行号': index + 1, '状态': '处理成功'})
else:
results.append({'状态': 'BI任务执行成功'})
if save_path and os.path.exists(save_path):
os.remove(save_path)
msg = update_jiandaoyun(data, f'{results}')
if msg.get('msg'):
approve_workflow(data)
except Exception as e:
error_msg = f'BI任务执行失败: {str(e)}'
logger.error(error_msg, exc_info=True)
update_jiandaoyun(data, error_msg)
```
#### 2. 立即响应函数 (`app/module/f6_plugin_handlers.py`)
```python
@staticmethod
def bi_task(data: Dict[str, Any]) -> Dict[str, str]:
"""BI任务立即响应函数"""
entry_data = api_instance.entry_data_get(data=data)
# 获取参数
username = entry_data['data'].get('账号')
password = entry_data['data'].get('密码')
company_name = entry_data['data'].get('公司名称')
save_path = entry_data['data'].get('文件保存地址')
# 登录(如果需要)
cookies = None
if username and password and company_name:
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
# 读取文件(如果需要)
df = None
if save_path:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
# 启动后台线程
thread = threading.Thread(target=bi_task_background,
args=(data, cookies, df, save_path))
thread.start()
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
```
#### 3. 注册操作 (`main.py`)
```python
core_manager.register_action('bi_task', f6_plugin_handlers.bi_task, 'f6_plugin_module',
description='BI任务')
```
---
## 文件结构说明
### 关键文件位置
```
fastapi_app/
├── app/
│ ├── module/
│ │ └── f6_plugin_handlers.py # 立即响应处理器(重命名自 f6_plugin_module.py
│ │
│ ├── tasks/
│ │ ├── __init__.py # 任务导出文件
│ │ ├── common.py # 通用任务函数
│ │ ├── bi_tasks.py # BI任务(示例)
│ │ ├── brand_tasks.py # 品牌任务
│ │ ├── customer_tasks.py # 客户任务
│ │ └── delete_tasks.py # 删除任务
│ │
│ └── api/
│ └── routes.py # API路由
└── main.py # 应用入口,注册所有操作
```
### 命名规范
- **立即响应函数**:位于 `F6PluginHandlers` 类中,使用小写下划线命名(如 `bi_task`
- **后台执行函数**:位于 `app/tasks/` 目录,使用 `{task_name}_background` 命名(如 `bi_task_background`
- **操作名称**:在 `main.py` 中注册时使用,通常与立即响应函数名相同(如 `'bi_task'`
---
## 注意事项
### 1. 文件命名变更
**重要**`f6_plugin_module.py` 已重命名为 `f6_plugin_handlers.py`,类名从 `F6PluginModule` 改为 `F6PluginHandlers`
- 文件名更清晰地表达了其功能:处理立即响应的处理器
- 所有引用已更新,但 `app.state.f6_plugin_module` 保持向后兼容
### 2. 参数传递
后台执行函数通常接收以下参数:
- `data`: 必需,包含简道云表单信息
- `cookies`: 可选,F6系统登录凭证
- `df`: 可选,Excel文件数据(DataFrame
- `save_path`: 可选,文件保存路径
### 3. 错误处理
- 立即响应函数:应捕获登录、文件读取等错误,立即返回错误信息
- 后台执行函数:应使用 try-except 包裹整个逻辑,确保错误能更新到简道云表单
### 4. 文件清理
如果任务处理了上传的文件,应在执行完成后删除:
```python
if save_path and os.path.exists(save_path):
os.remove(save_path)
logger.info(f'{save_path}已删除')
```
### 5. 简道云表单更新
所有后台任务完成后都应:
1. 调用 `update_jiandaoyun(data, results_str)` 更新执行结果
2. 如果更新成功,调用 `approve_workflow(data)` 自动提交工作流
### 6. 日志记录
使用项目统一的日志记录器:
```python
import logging
logger = logging.getLogger('app')
logger.info("信息日志")
logger.error("错误日志", exc_info=True) # exc_info=True 记录异常堆栈
```
### 7. 进度显示
对于批量处理任务,使用 `tqdm` 显示进度:
```python
from tqdm import tqdm
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理数据"):
# 处理逻辑
```
---
## 快速检查清单
添加新任务类时,请确认:
- [ ] 创建了后台任务文件 `app/tasks/{task_name}_tasks.py`
- [ ]`app/tasks/__init__.py` 中导出了后台任务函数
- [ ]`app/module/f6_plugin_handlers.py` 中添加了立即响应函数
- [ ]`main.py` 中注册了操作
- [ ] 实现了错误处理逻辑
- [ ] 添加了日志记录
- [ ] 实现了文件清理(如果处理了文件)
- [ ] 实现了简道云表单更新和工作流提交
---
## 常见问题
### Q: 如何测试新添加的任务?
A: 启动应用后,通过API调用测试:
- 请求头设置 `Action: your_task`
- 请求体包含简道云表单数据
### Q: 任务执行失败怎么办?
A: 后台执行函数中的异常会被捕获,错误信息会更新到简道云表单的"执行明细"字段。
### Q: 如何修改任务执行逻辑?
A: 只需修改 `app/tasks/{task_name}_tasks.py` 中的后台执行函数即可。
### Q: 可以添加不需要登录的任务吗?
A: 可以,在立即响应函数中不调用 `F6Module.login_in``cookies` 参数传 `None` 即可。
---
## 相关文件
- `app/module/f6_plugin_handlers.py` - 立即响应处理器
- `app/tasks/common.py` - 通用任务函数(update_jiandaoyun, approve_workflow
- `app/core/module_registry.py` - 模块注册表
- `main.py` - 应用入口和操作注册
---
**最后更新**: 2025年
**维护者**: 数据组