1277 lines
42 KiB
Python
1277 lines
42 KiB
Python
"""
|
||
Report Engine Flask接口。
|
||
|
||
该模块为前端/CLI提供统一HTTP/SSE入口,负责:
|
||
1. 初始化 ReportAgent 并串联后台线程;
|
||
2. 管理任务排队、进度查询、流式推送与日志下载;
|
||
3. 提供模板列表、输入文件检查等周边能力。
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import threading
|
||
import time
|
||
from collections import deque, defaultdict
|
||
from datetime import datetime
|
||
from queue import Queue, Empty
|
||
from flask import Blueprint, request, jsonify, Response, send_file, stream_with_context
|
||
from typing import Dict, Any, List, Optional
|
||
from loguru import logger
|
||
from .agent import ReportAgent, create_agent
|
||
from .nodes import ChapterJsonParseError
|
||
from .utils.config import settings
|
||
|
||
|
||
# 创建Blueprint
|
||
report_bp = Blueprint('report_engine', __name__)
|
||
|
||
# 全局变量
|
||
report_agent = None
|
||
current_task = None
|
||
task_lock = threading.Lock()
|
||
|
||
# ====== 流式推送与任务历史管理 ======
|
||
# 通过有界deque缓存最近的事件,方便SSE断线后快速补发
|
||
MAX_TASK_HISTORY = 5
|
||
STREAM_HEARTBEAT_INTERVAL = 15 # 心跳间隔秒
|
||
stream_lock = threading.Lock()
|
||
stream_subscribers = defaultdict(list)
|
||
tasks_registry: Dict[str, 'ReportTask'] = {}
|
||
LOG_STREAM_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
|
||
log_stream_handler_id: Optional[int] = None
|
||
|
||
|
||
def _stream_log_to_task(message):
|
||
"""
|
||
将loguru日志同步到当前任务的SSE事件,保证前端实时可见。
|
||
|
||
仅在存在运行中的任务时推送,避免无关日志刷屏。
|
||
"""
|
||
try:
|
||
record = message.record
|
||
level_name = record["level"].name
|
||
if level_name not in LOG_STREAM_LEVELS:
|
||
return
|
||
|
||
with task_lock:
|
||
task = current_task
|
||
|
||
if not task or task.status not in ("running", "pending"):
|
||
return
|
||
|
||
timestamp = record["time"].strftime("%H:%M:%S.%f")[:-3]
|
||
formatted_line = f"[{timestamp}] [{level_name}] {record['message']}"
|
||
task.publish_event(
|
||
"log",
|
||
{
|
||
"line": formatted_line,
|
||
"level": level_name.lower(),
|
||
"timestamp": timestamp,
|
||
"message": record["message"],
|
||
"module": record.get("module", ""),
|
||
"function": record.get("function", ""),
|
||
},
|
||
)
|
||
except Exception:
|
||
# 避免在日志钩子里产生日志递归
|
||
pass
|
||
|
||
|
||
def _setup_log_stream_forwarder():
|
||
"""为当前进程挂载一次性的loguru钩子,用于SSE实时转发。"""
|
||
global log_stream_handler_id
|
||
if log_stream_handler_id is not None:
|
||
return
|
||
log_stream_handler_id = logger.add(
|
||
_stream_log_to_task,
|
||
level="DEBUG",
|
||
enqueue=False,
|
||
catch=True,
|
||
)
|
||
|
||
|
||
def _register_stream(task_id: str) -> Queue:
|
||
"""
|
||
为指定任务注册一个事件队列,供SSE监听器消费。
|
||
|
||
返回的 Queue 会存入 `stream_subscribers`,SSE 生成器将不断读取。
|
||
|
||
参数:
|
||
task_id: 需要监听的任务ID。
|
||
|
||
返回:
|
||
Queue: 线程安全的事件队列。
|
||
"""
|
||
queue = Queue()
|
||
with stream_lock:
|
||
stream_subscribers[task_id].append(queue)
|
||
return queue
|
||
|
||
|
||
def _unregister_stream(task_id: str, queue: Queue):
|
||
"""
|
||
安全移除事件队列,避免内存泄漏。
|
||
|
||
需要在finally中调用,保证异常情况下资源也能释放。
|
||
|
||
参数:
|
||
task_id: 任务ID。
|
||
queue: 之前注册的事件队列。
|
||
"""
|
||
with stream_lock:
|
||
listeners = stream_subscribers.get(task_id, [])
|
||
if queue in listeners:
|
||
listeners.remove(queue)
|
||
if not listeners and task_id in stream_subscribers:
|
||
stream_subscribers.pop(task_id, None)
|
||
|
||
|
||
def _broadcast_event(task_id: str, event: Dict[str, Any]):
|
||
"""
|
||
将事件推送给所有监听者,失败时做好异常捕获。
|
||
|
||
采用浅拷贝监听列表,防止并发移除导致遍历异常。
|
||
|
||
参数:
|
||
task_id: 待推送的任务ID。
|
||
event: 结构化事件payload。
|
||
"""
|
||
with stream_lock:
|
||
listeners = list(stream_subscribers.get(task_id, []))
|
||
for queue in listeners:
|
||
try:
|
||
queue.put(event, timeout=0.1)
|
||
except Exception:
|
||
logger.exception("推送流式事件失败,跳过当前监听队列")
|
||
|
||
|
||
def _prune_task_history_locked():
|
||
"""
|
||
在task_lock持有期间调用,清理过多的历史任务。
|
||
|
||
仅保留最近 `MAX_TASK_HISTORY` 个任务,避免长时间运行占用过多内存。
|
||
|
||
说明:
|
||
该函数假设调用方已获取 `task_lock`,否则存在竞态风险。
|
||
"""
|
||
if len(tasks_registry) <= MAX_TASK_HISTORY:
|
||
return
|
||
# 按创建时间排序,移除最旧的任务
|
||
sorted_tasks = sorted(tasks_registry.values(), key=lambda t: t.created_at)
|
||
for task in sorted_tasks[:-MAX_TASK_HISTORY]:
|
||
tasks_registry.pop(task.task_id, None)
|
||
|
||
|
||
def _get_task(task_id: str) -> Optional['ReportTask']:
|
||
"""
|
||
统一的任务查找方法,优先返回当前任务。
|
||
|
||
避免重复写锁逻辑,便于多个API共享。
|
||
|
||
参数:
|
||
task_id: 任务ID。
|
||
|
||
返回:
|
||
ReportTask | None: 命中时返回任务实例,否则为None。
|
||
"""
|
||
with task_lock:
|
||
if current_task and current_task.task_id == task_id:
|
||
return current_task
|
||
return tasks_registry.get(task_id)
|
||
|
||
|
||
def _format_sse(event: Dict[str, Any]) -> str:
|
||
"""
|
||
按SSE协议格式化消息。
|
||
|
||
输出形如 `id:/event:/data:` 的三段文本,供浏览器端直接消费。
|
||
|
||
参数:
|
||
event: 事件payload,至少包含 id/type。
|
||
|
||
返回:
|
||
str: SSE协议要求的字符串。
|
||
"""
|
||
payload = json.dumps(event, ensure_ascii=False)
|
||
event_id = event.get('id', 0)
|
||
event_type = event.get('type', 'message')
|
||
return f"id: {event_id}\nevent: {event_type}\ndata: {payload}\n\n"
|
||
|
||
|
||
def initialize_report_engine():
|
||
"""
|
||
初始化Report Engine。
|
||
|
||
单例化 ReportAgent,方便 API 启动后直接接收任务。
|
||
|
||
返回:
|
||
bool: 初始化成功返回True,异常时返回False。
|
||
"""
|
||
global report_agent
|
||
try:
|
||
report_agent = create_agent()
|
||
logger.info("Report Engine初始化成功")
|
||
_setup_log_stream_forwarder()
|
||
|
||
# 检测 PDF 生成依赖(Pango)
|
||
try:
|
||
from .utils.dependency_check import log_dependency_status
|
||
log_dependency_status()
|
||
except Exception as dep_err:
|
||
logger.warning(f"依赖检测失败: {dep_err}")
|
||
|
||
return True
|
||
except Exception as e:
|
||
logger.exception(f"Report Engine初始化失败: {str(e)}")
|
||
return False
|
||
|
||
|
||
class ReportTask:
|
||
"""
|
||
报告生成任务。
|
||
|
||
该对象串联运行状态、进度、事件历史及最终文件路径,
|
||
既供后台线程更新,也供HTTP接口读取。
|
||
"""
|
||
|
||
def __init__(self, query: str, task_id: str, custom_template: str = ""):
|
||
"""
|
||
初始化任务对象,记录查询词、自定义模板与运行期元数据。
|
||
|
||
Args:
|
||
query: 最终需要生成的报告主题
|
||
task_id: 任务唯一ID,通常由时间戳构造
|
||
custom_template: 可选的自定义Markdown模板
|
||
"""
|
||
self.task_id = task_id
|
||
self.query = query
|
||
self.custom_template = custom_template
|
||
self.status = "pending" # 四种状态(pending/running/completed/error)
|
||
self.progress = 0
|
||
self.result = None
|
||
self.error_message = ""
|
||
self.created_at = datetime.now()
|
||
self.updated_at = datetime.now()
|
||
self.html_content = ""
|
||
self.report_file_path = ""
|
||
self.report_file_relative_path = ""
|
||
self.report_file_name = ""
|
||
self.state_file_path = ""
|
||
self.state_file_relative_path = ""
|
||
self.ir_file_path = ""
|
||
self.ir_file_relative_path = ""
|
||
# ====== 流式事件缓存与并发保护 ======
|
||
# 使用deque保存最近的事件,结合锁保证多线程下的安全访问
|
||
self.event_history: deque = deque(maxlen=1000)
|
||
self._event_lock = threading.Lock()
|
||
self.last_event_id = 0
|
||
|
||
def update_status(self, status: str, progress: int = None, error_message: str = ""):
|
||
"""
|
||
更新任务状态并广播事件。
|
||
|
||
会自动刷新 `updated_at`、错误信息,并触发 `status` 类型的 SSE。
|
||
|
||
参数:
|
||
status: 任务阶段(pending/running/completed/error/cancelled)。
|
||
progress: 可选的进度百分比。
|
||
error_message: 出错时的人类可读说明。
|
||
"""
|
||
self.status = status
|
||
if progress is not None:
|
||
self.progress = progress
|
||
if error_message:
|
||
self.error_message = error_message
|
||
self.updated_at = datetime.now()
|
||
# 推送状态变更事件,方便前端实时刷新
|
||
self.publish_event(
|
||
'status',
|
||
{
|
||
'status': self.status,
|
||
'progress': self.progress,
|
||
'error_message': self.error_message,
|
||
'hint': error_message or '',
|
||
'task': self.to_dict(),
|
||
}
|
||
)
|
||
|
||
def to_dict(self) -> Dict[str, Any]:
|
||
"""转换为字典格式,方便直接返回给JSON API。"""
|
||
return {
|
||
'task_id': self.task_id,
|
||
'query': self.query,
|
||
'status': self.status,
|
||
'progress': self.progress,
|
||
'error_message': self.error_message,
|
||
'created_at': self.created_at.isoformat(),
|
||
'updated_at': self.updated_at.isoformat(),
|
||
'has_result': bool(self.html_content),
|
||
'report_file_ready': bool(self.report_file_path),
|
||
'report_file_name': self.report_file_name,
|
||
'report_file_path': self.report_file_relative_path or self.report_file_path,
|
||
'state_file_ready': bool(self.state_file_path),
|
||
'state_file_path': self.state_file_relative_path or self.state_file_path,
|
||
'ir_file_ready': bool(self.ir_file_path),
|
||
'ir_file_path': self.ir_file_relative_path or self.ir_file_path
|
||
}
|
||
|
||
def publish_event(self, event_type: str, payload: Dict[str, Any]) -> None:
|
||
"""
|
||
将任意事件放入缓存并广播,所有新增逻辑均配套中文说明。
|
||
|
||
参数:
|
||
event_type: SSE中的event名称。
|
||
payload: 实际业务数据。
|
||
"""
|
||
timestamp = datetime.utcnow().isoformat() + 'Z'
|
||
event: Dict[str, Any] = {
|
||
'id': 0,
|
||
'type': event_type,
|
||
'task_id': self.task_id,
|
||
'timestamp': timestamp,
|
||
'payload': payload,
|
||
}
|
||
with self._event_lock:
|
||
self.last_event_id += 1
|
||
event['id'] = self.last_event_id
|
||
self.event_history.append(event)
|
||
_broadcast_event(self.task_id, event)
|
||
|
||
def history_since(self, last_event_id: Optional[int]) -> List[Dict[str, Any]]:
|
||
"""
|
||
根据Last-Event-ID补发历史事件,确保断线重连无遗漏。
|
||
|
||
参数:
|
||
last_event_id: SSE客户端记录的最后一个事件ID。
|
||
|
||
返回:
|
||
list[dict]: 从 last_event_id 之后的事件列表。
|
||
"""
|
||
with self._event_lock:
|
||
if last_event_id is None:
|
||
return list(self.event_history)
|
||
return [evt for evt in self.event_history if evt['id'] > last_event_id]
|
||
|
||
|
||
def check_engines_ready() -> Dict[str, Any]:
|
||
"""
|
||
检查三个子引擎是否都有新文件。
|
||
|
||
调用 ReportAgent 的基准检测逻辑,并附带论坛日志存在性,
|
||
是 /status、/generate 的前置校验。
|
||
"""
|
||
directories = {
|
||
'insight': 'insight_engine_streamlit_reports',
|
||
'media': 'media_engine_streamlit_reports',
|
||
'query': 'query_engine_streamlit_reports'
|
||
}
|
||
|
||
forum_log_path = 'logs/forum.log'
|
||
|
||
if not report_agent:
|
||
return {
|
||
'ready': False,
|
||
'error': 'Report Engine未初始化'
|
||
}
|
||
|
||
return report_agent.check_input_files(
|
||
directories['insight'],
|
||
directories['media'],
|
||
directories['query'],
|
||
forum_log_path
|
||
)
|
||
|
||
|
||
def run_report_generation(task: ReportTask, query: str, custom_template: str = ""):
|
||
"""
|
||
在后台线程中运行报告生成。
|
||
|
||
包括:检查输入→加载文档→调用ReportAgent→持久化输出→
|
||
推送阶段性事件。出现错误会自动推送并写状态。
|
||
|
||
参数:
|
||
task: 本次任务对象,内部持有事件队列。
|
||
query: 报告主题。
|
||
custom_template: 可选的自定义模板字符串。
|
||
"""
|
||
global current_task
|
||
|
||
try:
|
||
# 在局部闭包内封装推送逻辑,便于传递给ReportAgent
|
||
def stream_handler(event_type: str, payload: Dict[str, Any]):
|
||
"""所有阶段事件都通过同一个接口分发,保证日志一致。"""
|
||
task.publish_event(event_type, payload)
|
||
# 如果事件包含进度信息,同步更新任务进度
|
||
if event_type == 'progress' and 'progress' in payload:
|
||
task.update_status("running", payload['progress'])
|
||
|
||
task.update_status("running", 5)
|
||
task.publish_event('stage', {'message': '任务已启动,正在检查输入文件', 'stage': 'prepare'})
|
||
|
||
# 检查输入文件
|
||
check_result = check_engines_ready()
|
||
if not check_result['ready']:
|
||
task.update_status("error", 0, f"输入文件未准备就绪: {check_result.get('missing_files', [])}")
|
||
return
|
||
|
||
task.publish_event('stage', {
|
||
'message': '输入文件检查通过,准备载入内容',
|
||
'stage': 'io_ready',
|
||
'files': check_result.get('latest_files', {})
|
||
})
|
||
|
||
# 加载输入文件
|
||
content = report_agent.load_input_files(check_result['latest_files'])
|
||
task.publish_event('stage', {'message': '源数据加载完成,启动生成流程', 'stage': 'data_loaded'})
|
||
|
||
# 生成报告(附带兜底重试,缓解瞬时网络抖动)
|
||
for attempt in range(1, 3):
|
||
try:
|
||
task.publish_event('stage', {
|
||
'message': f'正在调用ReportAgent生成报告(第{attempt}次尝试)',
|
||
'stage': 'agent_running',
|
||
'attempt': attempt
|
||
})
|
||
generation_result = report_agent.generate_report(
|
||
query=query,
|
||
reports=content['reports'],
|
||
forum_logs=content['forum_logs'],
|
||
custom_template=custom_template,
|
||
save_report=True,
|
||
stream_handler=stream_handler
|
||
)
|
||
break
|
||
except ChapterJsonParseError as err:
|
||
hint_message = "尝试将Report Engine的API更换为算力更强、上下文更长的LLM"
|
||
task.publish_event('warning', {
|
||
'message': hint_message,
|
||
'stage': 'agent_running',
|
||
'attempt': attempt,
|
||
'reason': 'chapter_json_parse',
|
||
'error': str(err),
|
||
'task': task.to_dict(),
|
||
})
|
||
# 旧逻辑:在JSON解析失败后重启Report Engine
|
||
# backoff = min(5 * attempt, 15)
|
||
# task.publish_event('stage', {
|
||
# 'message': f'{backoff} 秒后重试生成任务',
|
||
# 'stage': 'retry_wait',
|
||
# 'wait_seconds': backoff
|
||
# })
|
||
# time.sleep(backoff)
|
||
raise ChapterJsonParseError(hint_message) from err
|
||
except Exception as err:
|
||
# 将错误即时推送至前端,方便观察重试策略
|
||
task.publish_event('warning', {
|
||
'message': f'ReportAgent执行失败: {str(err)}',
|
||
'stage': 'agent_running',
|
||
'attempt': attempt
|
||
})
|
||
if attempt == 2:
|
||
raise
|
||
# 简单的指数退避,防止频繁触发限流(单位秒)
|
||
backoff = min(5 * attempt, 15)
|
||
task.publish_event('stage', {
|
||
'message': f'{backoff} 秒后重试生成任务',
|
||
'stage': 'retry_wait',
|
||
'wait_seconds': backoff
|
||
})
|
||
time.sleep(backoff)
|
||
|
||
if isinstance(generation_result, dict):
|
||
html_report = generation_result.get('html_content', '')
|
||
else:
|
||
html_report = generation_result
|
||
|
||
task.publish_event('stage', {'message': '报告生成完毕,准备持久化', 'stage': 'persist'})
|
||
|
||
# 保存结果
|
||
task.html_content = html_report
|
||
if isinstance(generation_result, dict):
|
||
task.report_file_path = generation_result.get('report_filepath', '')
|
||
task.report_file_relative_path = generation_result.get('report_relative_path', '')
|
||
task.report_file_name = generation_result.get('report_filename', '')
|
||
task.state_file_path = generation_result.get('state_filepath', '')
|
||
task.state_file_relative_path = generation_result.get('state_relative_path', '')
|
||
task.ir_file_path = generation_result.get('ir_filepath', '')
|
||
task.ir_file_relative_path = generation_result.get('ir_relative_path', '')
|
||
task.publish_event('html_ready', {
|
||
'message': 'HTML渲染完成,可刷新预览',
|
||
'report_file': task.report_file_relative_path or task.report_file_path,
|
||
'state_file': task.state_file_relative_path or task.state_file_path,
|
||
'task': task.to_dict(),
|
||
})
|
||
task.update_status("completed", 100)
|
||
task.publish_event('completed', {
|
||
'message': '任务完成',
|
||
'duration_seconds': (task.updated_at - task.created_at).total_seconds(),
|
||
'report_file': task.report_file_relative_path or task.report_file_path,
|
||
'task': task.to_dict(),
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.exception(f"报告生成过程中发生错误: {str(e)}")
|
||
task.update_status("error", 0, str(e))
|
||
task.publish_event('error', {
|
||
'message': str(e),
|
||
'stage': 'failed',
|
||
'task': task.to_dict(),
|
||
})
|
||
# 只在出错时清理任务
|
||
with task_lock:
|
||
if current_task and current_task.task_id == task.task_id:
|
||
current_task = None
|
||
|
||
|
||
@report_bp.route('/status', methods=['GET'])
|
||
def get_status():
|
||
"""
|
||
获取Report Engine状态,包括引擎就绪情况与当前任务信息。
|
||
|
||
返回:
|
||
Response: JSON结构包含initialized/engines_ready/当前任务等。
|
||
"""
|
||
try:
|
||
engines_status = check_engines_ready()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'initialized': report_agent is not None,
|
||
'engines_ready': engines_status['ready'],
|
||
'files_found': engines_status.get('files_found', []),
|
||
'missing_files': engines_status.get('missing_files', []),
|
||
'current_task': current_task.to_dict() if current_task else None
|
||
})
|
||
except Exception as e:
|
||
logger.exception(f"获取Report Engine状态失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': str(e)
|
||
}), 500
|
||
|
||
|
||
@report_bp.route('/generate', methods=['POST'])
|
||
def generate_report():
|
||
"""
|
||
开始生成报告。
|
||
|
||
负责排队、创建后台线程、清空日志并返回SSE地址。
|
||
|
||
请求体:
|
||
query: 报告主题(可选)。
|
||
custom_template: 自定义模板字符串(可选)。
|
||
|
||
返回:
|
||
Response: JSON,包含 task_id 与 SSE stream url。
|
||
"""
|
||
global current_task
|
||
|
||
try:
|
||
# 检查是否有任务在运行
|
||
with task_lock:
|
||
if current_task and current_task.status == "running":
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '已有报告生成任务在运行中',
|
||
'current_task': current_task.to_dict()
|
||
}), 400
|
||
|
||
# 如果有已完成的任务,清理它
|
||
if current_task and current_task.status in ["completed", "error"]:
|
||
current_task = None
|
||
|
||
# 获取请求参数
|
||
data = request.get_json() or {}
|
||
query = data.get('query', '智能舆情分析报告')
|
||
custom_template = data.get('custom_template', '')
|
||
|
||
# 清空日志文件
|
||
clear_report_log()
|
||
|
||
# 检查Report Engine是否初始化
|
||
if not report_agent:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': 'Report Engine未初始化'
|
||
}), 500
|
||
|
||
# 检查输入文件是否准备就绪
|
||
engines_status = check_engines_ready()
|
||
if not engines_status['ready']:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '输入文件未准备就绪',
|
||
'missing_files': engines_status.get('missing_files', [])
|
||
}), 400
|
||
|
||
# 创建新任务
|
||
task_id = f"report_{int(time.time())}"
|
||
task = ReportTask(query, task_id, custom_template)
|
||
|
||
with task_lock:
|
||
current_task = task
|
||
tasks_registry[task_id] = task
|
||
_prune_task_history_locked()
|
||
|
||
# 通过主动推送pending事件告知前端任务已经排队
|
||
task.publish_event(
|
||
'status',
|
||
{
|
||
'status': task.status,
|
||
'progress': task.progress,
|
||
'message': '任务已排队,等待资源空闲',
|
||
'task': task.to_dict(),
|
||
}
|
||
)
|
||
|
||
# 在后台线程中运行报告生成
|
||
thread = threading.Thread(
|
||
target=run_report_generation,
|
||
args=(task, query, custom_template),
|
||
daemon=True
|
||
)
|
||
thread.start()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'task_id': task_id,
|
||
'message': '报告生成已启动',
|
||
'task': task.to_dict(),
|
||
'stream_url': f"/api/report/stream/{task_id}"
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.exception(f"开始生成报告失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': str(e)
|
||
}), 500
|
||
|
||
|
||
@report_bp.route('/progress/<task_id>', methods=['GET'])
|
||
def get_progress(task_id: str):
|
||
"""
|
||
获取报告生成进度,若任务被清理则返回一个完成态兜底。
|
||
|
||
参数:
|
||
task_id: 任务唯一标识。
|
||
|
||
返回:
|
||
Response: JSON包含任务当前状态。
|
||
"""
|
||
try:
|
||
task = _get_task(task_id)
|
||
if not task:
|
||
# 如果任务不存在,可能是历史记录已被清理,回传一个完成态兜底
|
||
return jsonify({
|
||
'success': True,
|
||
'task': {
|
||
'task_id': task_id,
|
||
'status': 'completed',
|
||
'progress': 100,
|
||
'error_message': '',
|
||
'has_result': True,
|
||
'report_file_ready': False,
|
||
'report_file_name': '',
|
||
'report_file_path': '',
|
||
'state_file_ready': False,
|
||
'state_file_path': ''
|
||
}
|
||
})
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'task': task.to_dict()
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.exception(f"获取报告生成进度失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': str(e)
|
||
}), 500
|
||
|
||
|
||
@report_bp.route('/stream/<task_id>', methods=['GET'])
|
||
def stream_task(task_id: str):
|
||
"""
|
||
基于SSE的实时推送接口。
|
||
|
||
- 自动补发Last-Event-ID之后的历史事件;
|
||
- 周期性发送心跳以防代理中断;
|
||
- 任务结束后自动注销监听。
|
||
|
||
参数:
|
||
task_id: 任务唯一标识。
|
||
|
||
返回:
|
||
Response: `text/event-stream` 类型响应。
|
||
"""
|
||
task = _get_task(task_id)
|
||
if not task:
|
||
return jsonify({'success': False, 'error': '任务不存在'}), 404
|
||
|
||
last_event_header = request.headers.get('Last-Event-ID')
|
||
try:
|
||
last_event_id = int(last_event_header) if last_event_header else None
|
||
except ValueError:
|
||
last_event_id = None
|
||
|
||
def event_generator():
|
||
"""
|
||
SSE事件生成器。
|
||
|
||
- 负责注册并消费对应任务的事件队列;
|
||
- 先回放历史事件再持续监听实时事件;
|
||
- 周期性发送心跳并在任务结束后自动注销监听。
|
||
"""
|
||
queue = _register_stream(task_id)
|
||
try:
|
||
# 断线重连场景下,先补发历史事件,保证界面状态一致
|
||
history = task.history_since(last_event_id)
|
||
for event in history:
|
||
yield _format_sse(event)
|
||
|
||
finished = task.status in ("completed", "error", "cancelled")
|
||
while True:
|
||
if finished:
|
||
break
|
||
try:
|
||
event = queue.get(timeout=STREAM_HEARTBEAT_INTERVAL)
|
||
yield _format_sse(event)
|
||
if event.get('type') in ("completed", "error"):
|
||
finished = True
|
||
except Empty:
|
||
heartbeat = {
|
||
'id': f"hb-{int(time.time() * 1000)}",
|
||
'type': 'heartbeat',
|
||
'task_id': task_id,
|
||
'timestamp': datetime.utcnow().isoformat() + 'Z',
|
||
'payload': {'status': task.status}
|
||
}
|
||
yield _format_sse(heartbeat)
|
||
finished = task.status in ("completed", "error", "cancelled")
|
||
finally:
|
||
_unregister_stream(task_id, queue)
|
||
|
||
response = Response(
|
||
stream_with_context(event_generator()),
|
||
mimetype='text/event-stream'
|
||
)
|
||
response.headers['Cache-Control'] = 'no-cache'
|
||
response.headers['X-Accel-Buffering'] = 'no'
|
||
return response
|
||
|
||
|
||
@report_bp.route('/result/<task_id>', methods=['GET'])
|
||
def get_result(task_id: str):
|
||
"""
|
||
获取报告生成结果。
|
||
|
||
参数:
|
||
task_id: 任务ID。
|
||
|
||
返回:
|
||
Response: JSON,包含HTML预览与文件路径。
|
||
"""
|
||
try:
|
||
task = _get_task(task_id)
|
||
if not task:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '任务不存在'
|
||
}), 404
|
||
|
||
if task.status != "completed":
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '报告尚未完成',
|
||
'task': task.to_dict()
|
||
}), 400
|
||
|
||
return Response(
|
||
task.html_content,
|
||
mimetype='text/html'
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.exception(f"获取报告生成结果失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': str(e)
|
||
}), 500
|
||
|
||
|
||
@report_bp.route('/result/<task_id>/json', methods=['GET'])
|
||
def get_result_json(task_id: str):
|
||
"""获取报告生成结果(JSON格式)"""
|
||
try:
|
||
task = _get_task(task_id)
|
||
if not task:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '任务不存在'
|
||
}), 404
|
||
|
||
if task.status != "completed":
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '报告尚未完成',
|
||
'task': task.to_dict()
|
||
}), 400
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'task': task.to_dict(),
|
||
'html_content': task.html_content
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.exception(f"获取报告生成结果失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': str(e)
|
||
}), 500
|
||
|
||
|
||
@report_bp.route('/download/<task_id>', methods=['GET'])
|
||
def download_report(task_id: str):
|
||
"""
|
||
下载已生成的报告HTML文件。
|
||
|
||
参数:
|
||
task_id: 任务ID。
|
||
|
||
返回:
|
||
Response: HTML文件的附件下载响应。
|
||
"""
|
||
try:
|
||
task = _get_task(task_id)
|
||
if not task:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '任务不存在'
|
||
}), 404
|
||
|
||
if task.status != "completed" or not task.report_file_path:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '报告尚未完成或尚未保存'
|
||
}), 400
|
||
|
||
if not os.path.exists(task.report_file_path):
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '报告文件不存在或已被删除'
|
||
}), 404
|
||
|
||
download_name = task.report_file_name or os.path.basename(task.report_file_path)
|
||
return send_file(
|
||
task.report_file_path,
|
||
mimetype='text/html',
|
||
as_attachment=True,
|
||
download_name=download_name
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.exception(f"下载报告失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': str(e)
|
||
}), 500
|
||
|
||
|
||
@report_bp.route('/cancel/<task_id>', methods=['POST'])
|
||
def cancel_task(task_id: str):
|
||
"""
|
||
取消报告生成任务。
|
||
|
||
参数:
|
||
task_id: 需要被取消的任务ID。
|
||
|
||
返回:
|
||
Response: JSON,包含取消结果或错误信息。
|
||
"""
|
||
global current_task
|
||
|
||
try:
|
||
with task_lock:
|
||
if current_task and current_task.task_id == task_id:
|
||
if current_task.status == "running":
|
||
current_task.update_status("cancelled", 0, "用户取消任务")
|
||
current_task.publish_event('cancelled', {
|
||
'message': '任务被用户主动终止',
|
||
'task': current_task.to_dict(),
|
||
})
|
||
current_task = None
|
||
task = tasks_registry.get(task_id)
|
||
if task and task.status == 'running':
|
||
task.update_status("cancelled", task.progress, "用户取消任务")
|
||
task.publish_event('cancelled', {
|
||
'message': '任务被用户主动终止',
|
||
'task': task.to_dict(),
|
||
})
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '任务已取消'
|
||
})
|
||
else:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '任务不存在或无法取消'
|
||
}), 404
|
||
|
||
except Exception as e:
|
||
logger.exception(f"取消报告生成任务失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': str(e)
|
||
}), 500
|
||
|
||
|
||
@report_bp.route('/templates', methods=['GET'])
|
||
def get_templates():
|
||
"""
|
||
获取可用模板列表,便于前端展示可选Markdown骨架。
|
||
|
||
返回:
|
||
Response: JSON,列出模板名称/描述/大小。
|
||
"""
|
||
try:
|
||
if not report_agent:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': 'Report Engine未初始化'
|
||
}), 500
|
||
|
||
template_dir = settings.TEMPLATE_DIR
|
||
templates = []
|
||
|
||
if os.path.exists(template_dir):
|
||
for filename in os.listdir(template_dir):
|
||
if filename.endswith('.md'):
|
||
template_path = os.path.join(template_dir, filename)
|
||
try:
|
||
with open(template_path, 'r', encoding='utf-8') as f:
|
||
content = f.read()
|
||
|
||
templates.append({
|
||
'name': filename.replace('.md', ''),
|
||
'filename': filename,
|
||
'description': content.split('\n')[0] if content else '无描述',
|
||
'size': len(content)
|
||
})
|
||
except Exception as e:
|
||
logger.exception(f"读取模板失败 {filename}: {str(e)}")
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'templates': templates,
|
||
'template_dir': template_dir
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.exception(f"获取可用模板列表失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': str(e)
|
||
}), 500
|
||
|
||
|
||
# 错误处理
|
||
@report_bp.errorhandler(404)
|
||
def not_found(error):
|
||
"""404兜底处理:保证接口统一返回JSON结构"""
|
||
logger.exception(f"API端点不存在: {str(error)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': 'API端点不存在'
|
||
}), 404
|
||
|
||
|
||
@report_bp.errorhandler(500)
|
||
def internal_error(error):
|
||
"""500兜底处理:捕获未被主动捕获的异常"""
|
||
logger.exception(f"服务器内部错误: {str(error)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '服务器内部错误'
|
||
}), 500
|
||
|
||
|
||
def clear_report_log():
|
||
"""
|
||
清空report.log文件,方便新任务只查看本次运行日志。
|
||
|
||
返回:
|
||
None
|
||
"""
|
||
try:
|
||
log_file = settings.LOG_FILE
|
||
|
||
# 【修复】使用truncate而非重新打开,避免与logger的文件句柄冲突
|
||
# 追加模式打开,然后truncate,保持文件句柄有效
|
||
with open(log_file, 'r+', encoding='utf-8') as f:
|
||
f.truncate(0) # 清空文件内容但不关闭文件
|
||
f.flush() # 立即刷新
|
||
|
||
logger.info(f"已清空日志文件: {log_file}")
|
||
except FileNotFoundError:
|
||
# 文件不存在,创建空文件
|
||
try:
|
||
with open(log_file, 'w', encoding='utf-8') as f:
|
||
f.write('')
|
||
logger.info(f"创建日志文件: {log_file}")
|
||
except Exception as e:
|
||
logger.exception(f"创建日志文件失败: {str(e)}")
|
||
except Exception as e:
|
||
logger.exception(f"清空日志文件失败: {str(e)}")
|
||
|
||
|
||
@report_bp.route('/log', methods=['GET'])
|
||
def get_report_log():
|
||
"""
|
||
获取report.log内容,并按行去除空白返回。
|
||
|
||
【修复】优化大文件读取,添加错误处理和文件锁
|
||
|
||
返回:
|
||
Response: JSON,包含最新日志行数组。
|
||
"""
|
||
try:
|
||
log_file = settings.LOG_FILE
|
||
|
||
if not os.path.exists(log_file):
|
||
return jsonify({
|
||
'success': True,
|
||
'log_lines': []
|
||
})
|
||
|
||
# 【修复】检查文件大小,避免读取过大文件导致内存问题
|
||
file_size = os.path.getsize(log_file)
|
||
max_size = 10 * 1024 * 1024 # 10MB限制
|
||
|
||
if file_size > max_size:
|
||
# 文件过大,只读取最后10MB
|
||
with open(log_file, 'rb') as f:
|
||
f.seek(-max_size, 2) # 从文件末尾往前10MB
|
||
# 跳过可能不完整的第一行
|
||
f.readline()
|
||
content = f.read().decode('utf-8', errors='replace')
|
||
lines = content.splitlines()
|
||
logger.warning(f"日志文件过大 ({file_size} bytes),仅返回最后 {max_size} bytes")
|
||
else:
|
||
# 正常大小,完整读取
|
||
with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
|
||
lines = f.readlines()
|
||
|
||
# 清理行尾的换行符和空行
|
||
log_lines = [line.rstrip('\n\r') for line in lines if line.strip()]
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'log_lines': log_lines
|
||
})
|
||
|
||
except PermissionError as e:
|
||
logger.error(f"读取日志权限不足: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '读取日志权限不足'
|
||
}), 403
|
||
except UnicodeDecodeError as e:
|
||
logger.error(f"日志文件编码错误: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '日志文件编码错误'
|
||
}), 500
|
||
except Exception as e:
|
||
logger.exception(f"读取日志失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': f'读取日志失败: {str(e)}'
|
||
}), 500
|
||
|
||
|
||
@report_bp.route('/log/clear', methods=['POST'])
|
||
def clear_log():
|
||
"""
|
||
手动清空日志,提供REST入口供前端一键重置。
|
||
|
||
返回:
|
||
Response: JSON,标记是否清理成功。
|
||
"""
|
||
try:
|
||
clear_report_log()
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '日志已清空'
|
||
})
|
||
except Exception as e:
|
||
logger.exception(f"清空日志失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': f'清空日志失败: {str(e)}'
|
||
}), 500
|
||
|
||
|
||
@report_bp.route('/export/pdf/<task_id>', methods=['GET'])
|
||
def export_pdf(task_id: str):
|
||
"""
|
||
导出报告为PDF格式。
|
||
|
||
从IR JSON文件生成优化的PDF,支持自动布局调整。
|
||
|
||
参数:
|
||
task_id: 任务ID
|
||
|
||
查询参数:
|
||
optimize: 是否启用布局优化(默认true)
|
||
|
||
返回:
|
||
Response: PDF文件流或错误信息
|
||
"""
|
||
try:
|
||
# 检测 Pango 依赖
|
||
from .utils.dependency_check import check_pango_available
|
||
pango_available, pango_message = check_pango_available()
|
||
if not pango_available:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': 'PDF 导出功能不可用:缺少系统依赖',
|
||
'details': '请查看根目录 README.md 第393行「PDF 导出依赖」部分了解如何安装依赖',
|
||
'help_url': 'https://github.com/666ghj/BettaFish#3-安装-pdf-导出所需系统依赖可选',
|
||
'system_message': pango_message
|
||
}), 503
|
||
|
||
# 获取任务信息
|
||
task = tasks_registry.get(task_id)
|
||
if not task:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '任务不存在'
|
||
}), 404
|
||
|
||
# 检查任务是否完成
|
||
if task.status != 'completed':
|
||
return jsonify({
|
||
'success': False,
|
||
'error': f'任务未完成,当前状态: {task.status}'
|
||
}), 400
|
||
|
||
# 获取IR文件路径
|
||
if not task.ir_file_path or not os.path.exists(task.ir_file_path):
|
||
return jsonify({
|
||
'success': False,
|
||
'error': 'IR文件不存在'
|
||
}), 404
|
||
|
||
# 读取IR数据
|
||
with open(task.ir_file_path, 'r', encoding='utf-8') as f:
|
||
document_ir = json.load(f)
|
||
|
||
# 检查是否启用布局优化
|
||
optimize = request.args.get('optimize', 'true').lower() == 'true'
|
||
|
||
# 创建PDF渲染器并生成PDF
|
||
from .renderers import PDFRenderer
|
||
renderer = PDFRenderer()
|
||
|
||
logger.info(f"开始导出PDF,任务ID: {task_id},布局优化: {optimize}")
|
||
|
||
# 生成PDF字节流
|
||
pdf_bytes = renderer.render_to_bytes(document_ir, optimize_layout=optimize)
|
||
|
||
# 确定下载文件名
|
||
topic = document_ir.get('metadata', {}).get('topic', 'report')
|
||
pdf_filename = f"report_{topic}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
|
||
|
||
# 返回PDF文件
|
||
return Response(
|
||
pdf_bytes,
|
||
mimetype='application/pdf',
|
||
headers={
|
||
'Content-Disposition': f'attachment; filename="{pdf_filename}"',
|
||
'Content-Type': 'application/pdf'
|
||
}
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.exception(f"导出PDF失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': f'导出PDF失败: {str(e)}'
|
||
}), 500
|
||
|
||
|
||
@report_bp.route('/export/pdf-from-ir', methods=['POST'])
|
||
def export_pdf_from_ir():
|
||
"""
|
||
从IR JSON直接导出PDF(不需要任务ID)。
|
||
|
||
适用于前端直接传递IR数据的场景。
|
||
|
||
请求体:
|
||
{
|
||
"document_ir": {...}, // Document IR JSON
|
||
"optimize": true // 是否启用布局优化(可选)
|
||
}
|
||
|
||
返回:
|
||
Response: PDF文件流或错误信息
|
||
"""
|
||
try:
|
||
# 检测 Pango 依赖
|
||
from .utils.dependency_check import check_pango_available
|
||
pango_available, pango_message = check_pango_available()
|
||
if not pango_available:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': 'PDF 导出功能不可用:缺少系统依赖',
|
||
'details': '请查看根目录 README.md 第393行「PDF 导出依赖」部分了解如何安装依赖',
|
||
'help_url': 'https://github.com/666ghj/BettaFish#3-安装-pdf-导出所需系统依赖可选',
|
||
'system_message': pango_message
|
||
}), 503
|
||
|
||
data = request.get_json()
|
||
|
||
if not data or 'document_ir' not in data:
|
||
return jsonify({
|
||
'success': False,
|
||
'error': '缺少document_ir参数'
|
||
}), 400
|
||
|
||
document_ir = data['document_ir']
|
||
optimize = data.get('optimize', True)
|
||
|
||
# 创建PDF渲染器并生成PDF
|
||
from .renderers import PDFRenderer
|
||
renderer = PDFRenderer()
|
||
|
||
logger.info(f"从IR直接导出PDF,布局优化: {optimize}")
|
||
|
||
# 生成PDF字节流
|
||
pdf_bytes = renderer.render_to_bytes(document_ir, optimize_layout=optimize)
|
||
|
||
# 确定下载文件名
|
||
topic = document_ir.get('metadata', {}).get('topic', 'report')
|
||
pdf_filename = f"report_{topic}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
|
||
|
||
# 返回PDF文件
|
||
return Response(
|
||
pdf_bytes,
|
||
mimetype='application/pdf',
|
||
headers={
|
||
'Content-Disposition': f'attachment; filename="{pdf_filename}"',
|
||
'Content-Type': 'application/pdf'
|
||
}
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.exception(f"从IR导出PDF失败: {str(e)}")
|
||
return jsonify({
|
||
'success': False,
|
||
'error': f'导出PDF失败: {str(e)}'
|
||
}), 500
|