diff --git a/ReportEngine/nodes/chapter_generation_node.py b/ReportEngine/nodes/chapter_generation_node.py
index 0e12ee7..0e38a12 100644
--- a/ReportEngine/nodes/chapter_generation_node.py
+++ b/ReportEngine/nodes/chapter_generation_node.py
@@ -10,12 +10,12 @@ from __future__ import annotations
import json
from pathlib import Path
import re
-from typing import Any, Dict, List, Tuple
+from typing import Any, Dict, List, Tuple, Callable, Optional
from loguru import logger
from ..core import TemplateSection, ChapterStorage
-from ..ir import ALLOWED_BLOCK_TYPES, IRValidator
+from ..ir import ALLOWED_BLOCK_TYPES, ALLOWED_INLINE_MARKS, IRValidator
from ..prompts import (
SYSTEM_PROMPT_CHAPTER_JSON,
build_chapter_user_prompt,
@@ -28,10 +28,41 @@ except ImportError: # pragma: no cover - optional dependency
_json_repair_fn = None
+class ChapterJsonParseError(ValueError):
+ """Raised when the LLM output for a chapter cannot be parsed as valid JSON."""
+
+ def __init__(self, message: str, raw_text: Optional[str] = None):
+ super().__init__(message)
+ self.raw_text = raw_text
+
+
class ChapterGenerationNode(BaseNode):
"""负责按章节调用LLM并校验JSON结构"""
_COLON_EQUALS_PATTERN = re.compile(r'(":\s*)=')
+ _LINE_BREAK_SENTINEL = "__LINE_BREAK__"
+ _INLINE_MARK_ALIASES = {
+ "strong": "bold",
+ "b": "bold",
+ "em": "italic",
+ "emphasis": "italic",
+ "i": "italic",
+ "u": "underline",
+ "strike-through": "strike",
+ "strikethrough": "strike",
+ "s": "strike",
+ "codeblock": "code",
+ "monospace": "code",
+ "hyperlink": "link",
+ "url": "link",
+ "colour": "color",
+ "textcolor": "color",
+ "bgcolor": "highlight",
+ "background": "highlight",
+ "highlightcolor": "highlight",
+ "sub": "subscript",
+ "sup": "superscript",
+ }
def __init__(self, llm_client, validator: IRValidator, storage: ChapterStorage):
"""
@@ -51,6 +82,7 @@ class ChapterGenerationNode(BaseNode):
section: TemplateSection,
context: Dict[str, Any],
run_dir: Path,
+ stream_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
**kwargs,
) -> Dict[str, Any]:
"""针对单个章节调用LLM,校验/落盘章节JSON并返回结构化结果"""
@@ -64,7 +96,13 @@ class ChapterGenerationNode(BaseNode):
llm_payload = self._build_payload(section, context)
user_message = build_chapter_user_prompt(llm_payload)
- raw_text = self._stream_llm(user_message, chapter_dir, **kwargs)
+ raw_text = self._stream_llm(
+ user_message,
+ chapter_dir,
+ stream_callback=stream_callback,
+ section_meta=chapter_meta,
+ **kwargs,
+ )
chapter_json = self._parse_chapter(raw_text)
# 自动补全关键字段后再校验
@@ -150,8 +188,15 @@ class ChapterGenerationNode(BaseNode):
payload["globalContext"]["sectionBudgets"] = chapter_plan["sections"]
return payload
- def _stream_llm(self, user_message: str, chapter_dir: Path, **kwargs) -> str:
- """流式调用LLM并实时写入raw文件"""
+ def _stream_llm(
+ self,
+ user_message: str,
+ chapter_dir: Path,
+ stream_callback: Optional[Callable[[str, Dict[str, Any]], None]] = None,
+ section_meta: Optional[Dict[str, Any]] = None,
+ **kwargs,
+ ) -> str:
+ """流式调用LLM并实时写入raw文件,同时通过回调将delta抛出。"""
chunks: List[str] = []
with self.storage.capture_stream(chapter_dir) as stream_fp:
stream = self.llm_client.stream_invoke(
@@ -163,6 +208,12 @@ class ChapterGenerationNode(BaseNode):
for delta in stream:
stream_fp.write(delta)
chunks.append(delta)
+ if stream_callback:
+ meta = section_meta or {}
+ try:
+ stream_callback(delta, meta)
+ except Exception as callback_error: # pragma: no cover - 仅记录,不阻断主流程
+ logger.warning(f"章节流式回调失败: {callback_error}")
return "".join(chunks)
def _parse_chapter(self, raw_text: str) -> Dict[str, Any]:
@@ -192,9 +243,13 @@ class ChapterGenerationNode(BaseNode):
try:
data = self._parse_with_candidates(candidate_payloads[-1:])
except json.JSONDecodeError as inner_exc:
- raise ValueError(f"章节JSON解析失败: {inner_exc}") from inner_exc
+ raise ChapterJsonParseError(
+ f"章节JSON解析失败: {inner_exc}", raw_text=cleaned
+ ) from inner_exc
else:
- raise ValueError(f"章节JSON解析失败: {exc}") from exc
+ raise ChapterJsonParseError(
+ f"章节JSON解析失败: {exc}", raw_text=cleaned
+ ) from exc
if "chapter" in data and isinstance(data["chapter"], dict):
return data["chapter"]
@@ -400,6 +455,7 @@ class ChapterGenerationNode(BaseNode):
if not isinstance(block, dict):
continue
self._ensure_block_type(block)
+ self._sanitize_block_content(block)
block_type = block.get("type")
if block_type == "list":
items = block.get("items")
@@ -424,6 +480,98 @@ class ChapterGenerationNode(BaseNode):
walk(chapter.get("blocks"))
+ def _sanitize_block_content(self, block: Dict[str, Any]):
+ """根据类型做精细化修复,例如清理paragraph内的非法inline mark"""
+ block_type = block.get("type")
+ if block_type == "paragraph":
+ self._normalize_paragraph_block(block)
+
+ def _normalize_paragraph_block(self, block: Dict[str, Any]):
+ """将paragraph的inlines统一规整,剔除非法marks"""
+ inlines = block.get("inlines")
+ normalized_runs: List[Dict[str, Any]] = []
+ if isinstance(inlines, list) and inlines:
+ for run in inlines:
+ normalized_runs.extend(self._coerce_inline_run(run))
+ else:
+ normalized_runs = [self._as_inline_run(self._extract_block_text(block))]
+ if not normalized_runs:
+ normalized_runs = [self._as_inline_run("")]
+ block["inlines"] = normalized_runs
+
+ def _coerce_inline_run(self, run: Any) -> List[Dict[str, Any]]:
+ """将任意inline写法规整为合法run"""
+ if isinstance(run, dict):
+ normalized_run = dict(run)
+ text = normalized_run.get("text")
+ if not isinstance(text, str):
+ text = "" if text is None else str(text)
+ marks = normalized_run.get("marks")
+ sanitized_marks, extra_text = self._sanitize_inline_marks(marks)
+ normalized_run["marks"] = sanitized_marks
+ normalized_run["text"] = (text or "") + extra_text
+ return [normalized_run]
+ if isinstance(run, str):
+ return [self._as_inline_run(run)]
+ if isinstance(run, (int, float)):
+ return [self._as_inline_run(str(run))]
+ if isinstance(run, list):
+ normalized: List[Dict[str, Any]] = []
+ for item in run:
+ normalized.extend(self._coerce_inline_run(item))
+ return normalized
+ return [self._as_inline_run("" if run is None else str(run))]
+
+ def _sanitize_inline_marks(self, marks: Any) -> Tuple[List[Dict[str, Any]], str]:
+ """过滤非法marks并将break类控制符转成文本"""
+ text_suffix = ""
+ if marks is None:
+ return [], text_suffix
+ mark_list = marks if isinstance(marks, list) else [marks]
+ sanitized: List[Dict[str, Any]] = []
+ for mark in mark_list:
+ normalized_mark, extra_text = self._normalize_inline_mark(mark)
+ if normalized_mark:
+ sanitized.append(normalized_mark)
+ if extra_text:
+ text_suffix += extra_text
+ return sanitized, text_suffix
+
+ def _normalize_inline_mark(self, mark: Any) -> Tuple[Dict[str, Any] | None, str]:
+ """对单个mark做兼容映射,或者在必要时转换为文本"""
+ if not isinstance(mark, dict):
+ return None, ""
+ canonical_type = self._canonical_inline_mark_type(mark.get("type"))
+ if canonical_type == self._LINE_BREAK_SENTINEL:
+ return None, "\n"
+ if canonical_type in ALLOWED_INLINE_MARKS:
+ normalized = dict(mark)
+ normalized["type"] = canonical_type
+ return normalized, ""
+ return None, ""
+
+ def _canonical_inline_mark_type(self, mark_type: Any) -> str | None:
+ """将mark type映射为Schema所支持的取值"""
+ if not isinstance(mark_type, str):
+ return None
+ normalized = mark_type.strip()
+ if not normalized:
+ return None
+ lowered = normalized.lower()
+ if lowered in {"break", "linebreak", "br"}:
+ return self._LINE_BREAK_SENTINEL
+ return self._INLINE_MARK_ALIASES.get(lowered, lowered)
+
+ def _extract_block_text(self, block: Dict[str, Any]) -> str:
+ """优先从text/content等字段提取fallback文本"""
+ for key in ("text", "content", "value", "title"):
+ value = block.get(key)
+ if isinstance(value, str):
+ return value
+ if value is not None:
+ return str(value)
+ return ""
+
def _normalize_list_items(self, items: Any) -> List[List[Dict[str, Any]]]:
"""确保list block的items为[[block, block], ...]结构"""
if not isinstance(items, list):
@@ -490,16 +638,21 @@ class ChapterGenerationNode(BaseNode):
text = str(block)
block.clear()
block["type"] = "paragraph"
- block["inlines"] = [{"text": text}]
+ block["inlines"] = [self._as_inline_run(text)]
@staticmethod
def _as_paragraph_block(text: str) -> Dict[str, Any]:
"""将字符串快速包装成paragraph block,方便统一处理"""
return {
"type": "paragraph",
- "inlines": [{"text": text or ""}],
+ "inlines": [ChapterGenerationNode._as_inline_run(text)],
}
+ @staticmethod
+ def _as_inline_run(text: str) -> Dict[str, Any]:
+ """构造基础inline run,保证marks字段存在"""
+ return {"text": text or "", "marks": []}
+
@staticmethod
def _parse_with_candidates(payloads: List[str]) -> Dict[str, Any]:
"""按顺序尝试多个payload,直到解析成功"""
@@ -513,4 +666,4 @@ class ChapterGenerationNode(BaseNode):
raise last_exc
-__all__ = ["ChapterGenerationNode"]
+__all__ = ["ChapterGenerationNode", "ChapterJsonParseError"]
diff --git a/ReportEngine/renderers/html_renderer.py b/ReportEngine/renderers/html_renderer.py
index b7a73ae..0a9f199 100644
--- a/ReportEngine/renderers/html_renderer.py
+++ b/ReportEngine/renderers/html_renderer.py
@@ -4,6 +4,7 @@
from __future__ import annotations
+import ast
import html
import json
from typing import Any, Dict, List
@@ -51,7 +52,7 @@ class HTMLRenderer:
head = self._render_head(title, theme_tokens)
body = self._render_body()
- return f"\n\n{head}\n{body}\n"
+ return f"\n\n{head}\n{body}\n"
# ====== Head / Body ======
@@ -83,6 +84,10 @@ class HTMLRenderer:
+
""".strip()
def _render_body(self) -> str:
@@ -423,6 +428,8 @@ class HTMLRenderer:
items_html = ""
for item in block.get("items", []):
content = self._render_blocks(item)
+ if not content.strip():
+ continue
items_html += f"
{content}"
class_attr = f' class="{extra_class}"' if extra_class else ""
return f'<{tag}{class_attr}>{items_html}{tag}>'
@@ -545,7 +552,7 @@ class HTMLRenderer:
row_cells.append(f"{self._escape_html(value)} | ")
body_rows += f"{''.join(row_cells)}
"
table_html = f"""
-
+
| 类别 | {header_cells}
@@ -556,20 +563,93 @@ class HTMLRenderer:
"""
- return f"
"
+ return table_html
# ====== Inline 渲染 ======
+ def _normalize_inline_payload(self, run: Dict[str, Any]) -> tuple[str, List[Dict[str, Any]]]:
+ """将嵌套inline node展平成基础文本与marks"""
+ if not isinstance(run, dict):
+ return ("" if run is None else str(run)), []
+
+ marks = list(run.get("marks") or [])
+ text_value: Any = run.get("text", "")
+ seen: set[int] = set()
+
+ while isinstance(text_value, dict):
+ obj_id = id(text_value)
+ if obj_id in seen:
+ text_value = ""
+ break
+ seen.add(obj_id)
+ nested_marks = text_value.get("marks")
+ if nested_marks:
+ marks.extend(nested_marks)
+ if "text" in text_value:
+ text_value = text_value.get("text")
+ else:
+ text_value = json.dumps(text_value, ensure_ascii=False)
+ break
+
+ if text_value is None:
+ text_value = ""
+ elif isinstance(text_value, (int, float)):
+ text_value = str(text_value)
+ elif not isinstance(text_value, str):
+ try:
+ text_value = json.dumps(text_value, ensure_ascii=False)
+ except TypeError:
+ text_value = str(text_value)
+
+ if isinstance(text_value, str):
+ stripped = text_value.strip()
+ if stripped.startswith("{") and stripped.endswith("}"):
+ payload = None
+ try:
+ payload = json.loads(stripped)
+ except json.JSONDecodeError:
+ try:
+ payload = ast.literal_eval(stripped)
+ except (ValueError, SyntaxError):
+ payload = None
+ if isinstance(payload, dict):
+ sentinel_keys = {"xrefs", "widgets", "footnotes", "errors", "metadata"}
+ if set(payload.keys()).issubset(sentinel_keys):
+ text_value = ""
+ else:
+ inline_payload = self._coerce_inline_payload(payload)
+ if inline_payload:
+ nested_text = inline_payload.get("text")
+ if nested_text is not None:
+ text_value = nested_text
+ nested_marks = inline_payload.get("marks")
+ if isinstance(nested_marks, list):
+ marks.extend(nested_marks)
+
+ return text_value, marks
+
+ @staticmethod
+ def _coerce_inline_payload(payload: Dict[str, Any]) -> Dict[str, Any] | None:
+ """尽力将字符串里的内联节点恢复为dict,修复渲染遗漏"""
+ if not isinstance(payload, dict):
+ return None
+ inline_type = payload.get("type")
+ if inline_type and inline_type not in {"inline", "text"}:
+ return None
+ if "text" not in payload and "marks" not in payload:
+ return None
+ return payload
+
def _render_inline(self, run: Dict[str, Any]) -> str:
"""渲染单个inline run,支持多种marks叠加"""
- marks = run.get("marks") or []
+ text_value, marks = self._normalize_inline_payload(run)
math_mark = next((mark for mark in marks if mark.get("type") == "math"), None)
if math_mark:
latex = math_mark.get("value")
if not isinstance(latex, str) or not latex.strip():
- latex = run.get("text", "")
+ latex = text_value
return f'
\\( {self._escape_html(latex)} \\)'
- text = self._escape_html(run.get("text", ""))
+ text = self._escape_html(text_value)
styles: List[str] = []
prefix: List[str] = []
suffix: List[str] = []
@@ -653,6 +733,30 @@ class HTMLRenderer:
cursor = end + 2
return "".join(result)
+ # ====== 文本 / 安全工具 ======
+
+ def _safe_text(self, value: Any) -> str:
+ """将任意值安全转换为字符串,None与复杂对象容错"""
+ if value is None:
+ return ""
+ if isinstance(value, str):
+ return value
+ if isinstance(value, (int, float, bool)):
+ return str(value)
+ try:
+ return json.dumps(value, ensure_ascii=False)
+ except (TypeError, ValueError):
+ return str(value)
+
+ def _escape_html(self, value: Any) -> str:
+ """HTML文本上下文的转义"""
+ return html.escape(self._safe_text(value), quote=False)
+
+ def _escape_attr(self, value: Any) -> str:
+ """HTML属性上下文转义并去掉危险换行"""
+ escaped = html.escape(self._safe_text(value), quote=True)
+ return escaped.replace("\n", " ").replace("\r", " ")
+
# ====== CSS / JS ======
def _build_css(self, tokens: Dict[str, Any]) -> str:
@@ -1013,10 +1117,17 @@ table th {{
min-height: 320px;
}}
.chart-fallback {{
+ display: none;
margin-top: 12px;
font-size: 0.85rem;
overflow-x: auto;
}}
+.no-js .chart-fallback {{
+ display: block;
+}}
+.no-js .chart-container {{
+ display: none;
+}}
.chart-fallback table {{
width: 100%;
border-collapse: collapse;
@@ -1030,6 +1141,11 @@ table th {{
.chart-fallback th {{
background: rgba(0,0,0,0.04);
}}
+.chart-note {{
+ margin-top: 8px;
+ font-size: 0.85rem;
+ color: var(--secondary-color);
+}}
figure {{
margin: 20px 0;
text-align: center;
@@ -1091,7 +1207,19 @@ pre.code-block {{
"""返回页面底部的JS,负责Chart.js注水与导出逻辑"""
return """
""".strip()
- # ====== Utils ======
-
- @staticmethod
- def _escape_html(value: Any) -> str:
- """HTML内容转义工具,避免XSS"""
- return html.escape(str(value)) if value is not None else ""
-
- @staticmethod
- def _escape_attr(value: Any) -> str:
- """HTML属性值转义工具"""
- return html.escape(str(value), quote=True) if value is not None else ""
-
__all__ = ["HTMLRenderer"]
diff --git a/ReportEngine/utils/config.py b/ReportEngine/utils/config.py
index 06e38b2..ffa7f2a 100644
--- a/ReportEngine/utils/config.py
+++ b/ReportEngine/utils/config.py
@@ -25,6 +25,9 @@ class Settings(BaseSettings):
DOCUMENT_IR_OUTPUT_DIR: str = Field(
"final_reports/ir", description="整本IR/Manifest输出目录"
)
+ CHAPTER_JSON_MAX_ATTEMPTS: int = Field(
+ 2, description="章节JSON解析失败时的最大尝试次数"
+ )
TEMPLATE_DIR: str = Field("ReportEngine/report_template", description="多模板目录")
API_TIMEOUT: float = Field(900.0, description="单API超时时间(秒)")
MAX_RETRY_DELAY: float = Field(180.0, description="最大重试间隔(秒)")
@@ -52,6 +55,7 @@ def print_config(config: Settings):
message += f"最大内容长度: {config.MAX_CONTENT_LENGTH}\n"
message += f"输出目录: {config.OUTPUT_DIR}\n"
message += f"章节JSON目录: {config.CHAPTER_OUTPUT_DIR}\n"
+ message += f"章节JSON最大尝试次数: {config.CHAPTER_JSON_MAX_ATTEMPTS}\n"
message += f"整本IR目录: {config.DOCUMENT_IR_OUTPUT_DIR}\n"
message += f"模板目录: {config.TEMPLATE_DIR}\n"
message += f"API 超时时间: {config.API_TIMEOUT} 秒\n"
diff --git a/templates/index.html b/templates/index.html
index eb39fcb..1080959 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -1027,6 +1027,49 @@
display: none;
}
+ .report-stream-line {
+ font-size: 12px;
+ margin-bottom: 4px;
+ display: flex;
+ align-items: center;
+ gap: 8px;
+ line-height: 1.5;
+ }
+
+ .report-stream-line .timestamp {
+ color: #cccccc;
+ min-width: 60px;
+ }
+
+ .report-stream-line .stream-badge {
+ border: 1px solid #444444;
+ padding: 1px 6px;
+ font-size: 10px;
+ text-transform: uppercase;
+ color: #ffffff;
+ letter-spacing: 0.5px;
+ }
+
+ .report-stream-line .line-text {
+ flex: 1;
+ }
+
+ .report-stream-line.chunk {
+ color: #8fd5ff;
+ }
+
+ .report-stream-line.warn {
+ color: #ffd166;
+ }
+
+ .report-stream-line.error {
+ color: #ff6b6b;
+ }
+
+ .report-stream-line.success {
+ color: #80ffb5;
+ }
+
.report-loading {
display: flex;
align-items: center;
@@ -1165,6 +1208,9 @@
let systemStarted = false;
let systemStarting = false;
let configModalLocked = false;
+ let socketConnected = false;
+ let reportStreamConnected = false;
+ let backendReachable = false;
const CONFIG_ENDPOINT = '/api/config';
const SYSTEM_STATUS_ENDPOINT = '/api/system/status';
@@ -1276,6 +1322,7 @@
setInterval(updateTime, 1000);
checkStatus();
setInterval(checkStatus, 5000);
+ startConnectionProbe();
// 初始化密码切换功能(事件委托,只需调用一次)
attachConfigPasswordToggles();
@@ -1308,12 +1355,14 @@
socket = io();
socket.on('connect', function() {
- updateConnectionStatus('已连接');
+ socketConnected = true;
+ refreshConnectionStatus();
socket.emit('request_status');
});
socket.on('disconnect', function() {
- updateConnectionStatus('连接断开');
+ socketConnected = false;
+ refreshConnectionStatus();
});
socket.on('console_output', function(data) {
@@ -2255,10 +2304,38 @@
fetch('/api/status')
.then(response => response.json())
.then(data => {
+ backendReachable = true;
updateAppStatus(data);
+ refreshConnectionStatus();
})
.catch(error => {
console.error('状态检查失败:', error);
+ backendReachable = false;
+ refreshConnectionStatus();
+ });
+ }
+
+ function startConnectionProbe() {
+ if (connectionProbeTimer) {
+ clearInterval(connectionProbeTimer);
+ }
+ probeBackendConnection();
+ connectionProbeTimer = setInterval(probeBackendConnection, CONNECTION_PROBE_INTERVAL);
+ }
+
+ function probeBackendConnection() {
+ fetch('/api/report/status?heartbeat=1', { cache: 'no-store' })
+ .then(response => {
+ if (!response.ok) throw new Error('heartbeat failed');
+ return response.json();
+ })
+ .then(() => {
+ backendReachable = true;
+ refreshConnectionStatus();
+ })
+ .catch(() => {
+ backendReachable = false;
+ refreshConnectionStatus();
});
}
@@ -2279,9 +2356,15 @@
updateEmbeddedPage(currentApp);
}
- // 更新连接状态
- function updateConnectionStatus(status) {
- document.getElementById('connectionStatus').textContent = status;
+ // 根据当前的Socket/SSE状态刷新底部连接指示
+ function refreshConnectionStatus() {
+ const statusEl = document.getElementById('connectionStatus');
+ if (!statusEl) return;
+ if (socketConnected || reportStreamConnected || backendReachable) {
+ statusEl.textContent = '已连接';
+ } else {
+ statusEl.textContent = '连接断开';
+ }
}
// 更新时间
@@ -2738,6 +2821,14 @@
// Report Engine 相关函数
let reportTaskId = null;
let reportPollingInterval = null;
+ let reportEventSource = null;
+ let reportAutoPreviewLoaded = false;
+ let reportStreamReconnectTimer = null;
+ let reportStreamRetryDelay = 3000;
+ let streamHeartbeatTimeout = null;
+ let streamHeartbeatInterval = null;
+ let connectionProbeTimer = null;
+ const CONNECTION_PROBE_INTERVAL = 15000;
// 加载报告界面
function loadReportInterface() {
@@ -2811,6 +2902,8 @@
reportContent.innerHTML = interfaceHTML;
initializeReportControls();
+ resetReportStreamOutput('等待新的Report任务启动...');
+ updateReportStreamStatus('idle');
// 立即更新状态信息
updateEngineStatusDisplay(statusData);
@@ -2818,8 +2911,22 @@
// 如果有当前任务,显示任务状态
if (statusData.current_task) {
updateTaskProgressStatus(statusData.current_task);
+ if (statusData.current_task.status === 'running') {
+ reportTaskId = statusData.current_task.task_id;
+ reportAutoPreviewLoaded = false;
+ if (window.EventSource) {
+ openReportStream(reportTaskId);
+ } else {
+ startProgressPolling(reportTaskId);
+ }
+ } else if (statusData.current_task.status === 'completed') {
+ lastCompletedReportTask = statusData.current_task;
+ updateDownloadButtonState(statusData.current_task);
+ }
} else {
updateDownloadButtonState(null);
+ safeCloseReportStream();
+ reportTaskId = null;
}
}
@@ -3054,10 +3161,13 @@
// 重置日志计数器,因为后台会清空日志文件
reportLogLineCount = 0;
+ reportAutoPreviewLoaded = false;
+ safeCloseReportStream(true);
// 清空控制台显示
const consoleOutput = document.getElementById('consoleOutput');
consoleOutput.innerHTML = '
[系统] 开始生成报告,日志已重置
';
+ resetReportStreamOutput('Report Engine 正在调度任务...');
setGenerateButtonState(true);
@@ -3099,14 +3209,21 @@
refreshReportLog();
}, 500);
- // 开始轮询任务状态
- startProgressPolling(data.task_id);
+ appendReportStreamLine('任务创建成功,正在建立流式连接...', 'info', { force: true });
+ if (window.EventSource) {
+ openReportStream(reportTaskId);
+ } else {
+ startProgressPolling(data.task_id);
+ }
} else {
updateTaskProgressStatus(null, 'error', '启动失败: ' + data.error);
// 重置标志允许重新尝试
autoGenerateTriggered = false;
reportTaskId = null;
setGenerateButtonState(false);
+ appendReportStreamLine('任务启动失败: ' + (data.error || '未知错误'), 'error');
+ updateReportStreamStatus('error');
+ safeCloseReportStream();
}
})
.catch(error => {
@@ -3116,6 +3233,9 @@
autoGenerateTriggered = false;
reportTaskId = null;
setGenerateButtonState(false);
+ appendReportStreamLine('任务启动阶段异常: ' + error.message, 'error');
+ updateReportStreamStatus('error');
+ safeCloseReportStream();
});
}
@@ -3147,6 +3267,7 @@
// 自动显示报告
viewReport(taskId);
+ reportAutoPreviewLoaded = true;
// 重置自动生成标志,允许下次有新内容时自动生成
autoGenerateTriggered = false;
@@ -3225,6 +3346,319 @@
updateTaskProgressStatus(task);
}
+ // ====== Report Engine SSE流式辅助函数 ======
+ // 重置流式日志入口,将提示语写入控制台,保持与右侧黑框一致
+ function resetReportStreamOutput(message = '等待新的Report任务启动...') {
+ appendReportStreamLine(message, 'info', { badge: 'REPORT', force: true });
+ }
+
+ // 根据状态同步流式指示灯,与后端心跳保持一致
+ function updateReportStreamStatus(state) {
+ if (state === 'connected') {
+ reportStreamConnected = true;
+ } else if (['idle', 'error', 'connecting', 'reconnecting'].includes(state)) {
+ reportStreamConnected = false;
+ }
+
+ const statusEl = document.getElementById('reportStreamStatus');
+ if (statusEl) {
+ const textMap = {
+ idle: '未连接',
+ connecting: '连接中',
+ connected: '实时更新中',
+ reconnecting: '等待重连',
+ error: '已断开'
+ };
+ statusEl.textContent = textMap[state] || state;
+ statusEl.dataset.state = state;
+ }
+
+ refreshConnectionStatus();
+ }
+
+ // 往黑色控制台输出区域追加一条流式日志
+ function appendReportStreamLine(message, level = 'info', options = {}) {
+ const consoleOutput = document.getElementById('consoleOutput');
+ if (!consoleOutput) return;
+
+ if (level === 'chunk' && !options.force) {
+ return; // 章节内容流式写入不再逐条输出
+ }
+
+ const line = document.createElement('div');
+ line.className = `console-line report-stream-line ${level}`;
+
+ const timestampSpan = document.createElement('span');
+ timestampSpan.className = 'timestamp';
+ timestampSpan.textContent = new Date().toLocaleTimeString('zh-CN');
+ line.appendChild(timestampSpan);
+
+ if (options.badge) {
+ const badge = document.createElement('span');
+ badge.className = 'stream-badge';
+ badge.textContent = options.badge;
+ line.appendChild(badge);
+ }
+
+ const textSpan = document.createElement('span');
+ textSpan.className = 'line-text';
+ textSpan.textContent = message;
+ line.appendChild(textSpan);
+
+ consoleOutput.appendChild(line);
+ consoleOutput.scrollTop = consoleOutput.scrollHeight;
+ }
+
+ function startStreamHeartbeat() {
+ clearStreamHeartbeat();
+ const emitHeartbeat = () => {
+ appendReportStreamLine('Report Engine 正在流式生成,请耐心等待...', 'info', { badge: 'REPORT', force: true });
+ };
+
+ const scheduleFirstTick = () => {
+ const now = Date.now();
+ const msToNextMinute = 60000 - (now % 60000);
+ streamHeartbeatTimeout = setTimeout(() => {
+ emitHeartbeat();
+ streamHeartbeatInterval = setInterval(emitHeartbeat, 60000);
+ }, msToNextMinute);
+ };
+
+ scheduleFirstTick();
+ }
+
+ function clearStreamHeartbeat() {
+ if (streamHeartbeatTimeout) {
+ clearTimeout(streamHeartbeatTimeout);
+ streamHeartbeatTimeout = null;
+ }
+ if (streamHeartbeatInterval) {
+ clearInterval(streamHeartbeatInterval);
+ streamHeartbeatInterval = null;
+ }
+ }
+
+ // 建立SSE连接,实时订阅Report Engine推送
+ function openReportStream(taskId, isRetry = false) {
+ if (!taskId) return;
+ if (!window.EventSource) {
+ appendReportStreamLine('浏览器不支持SSE,已自动回退为轮询模式', 'warn', { badge: 'SSE', force: true });
+ updateReportStreamStatus('error');
+ clearStreamHeartbeat();
+ startProgressPolling(taskId);
+ return;
+ }
+ if (reportPollingInterval) {
+ clearInterval(reportPollingInterval);
+ reportPollingInterval = null;
+ }
+ if (reportEventSource && reportEventSource.__taskId === taskId) {
+ if (reportEventSource.readyState !== EventSource.CLOSED) {
+ return;
+ }
+ safeCloseReportStream(true, true);
+ } else if (reportEventSource) {
+ safeCloseReportStream(true, true);
+ }
+
+ if (reportStreamReconnectTimer) {
+ clearTimeout(reportStreamReconnectTimer);
+ reportStreamReconnectTimer = null;
+ }
+
+ if (!isRetry) {
+ reportStreamRetryDelay = 3000;
+ }
+
+ updateReportStreamStatus('connecting');
+ appendReportStreamLine(
+ isRetry ? '尝试重连Report Engine流式通道...' : '正在建立Report Engine流式连接...',
+ 'info',
+ { badge: 'SSE', force: true }
+ );
+
+ reportEventSource = new EventSource(`/api/report/stream/${taskId}`);
+ reportEventSource.__taskId = taskId;
+ reportEventSource.onopen = () => {
+ reportStreamRetryDelay = 3000;
+ updateReportStreamStatus('connected');
+ appendReportStreamLine(isRetry ? 'SSE重连成功' : 'Report Engine流式连接已建立', 'success', { badge: 'SSE' });
+ startStreamHeartbeat();
+ };
+ reportEventSource.onerror = () => {
+ appendReportStreamLine('检测到网络抖动,SSE正在等待自动重连...', 'warn', { badge: 'SSE' });
+ updateReportStreamStatus('reconnecting');
+ clearStreamHeartbeat();
+ safeCloseReportStream(true, true);
+ scheduleReportStreamReconnect(taskId);
+ };
+
+ const events = ['status', 'stage', 'chapter_status', 'chapter_chunk', 'warning', 'html_ready', 'completed', 'error', 'heartbeat'];
+ events.forEach(evt => {
+ reportEventSource.addEventListener(evt, (event) => dispatchReportStreamEvent(evt, event));
+ });
+ reportEventSource.onmessage = (event) => dispatchReportStreamEvent(event.type || 'message', event);
+ }
+
+ // 关闭SSE连接,可根据场景选择是否立即重置指示灯
+ function safeCloseReportStream(keepIndicator = false, preserveRetryDelay = false) {
+ if (reportEventSource) {
+ reportEventSource.close();
+ reportEventSource = null;
+ }
+ if (reportStreamReconnectTimer) {
+ clearTimeout(reportStreamReconnectTimer);
+ reportStreamReconnectTimer = null;
+ }
+ clearStreamHeartbeat();
+ if (!keepIndicator) {
+ updateReportStreamStatus('idle');
+ } else {
+ reportStreamConnected = false;
+ refreshConnectionStatus();
+ }
+ if (!preserveRetryDelay) {
+ reportStreamRetryDelay = 3000;
+ }
+ }
+
+ function scheduleReportStreamReconnect(taskId) {
+ if (!taskId || reportStreamReconnectTimer) {
+ return;
+ }
+ reportStreamReconnectTimer = setTimeout(() => {
+ reportStreamReconnectTimer = null;
+ if (reportTaskId === taskId) {
+ openReportStream(taskId, true);
+ }
+ }, reportStreamRetryDelay);
+ reportStreamRetryDelay = Math.min(reportStreamRetryDelay * 2, 15000);
+ }
+
+ // 统一的事件派发入口,负责解析JSON并交给业务处理
+ function dispatchReportStreamEvent(eventType, event) {
+ try {
+ const data = JSON.parse(event.data);
+ handleReportStreamEvent(eventType, data);
+ } catch (error) {
+ console.warn('解析流式事件失败:', error);
+ }
+ }
+
+ // 结合事件类型输出控件/状态,确保网络抖动时也能及时反馈
+ function handleReportStreamEvent(eventType, eventData) {
+ if (!eventData) return;
+ const payload = eventData.payload || {};
+ const task = payload.task;
+
+ if (eventType === 'status' && task) {
+ updateTaskProgressStatus(task);
+ reportTaskId = task.status === 'running' ? task.task_id : null;
+ if (task.status === 'completed') {
+ lastCompletedReportTask = task;
+ setGenerateButtonState(false);
+ } else if (task.status === 'running') {
+ setGenerateButtonState(true);
+ }
+ }
+
+ switch (eventType) {
+ case 'stage':
+ appendReportStreamLine(
+ payload.message || `阶段: ${payload.stage || ''}`,
+ 'info',
+ {
+ badge: payload.stage || '阶段',
+ genericMessage: 'Report Engine 正在逐步生成,请耐心等待...'
+ }
+ );
+ break;
+ case 'chapter_status':
+ appendReportStreamLine(
+ `${payload.title || payload.chapterId || '章节'} ${payload.status === 'completed' ? '已完成' : '生成中'}`,
+ payload.status === 'completed' ? 'success' : 'info',
+ {
+ badge: '章节',
+ genericMessage: payload.status === 'completed'
+ ? `${payload.title || payload.chapterId || '章节'} 已完成`
+ : '章节流式生成中,请稍候...'
+ }
+ );
+ break;
+ case 'chapter_chunk':
+ if (payload.delta) {
+ appendReportStreamLine(
+ formatStreamChunk(payload.delta),
+ 'chunk',
+ {
+ badge: payload.title || payload.chapterId || '章节流',
+ genericMessage: '章节内容流式写入中...'
+ }
+ );
+ }
+ break;
+ case 'warning':
+ appendReportStreamLine(payload.message || '检测到可重试的网络波动', 'warn');
+ break;
+ case 'html_ready':
+ appendReportStreamLine('HTML渲染完成,正在刷新预览...', 'success');
+ if (task) {
+ updateDownloadButtonState(task);
+ }
+ if (eventData.task_id && !reportAutoPreviewLoaded) {
+ viewReport(eventData.task_id);
+ reportAutoPreviewLoaded = true;
+ }
+ break;
+ case 'completed':
+ appendReportStreamLine(payload.message || '任务完成', 'success');
+ safeCloseReportStream();
+ reportTaskId = null;
+ setGenerateButtonState(false);
+ if (task) {
+ lastCompletedReportTask = task;
+ updateDownloadButtonState(task);
+ }
+ if (eventData.task_id && !reportAutoPreviewLoaded) {
+ viewReport(eventData.task_id);
+ reportAutoPreviewLoaded = true;
+ }
+ break;
+ case 'cancelled':
+ appendReportStreamLine(payload.message || '任务已取消', 'warn');
+ safeCloseReportStream();
+ updateReportStreamStatus('idle');
+ reportTaskId = null;
+ setGenerateButtonState(false);
+ break;
+ case 'error':
+ appendReportStreamLine(payload.message || '任务失败', 'error');
+ safeCloseReportStream();
+ updateReportStreamStatus('error');
+ reportTaskId = null;
+ setGenerateButtonState(false);
+ break;
+ case 'heartbeat':
+ updateReportStreamStatus('connected');
+ appendReportStreamLine(payload.message || '流式连接正常,请稍候...', 'info', {
+ badge: 'SSE',
+ genericMessage: '流式连接正常,请耐心等待...'
+ });
+ break;
+ default:
+ if (payload.message) {
+ appendReportStreamLine(payload.message, 'info');
+ }
+ break;
+ }
+ }
+
+ // 清洗流式chunk,裁剪多余空白,避免影响UI
+ function formatStreamChunk(text) {
+ if (!text) return '';
+ return text.replace(/\s+/g, ' ').trim().slice(0, 200);
+ }
+
// 查看报告
function viewReport(taskId) {
const reportPreview = document.getElementById('reportPreview');
@@ -3435,4 +3869,4 @@
}