Add Comments

This commit is contained in:
马一丁
2025-11-14 19:44:04 +08:00
parent 52eed4d010
commit 6d0e8f4b8c
13 changed files with 655 additions and 61 deletions
+137 -11
View File
@@ -43,6 +43,12 @@ def _register_stream(task_id: str) -> Queue:
为指定任务注册一个事件队列,供SSE监听器消费。
返回的 Queue 会存入 `stream_subscribers`SSE 生成器将不断读取。
参数:
task_id: 需要监听的任务ID。
返回:
Queue: 线程安全的事件队列。
"""
queue = Queue()
with stream_lock:
@@ -55,6 +61,10 @@ def _unregister_stream(task_id: str, queue: Queue):
安全移除事件队列,避免内存泄漏。
需要在finally中调用,保证异常情况下资源也能释放。
参数:
task_id: 任务ID。
queue: 之前注册的事件队列。
"""
with stream_lock:
listeners = stream_subscribers.get(task_id, [])
@@ -69,6 +79,10 @@ def _broadcast_event(task_id: str, event: Dict[str, Any]):
将事件推送给所有监听者,失败时做好异常捕获。
采用浅拷贝监听列表,防止并发移除导致遍历异常。
参数:
task_id: 待推送的任务ID。
event: 结构化事件payload。
"""
with stream_lock:
listeners = list(stream_subscribers.get(task_id, []))
@@ -84,6 +98,9 @@ def _prune_task_history_locked():
在task_lock持有期间调用,清理过多的历史任务。
仅保留最近 `MAX_TASK_HISTORY` 个任务,避免长时间运行占用过多内存。
说明:
该函数假设调用方已获取 `task_lock`,否则存在竞态风险。
"""
if len(tasks_registry) <= MAX_TASK_HISTORY:
return
@@ -98,6 +115,12 @@ def _get_task(task_id: str) -> Optional['ReportTask']:
统一的任务查找方法,优先返回当前任务。
避免重复写锁逻辑,便于多个API共享。
参数:
task_id: 任务ID。
返回:
ReportTask | None: 命中时返回任务实例,否则为None。
"""
with task_lock:
if current_task and current_task.task_id == task_id:
@@ -110,6 +133,12 @@ def _format_sse(event: Dict[str, Any]) -> str:
按SSE协议格式化消息。
输出形如 `id:/event:/data:` 的三段文本,供浏览器端直接消费。
参数:
event: 事件payload,至少包含 id/type。
返回:
str: SSE协议要求的字符串。
"""
payload = json.dumps(event, ensure_ascii=False)
event_id = event.get('id', 0)
@@ -122,6 +151,9 @@ def initialize_report_engine():
初始化Report Engine。
单例化 ReportAgent,方便 API 启动后直接接收任务。
返回:
bool: 初始化成功返回True,异常时返回False。
"""
global report_agent
try:
@@ -176,6 +208,11 @@ class ReportTask:
更新任务状态并广播事件。
会自动刷新 `updated_at`、错误信息,并触发 `status` 类型的 SSE。
参数:
status: 任务阶段(pending/running/completed/error/cancelled)。
progress: 可选的进度百分比。
error_message: 出错时的人类可读说明。
"""
self.status = status
if progress is not None:
@@ -214,7 +251,13 @@ class ReportTask:
}
def publish_event(self, event_type: str, payload: Dict[str, Any]) -> None:
"""将任意事件放入缓存并广播,所有新增逻辑均配套中文说明。"""
"""
将任意事件放入缓存并广播,所有新增逻辑均配套中文说明。
参数:
event_type: SSE中的event名称。
payload: 实际业务数据。
"""
timestamp = datetime.utcnow().isoformat() + 'Z'
event: Dict[str, Any] = {
'id': 0,
@@ -230,7 +273,15 @@ class ReportTask:
_broadcast_event(self.task_id, event)
def history_since(self, last_event_id: Optional[int]) -> List[Dict[str, Any]]:
"""根据Last-Event-ID补发历史事件,确保断线重连无遗漏。"""
"""
根据Last-Event-ID补发历史事件,确保断线重连无遗漏。
参数:
last_event_id: SSE客户端记录的最后一个事件ID。
返回:
list[dict]: 从 last_event_id 之后的事件列表。
"""
with self._event_lock:
if last_event_id is None:
return list(self.event_history)
@@ -272,6 +323,11 @@ def run_report_generation(task: ReportTask, query: str, custom_template: str = "
包括:检查输入→加载文档→调用ReportAgent→持久化输出→
推送阶段性事件。出现错误会自动推送并写状态。
参数:
task: 本次任务对象,内部持有事件队列。
query: 报告主题。
custom_template: 可选的自定义模板字符串。
"""
global current_task
@@ -385,7 +441,12 @@ def run_report_generation(task: ReportTask, query: str, custom_template: str = "
@report_bp.route('/status', methods=['GET'])
def get_status():
"""获取Report Engine状态,包括引擎就绪情况与当前任务信息。"""
"""
获取Report Engine状态,包括引擎就绪情况与当前任务信息。
返回:
Response: JSON结构包含initialized/engines_ready/当前任务等。
"""
try:
engines_status = check_engines_ready()
@@ -411,6 +472,13 @@ def generate_report():
开始生成报告。
负责排队、创建后台线程、清空日志并返回SSE地址。
请求体:
query: 报告主题(可选)。
custom_template: 自定义模板字符串(可选)。
返回:
Response: JSON,包含 task_id 与 SSE stream url。
"""
global current_task
@@ -498,7 +566,15 @@ def generate_report():
@report_bp.route('/progress/<task_id>', methods=['GET'])
def get_progress(task_id: str):
"""获取报告生成进度,若任务被清理则返回一个完成态兜底。"""
"""
获取报告生成进度,若任务被清理则返回一个完成态兜底。
参数:
task_id: 任务唯一标识。
返回:
Response: JSON包含任务当前状态。
"""
try:
task = _get_task(task_id)
if not task:
@@ -540,6 +616,12 @@ def stream_task(task_id: str):
- 自动补发Last-Event-ID之后的历史事件;
- 周期性发送心跳以防代理中断;
- 任务结束后自动注销监听。
参数:
task_id: 任务唯一标识。
返回:
Response: `text/event-stream` 类型响应。
"""
task = _get_task(task_id)
if not task:
@@ -592,7 +674,15 @@ def stream_task(task_id: str):
@report_bp.route('/result/<task_id>', methods=['GET'])
def get_result(task_id: str):
"""获取报告生成结果"""
"""
获取报告生成结果。
参数:
task_id: 任务ID。
返回:
Response: JSON,包含HTML预览与文件路径。
"""
try:
task = _get_task(task_id)
if not task:
@@ -655,7 +745,15 @@ def get_result_json(task_id: str):
@report_bp.route('/download/<task_id>', methods=['GET'])
def download_report(task_id: str):
"""下载已生成的报告HTML文件"""
"""
下载已生成的报告HTML文件。
参数:
task_id: 任务ID。
返回:
Response: HTML文件的附件下载响应。
"""
try:
task = _get_task(task_id)
if not task:
@@ -694,7 +792,15 @@ def download_report(task_id: str):
@report_bp.route('/cancel/<task_id>', methods=['POST'])
def cancel_task(task_id: str):
"""取消报告生成任务"""
"""
取消报告生成任务。
参数:
task_id: 需要被取消的任务ID。
返回:
Response: JSON,包含取消结果或错误信息。
"""
global current_task
try:
@@ -735,7 +841,12 @@ def cancel_task(task_id: str):
@report_bp.route('/templates', methods=['GET'])
def get_templates():
"""获取可用模板列表,便于前端展示可选Markdown骨架。"""
"""
获取可用模板列表,便于前端展示可选Markdown骨架。
返回:
Response: JSON,列出模板名称/描述/大小。
"""
try:
if not report_agent:
return jsonify({
@@ -799,7 +910,12 @@ def internal_error(error):
def clear_report_log():
"""清空report.log文件,方便新任务只查看本次运行日志。"""
"""
清空report.log文件,方便新任务只查看本次运行日志。
返回:
None
"""
try:
log_file = settings.LOG_FILE
with open(log_file, 'w', encoding='utf-8') as f:
@@ -811,7 +927,12 @@ def clear_report_log():
@report_bp.route('/log', methods=['GET'])
def get_report_log():
"""获取report.log内容,并按行去除空白返回。"""
"""
获取report.log内容,并按行去除空白返回。
返回:
Response: JSON,包含最新日志行数组。
"""
try:
log_file = settings.LOG_FILE
@@ -842,7 +963,12 @@ def get_report_log():
@report_bp.route('/log/clear', methods=['POST'])
def clear_log():
"""手动清空日志,提供REST入口供前端一键重置。"""
"""
手动清空日志,提供REST入口供前端一键重置。
返回:
Response: JSON,标记是否清理成功。
"""
try:
clear_report_log()
return jsonify({