From b10dd449fa6859f1f38300170a2ccd3a9eabd750 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E4=B8=80=E4=B8=81?= <1769123563@qq.com> Date: Thu, 20 Nov 2025 22:36:49 +0800 Subject: [PATCH] Resolving Report Engine Logging Issues --- ReportEngine/flask_interface.py | 53 ++++++++- templates/index.html | 183 ++++++++++++++++---------------- 2 files changed, 146 insertions(+), 90 deletions(-) diff --git a/ReportEngine/flask_interface.py b/ReportEngine/flask_interface.py index 5b0a526..2fd30f2 100644 --- a/ReportEngine/flask_interface.py +++ b/ReportEngine/flask_interface.py @@ -37,6 +37,57 @@ STREAM_HEARTBEAT_INTERVAL = 15 # 心跳间隔秒 stream_lock = threading.Lock() stream_subscribers = defaultdict(list) tasks_registry: Dict[str, 'ReportTask'] = {} +LOG_STREAM_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"} +log_stream_handler_id: Optional[int] = None + + +def _stream_log_to_task(message): + """ + 将loguru日志同步到当前任务的SSE事件,保证前端实时可见。 + + 仅在存在运行中的任务时推送,避免无关日志刷屏。 + """ + try: + record = message.record + level_name = record["level"].name + if level_name not in LOG_STREAM_LEVELS: + return + + with task_lock: + task = current_task + + if not task or task.status not in ("running", "pending"): + return + + timestamp = record["time"].strftime("%H:%M:%S.%f")[:-3] + formatted_line = f"[{timestamp}] [{level_name}] {record['message']}" + task.publish_event( + "log", + { + "line": formatted_line, + "level": level_name.lower(), + "timestamp": timestamp, + "message": record["message"], + "module": record.get("module", ""), + "function": record.get("function", ""), + }, + ) + except Exception: + # 避免在日志钩子里产生日志递归 + pass + + +def _setup_log_stream_forwarder(): + """为当前进程挂载一次性的loguru钩子,用于SSE实时转发。""" + global log_stream_handler_id + if log_stream_handler_id is not None: + return + log_stream_handler_id = logger.add( + _stream_log_to_task, + level="DEBUG", + enqueue=False, + catch=True, + ) def _register_stream(task_id: str) -> Queue: @@ -160,6 +211,7 @@ def initialize_report_engine(): try: report_agent = create_agent() logger.info("Report Engine初始化成功") + _setup_log_stream_forwarder() # 检测 PDF 生成依赖(Pango) try: @@ -1222,4 +1274,3 @@ def export_pdf_from_ir(): 'success': False, 'error': f'导出PDF失败: {str(e)}' }), 500 - diff --git a/templates/index.html b/templates/index.html index 4937131..2544cfb 100644 --- a/templates/index.html +++ b/templates/index.html @@ -4251,6 +4251,8 @@ let reportLogLineCount = 0; let reportLockCheckInterval = null; let lastCompletedReportTask = null; + // 标记是否已通过SSE直接获取日志,避免轮询重复 + let reportLogStreaming = false; // ====== Report Engine 日志管理器 ====== class ReportLogManager { @@ -4258,46 +4260,44 @@ this.intervalId = null; this.lineCount = 0; this.isRunning = false; - this.refreshInterval = 250; // 250ms轮询一次,更加实时 + this.refreshInterval = 250; // 改为250ms,更接近实时 this.lastError = null; this.retryCount = 0; this.maxRetries = 3; + this.consecutiveErrors = 0; // 连续错误计数 + this.maxConsecutiveErrors = 10; // 增加到10次,因为频率更高了 + this.abortController = null; // 复用controller避免创建开销 + this.isFetching = false; // 避免并发请求 } // 启动日志轮询 start() { - if (this.isRunning) { - console.log('[ReportLogManager] 已在运行,跳过重复启动'); + if (this.isRunning || reportLogStreaming) { return; } - console.log('[ReportLogManager] ===== 启动日志轮询系统 ====='); this.isRunning = true; this.retryCount = 0; + this.consecutiveErrors = 0; // 重置连续错误计数 // 立即执行一次 - console.log('[ReportLogManager] 执行初始刷新...'); + console.log('[ReportLogManager] 启动日志轮询'); this.refresh(); // 启动定时轮询 this.intervalId = setInterval(() => { - if (currentApp === 'report' && this.isRunning) { - console.log('[ReportLogManager] 定时器触发,执行刷新...'); + if (currentApp === 'report' && this.isRunning && !reportLogStreaming) { this.refresh(); } }, this.refreshInterval); - - console.log(`[ReportLogManager] 轮询已启动,频率: ${this.refreshInterval}ms, intervalId: ${this.intervalId}`); } // 停止日志轮询 stop() { if (!this.isRunning) { - console.log('[ReportLogManager] 未在运行,无需停止'); return; } - console.log('[ReportLogManager] ===== 停止日志轮询系统 ====='); this.isRunning = false; if (this.intervalId) { @@ -4305,38 +4305,55 @@ this.intervalId = null; } - console.log('[ReportLogManager] 轮询已停止'); + // 取消正在进行的请求 + if (this.abortController) { + this.abortController.abort(); + this.abortController = null; + } + this.isFetching = false; + + console.log('[ReportLogManager] 停止日志轮询'); } // 重置计数器(任务开始时调用) reset() { - console.log(`[ReportLogManager] 重置计数器,原值: ${this.lineCount}`); this.lineCount = 0; this.lastError = null; this.retryCount = 0; + this.consecutiveErrors = 0; + this.isFetching = false; } // 刷新日志 refresh() { - if (!this.isRunning) { - console.log('[ReportLogManager.refresh] 管理器未运行,跳过刷新'); + if (!this.isRunning || reportLogStreaming) { return; } - console.log('[ReportLogManager.refresh] 开始刷新日志...'); + if (this.isFetching) { + return; + } + this.isFetching = true; - // 【修复】使用传统的Promise方式,避免async/await兼容性问题 - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), 3000); + // 复用或创建新的 AbortController + if (this.abortController) { + this.abortController.abort(); + } + this.abortController = new AbortController(); + + const timeoutId = setTimeout(() => { + if (this.abortController) { + this.abortController.abort(); + } + }, 3000); fetch('/api/report/log', { method: 'GET', headers: { 'Cache-Control': 'no-cache' }, - signal: controller.signal + signal: this.abortController.signal }) .then(response => { clearTimeout(timeoutId); - console.log(`[ReportLogManager.refresh] 收到响应,状态: ${response.status}`); if (!response.ok) { throw new Error(`HTTP ${response.status}`); @@ -4345,29 +4362,28 @@ return response.json(); }) .then(data => { - console.log('[ReportLogManager.refresh] 解析JSON成功'); + // 成功后重置连续错误计数 + this.consecutiveErrors = 0; + this.retryCount = 0; if (!data.success) { throw new Error(data.error || '未知错误'); } - // 成功后重置重试计数 - this.retryCount = 0; - - // 【调试】输出获取到的日志行数 - if (data.log_lines) { - console.log(`[ReportLogManager.refresh] API返回 ${data.log_lines.length} 行日志`); - } else { - console.log('[ReportLogManager.refresh] API返回的log_lines为空'); - } - // 处理日志数据 this.processLogs(data.log_lines || []); }) .catch(error => { clearTimeout(timeoutId); - console.error('[ReportLogManager.refresh] 错误:', error); + // 忽略abort错误 + if (error.name === 'AbortError') { + this.isFetching = false; + return; + } this.handleError(error); + }) + .finally(() => { + this.isFetching = false; }); } @@ -4375,48 +4391,33 @@ processLogs(logLines) { const totalLines = logLines.length; - console.log(`[ReportLogManager.processLogs] 总行数: ${totalLines}, 当前计数: ${this.lineCount}`); - // 如果有新日志 if (totalLines > this.lineCount) { const newLines = logLines.slice(this.lineCount); - console.log(`[ReportLogManager] 发现 ${newLines.length} 条新日志 (${this.lineCount} -> ${totalLines})`); - - // 输出前3行新日志用于调试 - if (newLines.length > 0) { - console.log('[ReportLogManager] 新日志样本:'); - newLines.slice(0, 3).forEach((line, idx) => { - console.log(` [${idx}] ${line.substring(0, 100)}...`); - }); - } // 逐行处理并显示 - newLines.forEach((line, index) => { - console.log(`[ReportLogManager.processLogs] 处理第 ${index + 1}/${newLines.length} 行`); + newLines.forEach(line => { this.displayLogLine(line); }); // 更新计数器 this.lineCount = totalLines; - console.log(`[ReportLogManager] 计数器更新为: ${this.lineCount}`); - } else { - console.log(`[ReportLogManager] 没有新日志 (总数: ${totalLines}, 已读: ${this.lineCount})`); } } // 显示单行日志(带格式化) displayLogLine(line) { // 解析loguru格式的日志 + // 注意:loguru的级别字段会填充到8个字符,如 "INFO ", "WARNING ", "DEBUG ", "ERROR " + // 修改正则以匹配带空格填充的级别字段 const logPattern = /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\s*\|\s*(INFO|DEBUG|WARNING|ERROR|CRITICAL)\s*\|\s*(.+?)\s*-\s*(.*)$/; const match = line.match(logPattern); if (match) { - const [, timestamp, level, location, message] = match; + const [, timestamp, levelWithPadding, location, message] = match; - // 【调试】输出匹配到的日志级别 - if (level === 'WARNING' || level === 'ERROR' || level === 'DEBUG') { - console.log(`[ReportLogManager] 检测到 ${level} 日志: ${message.substring(0, 50)}...`); - } + // 去除级别中的填充空格 + const level = levelWithPadding.trim(); // 格式化输出 - 简化时间戳,只显示时间部分 const timeOnly = timestamp.split(' ')[1]; @@ -4432,53 +4433,40 @@ } else { appendConsoleTextLine('report', formattedLine); } - - // 同时在浏览器控制台输出(便于调试)- 降低输出频率 - if (level === 'ERROR' || level === 'CRITICAL') { - console.error(`[ReportLog] ${formattedLine}`); - } else if (level === 'WARNING') { - console.warn(`[ReportLog] ${formattedLine}`); - } else if (level === 'DEBUG') { - console.debug(`[ReportLog] ${formattedLine}`); - } } else { // 非标准格式的日志,直接显示 - // 【调试】输出未匹配的行,帮助调试正则 - if (line.includes('WARNING') || line.includes('ERROR') || line.includes('DEBUG')) { - console.log(`[ReportLogManager] 未匹配的日志行: "${line}"`); - } appendConsoleTextLine('report', line); } } // 处理错误 handleError(error) { + // 增加连续错误计数 + this.consecutiveErrors++; + // 避免重复错误日志 const errorMsg = error.message || error.toString(); - if (errorMsg === this.lastError) { - return; // 相同错误不重复输出 - } - + const isSameError = (errorMsg === this.lastError); this.lastError = errorMsg; - this.retryCount++; - // 只在前几次重试时输出错误 - if (this.retryCount <= this.maxRetries) { - console.warn(`[ReportLogManager] 获取日志失败 (${this.retryCount}/${this.maxRetries}): ${errorMsg}`); + // 只在前几次或新错误时输出 + if (!isSameError && this.consecutiveErrors <= 3) { + console.warn(`[ReportLogManager] 获取日志失败 (连续${this.consecutiveErrors}次): ${errorMsg}`); } - // 超过最大重试次数时暂停一段时间 - if (this.retryCount > this.maxRetries) { + // 连续错误过多时暂停 + if (this.consecutiveErrors >= this.maxConsecutiveErrors) { this.stop(); - console.error('[ReportLogManager] 多次失败,暂停轮询'); + console.error('[ReportLogManager] 连续错误过多,暂停轮询'); - // 5秒后自动重试 + // 10秒后自动重试 setTimeout(() => { - if (currentApp === 'report') { + if (currentApp === 'report' && !this.isRunning) { console.log('[ReportLogManager] 尝试恢复轮询...'); + this.consecutiveErrors = 0; // 重置错误计数 this.start(); } - }, 5000); + }, 10000); } } @@ -5401,7 +5389,6 @@ // 【优化】启动日志轮询 // 确保从任务开始就能读取日志 reportLogManager.start(); - console.log('[ReportLogManager] 任务创建成功,计数器已重置,启动日志轮询'); if (window.EventSource) { openReportStream(reportTaskId); @@ -5667,22 +5654,25 @@ reportStreamRetryDelay = 3000; updateReportStreamStatus('connected'); appendReportStreamLine(isRetry ? 'SSE重连成功' : 'Report Engine流式连接已建立', 'success', { badge: 'SSE' }); + reportLogStreaming = true; + // SSE已经推送日志,关闭轮询避免重复 + reportLogManager.stop(); + reportLogManager.reset(); startStreamHeartbeat(); - - // 【修复】启动日志轮询,读取logger.info/debug/warning/error - // SSE只推送显式事件(stage/chapter_status等),logger日志需要轮询读取 - reportLogManager.start(); - console.log('[ReportLogManager] SSE连接建立,同步启动日志轮询'); }; reportEventSource.onerror = () => { appendReportStreamLine('检测到网络抖动,SSE正在等待自动重连...', 'warn', { badge: 'SSE' }); updateReportStreamStatus('reconnecting'); clearStreamHeartbeat(); safeCloseReportStream(true, true); + // SSE断开期间恢复轮询,避免日志缺口 + if (reportTaskId) { + reportLogManager.start(); + } scheduleReportStreamReconnect(taskId); }; - const events = ['status', 'stage', 'chapter_status', 'chapter_chunk', 'warning', 'error', 'debug', 'html_ready', 'completed', 'heartbeat']; + const events = ['status', 'stage', 'chapter_status', 'chapter_chunk', 'warning', 'error', 'debug', 'html_ready', 'completed', 'heartbeat', 'log']; events.forEach(evt => { reportEventSource.addEventListener(evt, (event) => dispatchReportStreamEvent(evt, event)); }); @@ -5695,13 +5685,13 @@ reportEventSource.close(); reportEventSource = null; } + reportLogStreaming = false; if (reportStreamReconnectTimer) { clearTimeout(reportStreamReconnectTimer); reportStreamReconnectTimer = null; } // 清除日志刷新(使用新的日志管理器) reportLogManager.stop(); - console.log('[ReportLogManager] SSE连接关闭,停止日志轮询'); clearStreamHeartbeat(); if (!keepIndicator) { @@ -5796,6 +5786,21 @@ case 'debug': appendReportStreamLine(payload.message || 'Debug信息', 'info', { badge: 'DEBUG' }); break; + case 'log': { + if (payload.line) { + const level = (payload.level || '').toLowerCase(); + let levelClass = ''; + if (level === 'error' || level === 'critical') { + levelClass = 'error'; + } else if (level === 'warning') { + levelClass = 'warning'; + } else if (level === 'debug') { + levelClass = 'debug'; + } + appendConsoleTextLine('report', payload.line, `console-line report-stream-line ${levelClass}`.trim()); + } + break; + } case 'html_ready': appendReportStreamLine('HTML渲染完成,正在刷新预览...', 'success'); if (task) {