Files
2025-12-04 09:46:44 +08:00

213 lines
8.6 KiB
Python

"""
API 路由定义模块
本模块定义所有 API 路由端点,包括:
- /health: 健康检查端点
- /webhook: Webhook 端点,处理简道云插件的请求
所有路由都使用 FastAPI 的依赖注入系统,通过 dependencies.py 中的函数注入依赖项。
"""
from fastapi import APIRouter, Request, HTTPException, status, Depends
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from typing import Dict, Any
import json
import anyio
import asyncio
import logging
from app.schemas import WebhookRequest, WebhookResponse, HealthResponse
from app.api.dependencies import (
get_logger,
get_app_tools,
get_f6_plugin_module,
get_action_map
)
from app.utils.app_tools import AppTools
# 创建路由器
# 使用 APIRouter 分离路由,便于管理和维护
router = APIRouter()
@router.get("/health", response_model=HealthResponse, tags=["系统"])
async def healthcheck():
"""
健康检查端点
用于检查服务是否正常运行
"""
return HealthResponse(status="ok", version="2.0.0")
@router.post("/webhook", response_model=WebhookResponse, tags=["业务"])
async def webhook(
request: Request,
logger: logging.Logger = Depends(get_logger),
app_tools: AppTools = Depends(get_app_tools),
f6_plugin_module = Depends(get_f6_plugin_module),
action_map: Dict[str, Any] = Depends(get_action_map)
):
"""
接受前端请求后将任务放入消息队列
此端点接收简道云插件的请求,根据请求头中的 Action 字段路由到相应的处理函数。
支持的操作包括:登录、获取公司信息、文件校验、品牌创建等。
Args:
request: FastAPI 请求对象,包含请求体和请求头
logger: 日志记录器
app_tools: 应用工具实例
f6_plugin_module: F6插件模块实例
action_map: 操作映射表
Returns:
WebhookResponse: 任务处理结果
Raises:
HTTPException: 当操作类型无效或任务执行超时时抛出
"""
try:
# 获取请求数据并验证
try:
raw_data = await request.json()
# 使用 Pydantic 进行数据验证(允许额外字段)
webhook_data = WebhookRequest(**raw_data)
data = webhook_data.model_dump(exclude_none=True)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="请求体必须是有效的 JSON 格式"
)
except Exception as e:
logger.warning(f"请求数据验证失败: {str(e)}, 原始数据: {raw_data if 'raw_data' in locals() else 'N/A'}")
# 如果验证失败,仍然尝试使用原始数据(向后兼容)
data = raw_data if 'raw_data' in locals() else {}
# 获取并解码请求头
header = request.headers
decoded_header = app_tools.decode_headers(header)
# 验证 Action 字段(HTTP头在FastAPI中会被转换为小写)
# 同时检查 'Action' 和 'action' 以兼容不同情况
action = decoded_header.get('Action') or decoded_header.get('action')
if not action:
logger.warning(f"请求头中缺少 Action 字段,请求头: {decoded_header}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="请求头中缺少必需的 Action 字段"
)
# 处理 F6_Plugin 特殊逻辑
if action == 'F6_Plugin':
# 同时检查 'Check' 和 'check' 以兼容不同情况
check = decoded_header.get('Check') or decoded_header.get('check')
if check == '':
handler = f6_plugin_module.check_file
elif check == '':
sub_action = data.get('Action')
if not sub_action:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="F6_Plugin 操作需要提供 Action 字段"
)
handler = action_map.get(sub_action)
if not handler:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"未知的子操作类型: {sub_action}"
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"F6_Plugin 操作需要提供有效的 Check 字段(是/否),当前值: {check}"
)
else:
handler = action_map.get(action)
if not handler:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"未知的操作类型: {action}。支持的操作: {', '.join(action_map.keys())}"
)
logger.info(f"接收到操作请求: {action}, 数据ID: {data.get('data_id', 'N/A')}")
# 将任务放入消息队列
response_queue = app_tools.enqueue_task(handler, data)
# 等待任务处理结果(添加超时保护,简道云默认60秒)
try:
# 使用 asyncio.wait_for 添加超时
result = await asyncio.wait_for(
anyio.to_thread.run_sync(response_queue.get),
timeout=55.0
)
except asyncio.TimeoutError:
logger.error(f"任务执行超时: {action}, 数据ID: {data.get('data_id', 'N/A')}")
raise HTTPException(
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
detail="任务执行超时,请稍后重试"
)
# 验证返回结果格式
if not isinstance(result, dict):
result = {"msg": str(result)}
# 处理 msg 字段:如果 msg 是字典,将其内容展开到结果中
if "msg" in result and isinstance(result["msg"], dict):
msg_dict = result.pop("msg")
logger.warning(f"操作 {action} 返回的 msg 字段是字典类型,正在自动转换。原始数据: {json.dumps(msg_dict, ensure_ascii=False)}")
# 如果字典中有 msg 字段,使用它;否则使用 JSON 字符串
if "msg" in msg_dict:
result["msg"] = msg_dict.pop("msg")
else:
result["msg"] = json.dumps(msg_dict, ensure_ascii=False)
# 将字典中的其他字段合并到结果中
result.update(msg_dict)
if "msg" not in result:
result["msg"] = "操作完成"
# 确保 msg 是字符串类型
if not isinstance(result.get("msg"), str):
logger.warning(f"操作 {action} 返回的 msg 字段类型为 {type(result.get('msg'))},正在转换为字符串")
result["msg"] = str(result.get("msg", "操作完成"))
logger.info(f"操作完成: {action}, 结果: {json.dumps(result, ensure_ascii=False)}")
# 返回响应(使用 Pydantic 模型验证)
try:
return WebhookResponse(**result)
except ValidationError as validation_error:
# 捕获 Pydantic 验证错误,提供更清晰的错误信息
error_messages = []
for error in validation_error.errors():
field = " -> ".join(str(loc) for loc in error.get("loc", []))
error_type = error.get("type", "unknown")
error_msg = error.get("msg", "验证失败")
error_messages.append(f"字段 '{field}': {error_msg} (类型: {error_type})")
error_detail = "; ".join(error_messages)
logger.error(
f"响应数据验证失败 - 操作: {action}, "
f"错误详情: {error_detail}, "
f"原始数据: {json.dumps(result, ensure_ascii=False, default=str)}"
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"响应数据格式错误: {error_detail}。请检查操作 '{action}' 的返回格式是否符合 API 规范(msg 字段必须是字符串类型)。"
)
except HTTPException:
# 重新抛出 HTTP 异常
raise
except Exception as e:
# 捕获其他未预期的异常
logger.error(f"处理请求时发生未预期的错误: {type(e).__name__} - {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"处理请求时发生错误: {str(e)}"
)