Add Comments

This commit is contained in:
马一丁
2025-11-13 22:49:59 +08:00
parent 82152547e1
commit e267b1fc04
23 changed files with 500 additions and 145 deletions
+83 -22
View File
@@ -1,6 +1,10 @@
"""
Report Engine Flask接口
提供HTTP API用于报告生成
Report Engine Flask接口
该模块为前端/CLI提供统一HTTP/SSE入口,负责:
1. 初始化 ReportAgent 并串联后台线程;
2. 管理任务排队、进度查询、流式推送与日志下载;
3. 提供模板列表、输入文件检查等周边能力。
"""
import os
@@ -35,7 +39,11 @@ tasks_registry: Dict[str, 'ReportTask'] = {}
def _register_stream(task_id: str) -> Queue:
"""为指定任务注册一个事件队列,供SSE监听器消费。"""
"""
为指定任务注册一个事件队列,供SSE监听器消费。
返回的 Queue 会存入 `stream_subscribers`SSE 生成器将不断读取。
"""
queue = Queue()
with stream_lock:
stream_subscribers[task_id].append(queue)
@@ -43,7 +51,11 @@ def _register_stream(task_id: str) -> Queue:
def _unregister_stream(task_id: str, queue: Queue):
"""安全移除事件队列,避免内存泄漏。"""
"""
安全移除事件队列,避免内存泄漏。
需要在finally中调用,保证异常情况下资源也能释放。
"""
with stream_lock:
listeners = stream_subscribers.get(task_id, [])
if queue in listeners:
@@ -53,7 +65,11 @@ def _unregister_stream(task_id: str, queue: Queue):
def _broadcast_event(task_id: str, event: Dict[str, Any]):
"""将事件推送给所有监听者,失败时做好异常捕获。"""
"""
将事件推送给所有监听者,失败时做好异常捕获。
采用浅拷贝监听列表,防止并发移除导致遍历异常。
"""
with stream_lock:
listeners = list(stream_subscribers.get(task_id, []))
for queue in listeners:
@@ -64,7 +80,11 @@ def _broadcast_event(task_id: str, event: Dict[str, Any]):
def _prune_task_history_locked():
"""在task_lock持有期间调用,清理过多的历史任务以控制内存。"""
"""
在task_lock持有期间调用,清理过多的历史任务。
仅保留最近 `MAX_TASK_HISTORY` 个任务,避免长时间运行占用过多内存。
"""
if len(tasks_registry) <= MAX_TASK_HISTORY:
return
# 按创建时间排序,移除最旧的任务
@@ -74,7 +94,11 @@ def _prune_task_history_locked():
def _get_task(task_id: str) -> Optional['ReportTask']:
"""统一的任务查找方法,优先返回当前任务。"""
"""
统一的任务查找方法,优先返回当前任务。
避免重复写锁逻辑,便于多个API共享。
"""
with task_lock:
if current_task and current_task.task_id == task_id:
return current_task
@@ -82,7 +106,11 @@ def _get_task(task_id: str) -> Optional['ReportTask']:
def _format_sse(event: Dict[str, Any]) -> str:
"""按SSE协议格式化消息。"""
"""
按SSE协议格式化消息。
输出形如 `id:/event:/data:` 的三段文本,供浏览器端直接消费。
"""
payload = json.dumps(event, ensure_ascii=False)
event_id = event.get('id', 0)
event_type = event.get('type', 'message')
@@ -90,7 +118,11 @@ def _format_sse(event: Dict[str, Any]) -> str:
def initialize_report_engine():
"""初始化Report Engine"""
"""
初始化Report Engine。
单例化 ReportAgent,方便 API 启动后直接接收任务。
"""
global report_agent
try:
report_agent = create_agent()
@@ -102,7 +134,12 @@ def initialize_report_engine():
class ReportTask:
"""报告生成任务"""
"""
报告生成任务。
该对象串联运行状态、进度、事件历史及最终文件路径,
既供后台线程更新,也供HTTP接口读取。
"""
def __init__(self, query: str, task_id: str, custom_template: str = ""):
"""
@@ -135,7 +172,11 @@ class ReportTask:
self.last_event_id = 0
def update_status(self, status: str, progress: int = None, error_message: str = ""):
"""更新任务状态"""
"""
更新任务状态并广播事件。
会自动刷新 `updated_at`、错误信息,并触发 `status` 类型的 SSE。
"""
self.status = status
if progress is not None:
self.progress = progress
@@ -155,7 +196,7 @@ class ReportTask:
)
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
"""转换为字典格式,方便直接返回给JSON API。"""
return {
'task_id': self.task_id,
'query': self.query,
@@ -197,7 +238,12 @@ class ReportTask:
def check_engines_ready() -> Dict[str, Any]:
"""检查三个子引擎是否都有新文件"""
"""
检查三个子引擎是否都有新文件。
调用 ReportAgent 的基准检测逻辑,并附带论坛日志存在性,
是 /status、/generate 的前置校验。
"""
directories = {
'insight': 'insight_engine_streamlit_reports',
'media': 'media_engine_streamlit_reports',
@@ -221,7 +267,12 @@ def check_engines_ready() -> Dict[str, Any]:
def run_report_generation(task: ReportTask, query: str, custom_template: str = ""):
"""在后台线程中运行报告生成"""
"""
在后台线程中运行报告生成。
包括:检查输入→加载文档→调用ReportAgent→持久化输出→
推送阶段性事件。出现错误会自动推送并写状态。
"""
global current_task
try:
@@ -334,7 +385,7 @@ def run_report_generation(task: ReportTask, query: str, custom_template: str = "
@report_bp.route('/status', methods=['GET'])
def get_status():
"""获取Report Engine状态"""
"""获取Report Engine状态,包括引擎就绪情况与当前任务信息。"""
try:
engines_status = check_engines_ready()
@@ -356,7 +407,11 @@ def get_status():
@report_bp.route('/generate', methods=['POST'])
def generate_report():
"""开始生成报告"""
"""
开始生成报告。
负责排队、创建后台线程、清空日志并返回SSE地址。
"""
global current_task
try:
@@ -443,7 +498,7 @@ def generate_report():
@report_bp.route('/progress/<task_id>', methods=['GET'])
def get_progress(task_id: str):
"""获取报告生成进度"""
"""获取报告生成进度,若任务被清理则返回一个完成态兜底。"""
try:
task = _get_task(task_id)
if not task:
@@ -479,7 +534,13 @@ def get_progress(task_id: str):
@report_bp.route('/stream/<task_id>', methods=['GET'])
def stream_task(task_id: str):
"""基于SSE的实时推送接口,向前端持续广播阶段事件。"""
"""
基于SSE的实时推送接口。
- 自动补发Last-Event-ID之后的历史事件;
- 周期性发送心跳以防代理中断;
- 任务结束后自动注销监听。
"""
task = _get_task(task_id)
if not task:
return jsonify({'success': False, 'error': '任务不存在'}), 404
@@ -674,7 +735,7 @@ def cancel_task(task_id: str):
@report_bp.route('/templates', methods=['GET'])
def get_templates():
"""获取可用模板列表"""
"""获取可用模板列表,便于前端展示可选Markdown骨架。"""
try:
if not report_agent:
return jsonify({
@@ -738,7 +799,7 @@ def internal_error(error):
def clear_report_log():
"""清空report.log文件"""
"""清空report.log文件,方便新任务只查看本次运行日志。"""
try:
log_file = settings.LOG_FILE
with open(log_file, 'w', encoding='utf-8') as f:
@@ -750,7 +811,7 @@ def clear_report_log():
@report_bp.route('/log', methods=['GET'])
def get_report_log():
"""获取report.log内容"""
"""获取report.log内容,并按行去除空白返回。"""
try:
log_file = settings.LOG_FILE
@@ -781,7 +842,7 @@ def get_report_log():
@report_bp.route('/log/clear', methods=['POST'])
def clear_log():
"""手动清空日志"""
"""手动清空日志,提供REST入口供前端一键重置。"""
try:
clear_report_log()
return jsonify({