Resolving Report Engine Logging Issues

This commit is contained in:
马一丁
2025-11-20 22:36:49 +08:00
parent 2fb15f5efc
commit b10dd449fa
2 changed files with 146 additions and 90 deletions
+52 -1
View File
@@ -37,6 +37,57 @@ STREAM_HEARTBEAT_INTERVAL = 15 # 心跳间隔秒
stream_lock = threading.Lock()
stream_subscribers = defaultdict(list)
tasks_registry: Dict[str, 'ReportTask'] = {}
LOG_STREAM_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
log_stream_handler_id: Optional[int] = None
def _stream_log_to_task(message):
"""
将loguru日志同步到当前任务的SSE事件,保证前端实时可见。
仅在存在运行中的任务时推送,避免无关日志刷屏。
"""
try:
record = message.record
level_name = record["level"].name
if level_name not in LOG_STREAM_LEVELS:
return
with task_lock:
task = current_task
if not task or task.status not in ("running", "pending"):
return
timestamp = record["time"].strftime("%H:%M:%S.%f")[:-3]
formatted_line = f"[{timestamp}] [{level_name}] {record['message']}"
task.publish_event(
"log",
{
"line": formatted_line,
"level": level_name.lower(),
"timestamp": timestamp,
"message": record["message"],
"module": record.get("module", ""),
"function": record.get("function", ""),
},
)
except Exception:
# 避免在日志钩子里产生日志递归
pass
def _setup_log_stream_forwarder():
"""为当前进程挂载一次性的loguru钩子,用于SSE实时转发。"""
global log_stream_handler_id
if log_stream_handler_id is not None:
return
log_stream_handler_id = logger.add(
_stream_log_to_task,
level="DEBUG",
enqueue=False,
catch=True,
)
def _register_stream(task_id: str) -> Queue:
@@ -160,6 +211,7 @@ def initialize_report_engine():
try:
report_agent = create_agent()
logger.info("Report Engine初始化成功")
_setup_log_stream_forwarder()
# 检测 PDF 生成依赖(Pango
try:
@@ -1222,4 +1274,3 @@ def export_pdf_from_ir():
'success': False,
'error': f'导出PDF失败: {str(e)}'
}), 500
+94 -89
View File
@@ -4251,6 +4251,8 @@
let reportLogLineCount = 0;
let reportLockCheckInterval = null;
let lastCompletedReportTask = null;
// 标记是否已通过SSE直接获取日志,避免轮询重复
let reportLogStreaming = false;
// ====== Report Engine 日志管理器 ======
class ReportLogManager {
@@ -4258,46 +4260,44 @@
this.intervalId = null;
this.lineCount = 0;
this.isRunning = false;
this.refreshInterval = 250; // 250ms轮询一次,更加实时
this.refreshInterval = 250; // 改为250ms,更接近实时
this.lastError = null;
this.retryCount = 0;
this.maxRetries = 3;
this.consecutiveErrors = 0; // 连续错误计数
this.maxConsecutiveErrors = 10; // 增加到10次,因为频率更高了
this.abortController = null; // 复用controller避免创建开销
this.isFetching = false; // 避免并发请求
}
// 启动日志轮询
start() {
if (this.isRunning) {
console.log('[ReportLogManager] 已在运行,跳过重复启动');
if (this.isRunning || reportLogStreaming) {
return;
}
console.log('[ReportLogManager] ===== 启动日志轮询系统 =====');
this.isRunning = true;
this.retryCount = 0;
this.consecutiveErrors = 0; // 重置连续错误计数
// 立即执行一次
console.log('[ReportLogManager] 执行初始刷新...');
console.log('[ReportLogManager] 启动日志轮询');
this.refresh();
// 启动定时轮询
this.intervalId = setInterval(() => {
if (currentApp === 'report' && this.isRunning) {
console.log('[ReportLogManager] 定时器触发,执行刷新...');
if (currentApp === 'report' && this.isRunning && !reportLogStreaming) {
this.refresh();
}
}, this.refreshInterval);
console.log(`[ReportLogManager] 轮询已启动,频率: ${this.refreshInterval}ms, intervalId: ${this.intervalId}`);
}
// 停止日志轮询
stop() {
if (!this.isRunning) {
console.log('[ReportLogManager] 未在运行,无需停止');
return;
}
console.log('[ReportLogManager] ===== 停止日志轮询系统 =====');
this.isRunning = false;
if (this.intervalId) {
@@ -4305,38 +4305,55 @@
this.intervalId = null;
}
console.log('[ReportLogManager] 轮询已停止');
// 取消正在进行的请求
if (this.abortController) {
this.abortController.abort();
this.abortController = null;
}
this.isFetching = false;
console.log('[ReportLogManager] 停止日志轮询');
}
// 重置计数器(任务开始时调用)
reset() {
console.log(`[ReportLogManager] 重置计数器,原值: ${this.lineCount}`);
this.lineCount = 0;
this.lastError = null;
this.retryCount = 0;
this.consecutiveErrors = 0;
this.isFetching = false;
}
// 刷新日志
refresh() {
if (!this.isRunning) {
console.log('[ReportLogManager.refresh] 管理器未运行,跳过刷新');
if (!this.isRunning || reportLogStreaming) {
return;
}
console.log('[ReportLogManager.refresh] 开始刷新日志...');
if (this.isFetching) {
return;
}
this.isFetching = true;
// 【修复】使用传统的Promise方式,避免async/await兼容性问题
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 3000);
// 复用或创建新的 AbortController
if (this.abortController) {
this.abortController.abort();
}
this.abortController = new AbortController();
const timeoutId = setTimeout(() => {
if (this.abortController) {
this.abortController.abort();
}
}, 3000);
fetch('/api/report/log', {
method: 'GET',
headers: { 'Cache-Control': 'no-cache' },
signal: controller.signal
signal: this.abortController.signal
})
.then(response => {
clearTimeout(timeoutId);
console.log(`[ReportLogManager.refresh] 收到响应,状态: ${response.status}`);
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
@@ -4345,29 +4362,28 @@
return response.json();
})
.then(data => {
console.log('[ReportLogManager.refresh] 解析JSON成功');
// 成功后重置连续错误计数
this.consecutiveErrors = 0;
this.retryCount = 0;
if (!data.success) {
throw new Error(data.error || '未知错误');
}
// 成功后重置重试计数
this.retryCount = 0;
// 【调试】输出获取到的日志行数
if (data.log_lines) {
console.log(`[ReportLogManager.refresh] API返回 ${data.log_lines.length} 行日志`);
} else {
console.log('[ReportLogManager.refresh] API返回的log_lines为空');
}
// 处理日志数据
this.processLogs(data.log_lines || []);
})
.catch(error => {
clearTimeout(timeoutId);
console.error('[ReportLogManager.refresh] 错误:', error);
// 忽略abort错误
if (error.name === 'AbortError') {
this.isFetching = false;
return;
}
this.handleError(error);
})
.finally(() => {
this.isFetching = false;
});
}
@@ -4375,48 +4391,33 @@
processLogs(logLines) {
const totalLines = logLines.length;
console.log(`[ReportLogManager.processLogs] 总行数: ${totalLines}, 当前计数: ${this.lineCount}`);
// 如果有新日志
if (totalLines > this.lineCount) {
const newLines = logLines.slice(this.lineCount);
console.log(`[ReportLogManager] 发现 ${newLines.length} 条新日志 (${this.lineCount} -> ${totalLines})`);
// 输出前3行新日志用于调试
if (newLines.length > 0) {
console.log('[ReportLogManager] 新日志样本:');
newLines.slice(0, 3).forEach((line, idx) => {
console.log(` [${idx}] ${line.substring(0, 100)}...`);
});
}
// 逐行处理并显示
newLines.forEach((line, index) => {
console.log(`[ReportLogManager.processLogs] 处理第 ${index + 1}/${newLines.length} 行`);
newLines.forEach(line => {
this.displayLogLine(line);
});
// 更新计数器
this.lineCount = totalLines;
console.log(`[ReportLogManager] 计数器更新为: ${this.lineCount}`);
} else {
console.log(`[ReportLogManager] 没有新日志 (总数: ${totalLines}, 已读: ${this.lineCount})`);
}
}
// 显示单行日志(带格式化)
displayLogLine(line) {
// 解析loguru格式的日志
// 注意:loguru的级别字段会填充到8个字符,如 "INFO ", "WARNING ", "DEBUG ", "ERROR "
// 修改正则以匹配带空格填充的级别字段
const logPattern = /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\s*\|\s*(INFO|DEBUG|WARNING|ERROR|CRITICAL)\s*\|\s*(.+?)\s*-\s*(.*)$/;
const match = line.match(logPattern);
if (match) {
const [, timestamp, level, location, message] = match;
const [, timestamp, levelWithPadding, location, message] = match;
// 【调试】输出匹配到的日志级别
if (level === 'WARNING' || level === 'ERROR' || level === 'DEBUG') {
console.log(`[ReportLogManager] 检测到 ${level} 日志: ${message.substring(0, 50)}...`);
}
// 去除级别中的填充空格
const level = levelWithPadding.trim();
// 格式化输出 - 简化时间戳,只显示时间部分
const timeOnly = timestamp.split(' ')[1];
@@ -4432,53 +4433,40 @@
} else {
appendConsoleTextLine('report', formattedLine);
}
// 同时在浏览器控制台输出(便于调试)- 降低输出频率
if (level === 'ERROR' || level === 'CRITICAL') {
console.error(`[ReportLog] ${formattedLine}`);
} else if (level === 'WARNING') {
console.warn(`[ReportLog] ${formattedLine}`);
} else if (level === 'DEBUG') {
console.debug(`[ReportLog] ${formattedLine}`);
}
} else {
// 非标准格式的日志,直接显示
// 【调试】输出未匹配的行,帮助调试正则
if (line.includes('WARNING') || line.includes('ERROR') || line.includes('DEBUG')) {
console.log(`[ReportLogManager] 未匹配的日志行: "${line}"`);
}
appendConsoleTextLine('report', line);
}
}
// 处理错误
handleError(error) {
// 增加连续错误计数
this.consecutiveErrors++;
// 避免重复错误日志
const errorMsg = error.message || error.toString();
if (errorMsg === this.lastError) {
return; // 相同错误不重复输出
}
const isSameError = (errorMsg === this.lastError);
this.lastError = errorMsg;
this.retryCount++;
// 只在前几次重试时输出错误
if (this.retryCount <= this.maxRetries) {
console.warn(`[ReportLogManager] 获取日志失败 (${this.retryCount}/${this.maxRetries}): ${errorMsg}`);
// 只在前几次或新错误时输出
if (!isSameError && this.consecutiveErrors <= 3) {
console.warn(`[ReportLogManager] 获取日志失败 (连续${this.consecutiveErrors}): ${errorMsg}`);
}
// 超过最大重试次数时暂停一段时间
if (this.retryCount > this.maxRetries) {
// 连续错误过多时暂停
if (this.consecutiveErrors >= this.maxConsecutiveErrors) {
this.stop();
console.error('[ReportLogManager] 多次失败,暂停轮询');
console.error('[ReportLogManager] 连续错误过多,暂停轮询');
// 5秒后自动重试
// 10秒后自动重试
setTimeout(() => {
if (currentApp === 'report') {
if (currentApp === 'report' && !this.isRunning) {
console.log('[ReportLogManager] 尝试恢复轮询...');
this.consecutiveErrors = 0; // 重置错误计数
this.start();
}
}, 5000);
}, 10000);
}
}
@@ -5401,7 +5389,6 @@
// 【优化】启动日志轮询
// 确保从任务开始就能读取日志
reportLogManager.start();
console.log('[ReportLogManager] 任务创建成功,计数器已重置,启动日志轮询');
if (window.EventSource) {
openReportStream(reportTaskId);
@@ -5667,22 +5654,25 @@
reportStreamRetryDelay = 3000;
updateReportStreamStatus('connected');
appendReportStreamLine(isRetry ? 'SSE重连成功' : 'Report Engine流式连接已建立', 'success', { badge: 'SSE' });
reportLogStreaming = true;
// SSE已经推送日志,关闭轮询避免重复
reportLogManager.stop();
reportLogManager.reset();
startStreamHeartbeat();
// 【修复】启动日志轮询,读取logger.info/debug/warning/error
// SSE只推送显式事件(stage/chapter_status等),logger日志需要轮询读取
reportLogManager.start();
console.log('[ReportLogManager] SSE连接建立,同步启动日志轮询');
};
reportEventSource.onerror = () => {
appendReportStreamLine('检测到网络抖动,SSE正在等待自动重连...', 'warn', { badge: 'SSE' });
updateReportStreamStatus('reconnecting');
clearStreamHeartbeat();
safeCloseReportStream(true, true);
// SSE断开期间恢复轮询,避免日志缺口
if (reportTaskId) {
reportLogManager.start();
}
scheduleReportStreamReconnect(taskId);
};
const events = ['status', 'stage', 'chapter_status', 'chapter_chunk', 'warning', 'error', 'debug', 'html_ready', 'completed', 'heartbeat'];
const events = ['status', 'stage', 'chapter_status', 'chapter_chunk', 'warning', 'error', 'debug', 'html_ready', 'completed', 'heartbeat', 'log'];
events.forEach(evt => {
reportEventSource.addEventListener(evt, (event) => dispatchReportStreamEvent(evt, event));
});
@@ -5695,13 +5685,13 @@
reportEventSource.close();
reportEventSource = null;
}
reportLogStreaming = false;
if (reportStreamReconnectTimer) {
clearTimeout(reportStreamReconnectTimer);
reportStreamReconnectTimer = null;
}
// 清除日志刷新(使用新的日志管理器)
reportLogManager.stop();
console.log('[ReportLogManager] SSE连接关闭,停止日志轮询');
clearStreamHeartbeat();
if (!keepIndicator) {
@@ -5796,6 +5786,21 @@
case 'debug':
appendReportStreamLine(payload.message || 'Debug信息', 'info', { badge: 'DEBUG' });
break;
case 'log': {
if (payload.line) {
const level = (payload.level || '').toLowerCase();
let levelClass = '';
if (level === 'error' || level === 'critical') {
levelClass = 'error';
} else if (level === 'warning') {
levelClass = 'warning';
} else if (level === 'debug') {
levelClass = 'debug';
}
appendConsoleTextLine('report', payload.line, `console-line report-stream-line ${levelClass}`.trim());
}
break;
}
case 'html_ready':
appendReportStreamLine('HTML渲染完成,正在刷新预览...', 'success');
if (task) {