def _initialize_rescue_llms(self) -> "List[Tuple[str, LLMClient]]":
    """
    Build the ordered list of LLM clients used for cross-engine chapter repair.

    The order is Report -> Forum -> Insight -> Media. The Report client is
    included only when ``self.llm_client`` is truthy. Any other engine whose
    API key or model name is empty is skipped, as is an engine whose client
    constructor raises (a warning is logged in that case).

    Returns:
        ``(label, client)`` pairs in the order they should be tried.
    """
    rescue_chain: "List[Tuple[str, LLMClient]]" = (
        [("report_engine", self.llm_client)] if self.llm_client else []
    )

    settings = self.config
    # (label, config attribute prefix) for every non-Report engine, in priority order.
    engine_prefixes = (
        ("forum_engine", "FORUM_HOST"),
        ("insight_engine", "INSIGHT_ENGINE"),
        ("media_engine", "MEDIA_ENGINE"),
    )
    # Read every credential up front so a missing config attribute surfaces
    # before any client is constructed.
    candidates = [
        (
            channel,
            getattr(settings, f"{prefix}_API_KEY"),
            getattr(settings, f"{prefix}_MODEL_NAME"),
            getattr(settings, f"{prefix}_BASE_URL"),
        )
        for channel, prefix in engine_prefixes
    ]

    for channel, key, model, endpoint in candidates:
        if key and model:
            try:
                rescue_client = LLMClient(api_key=key, model_name=model, base_url=endpoint)
            except Exception as exc:
                logger.warning(f"{channel} LLM初始化失败,跳过该修复通道: {exc}")
            else:
                rescue_chain.append((channel, rescue_client))
    return rescue_chain
self.chapter_storage + self.chapter_storage, + fallback_llm_clients=self.json_rescue_clients, + error_log_dir=self.config.JSON_ERROR_LOG_DIR, ) def generate_report(self, query: str, reports: List[Any], forum_logs: str = "", diff --git a/ReportEngine/nodes/chapter_generation_node.py b/ReportEngine/nodes/chapter_generation_node.py index 10503b5..2812d04 100644 --- a/ReportEngine/nodes/chapter_generation_node.py +++ b/ReportEngine/nodes/chapter_generation_node.py @@ -8,9 +8,10 @@ from __future__ import annotations import json +from datetime import datetime from pathlib import Path import re -from typing import Any, Dict, List, Tuple, Callable, Optional +from typing import Any, Dict, List, Tuple, Callable, Optional, Set from loguru import logger @@ -19,7 +20,9 @@ from ..ir import ALLOWED_BLOCK_TYPES, ALLOWED_INLINE_MARKS, IRValidator from ..prompts import ( SYSTEM_PROMPT_CHAPTER_JSON, SYSTEM_PROMPT_CHAPTER_JSON_REPAIR, + SYSTEM_PROMPT_CHAPTER_JSON_RECOVERY, build_chapter_repair_prompt, + build_chapter_recovery_payload, build_chapter_user_prompt, ) from .base_node import BaseNode @@ -96,7 +99,14 @@ class ChapterGenerationNode(BaseNode): _PARAGRAPH_FRAGMENT_NO_TERMINATOR_MAX_CHARS = 240 _TERMINATION_PUNCTUATION = set("。!?!?;;……") - def __init__(self, llm_client, validator: IRValidator, storage: ChapterStorage): + def __init__( + self, + llm_client, + validator: IRValidator, + storage: ChapterStorage, + fallback_llm_clients: Optional[List[Tuple[str, Any]]] = None, + error_log_dir: Optional[str | Path] = None, + ): """ 记录LLM客户端/校验器/章节存储器,便于run方法调度。 @@ -108,6 +118,17 @@ class ChapterGenerationNode(BaseNode): super().__init__(llm_client, "ChapterGenerationNode") self.validator = validator self.storage = storage + self.fallback_llm_clients: List[Tuple[str, Any]] = fallback_llm_clients or [ + ("report_engine", llm_client) + ] + error_dir = Path(error_log_dir or "logs/json_repair_failures") + error_dir.mkdir(parents=True, exist_ok=True) + self.error_log_dir = error_dir + 
self._failed_block_counter = 0 + self._active_run_id: Optional[str] = None + self._rescue_attempted_labels: Dict[str, Set[str]] = {} + self._skipped_placeholder_chapters: Set[str] = set() + self._archived_failed_json: Dict[str, str] = {} def run( self, @@ -141,6 +162,8 @@ class ChapterGenerationNode(BaseNode): "order": section.order, } chapter_dir = self.storage.begin_chapter(run_dir, chapter_meta) + run_id = run_dir.name + self._ensure_run_state(run_id) llm_payload = self._build_payload(section, context) user_message = build_chapter_user_prompt(llm_payload) @@ -151,7 +174,30 @@ class ChapterGenerationNode(BaseNode): section_meta=chapter_meta, **kwargs, ) - chapter_json = self._parse_chapter(raw_text) + parse_context: List[str] = [] + placeholder_created = False + try: + chapter_json = self._parse_chapter(raw_text) + except ChapterJsonParseError as parse_error: + logger.warning(f"{section.title} 章节JSON解析失败,尝试跨引擎修复: {parse_error}") + parse_context.append(str(parse_error)) + self._archive_failed_output(section, raw_text) + recovered = self._attempt_cross_engine_json_rescue( + section, + llm_payload, + raw_text, + run_id, + ) + if recovered: + chapter_json = recovered + logger.info(f"{section.title} 章节JSON已通过跨引擎修复") + else: + placeholder = self._build_placeholder_chapter(section, raw_text, parse_error) + if not placeholder: + raise + chapter_json, placeholder_notes = placeholder + parse_context.extend(placeholder_notes) + placeholder_created = True # 自动补全关键字段后再校验 chapter_json.setdefault("chapterId", section.chapter_id) @@ -176,13 +222,13 @@ class ChapterGenerationNode(BaseNode): self._sanitize_chapter_blocks(chapter_json) valid, errors = self.validator.validate_chapter(chapter_json) content_error: ChapterContentError | None = None - if valid: + if valid and not placeholder_created: try: self._ensure_content_density(chapter_json) except ChapterContentError as exc: content_error = exc - error_messages: List[str] = [] + error_messages: List[str] = parse_context.copy() if 
def _attempt_cross_engine_json_rescue(
    self,
    section: "TemplateSection",
    generation_payload: Dict[str, Any],
    raw_text: str,
    run_id: str,
) -> Optional[Dict[str, Any]]:
    """
    Ask each fallback LLM client in turn to repair chapter JSON that failed to parse.

    Clients are tried in the order of ``self.fallback_llm_clients``. A label that
    was already attempted for this chapter (tracked in
    ``self._rescue_attempted_labels``) is skipped, and a chapter already marked
    as a placeholder is not retried at all.

    Args:
        section: Template section whose output could not be parsed.
        generation_payload: Payload that was used to generate the chapter.
        raw_text: The unparseable LLM output.
        run_id: Identifier used as a prefix in log messages.

    Returns:
        dict | None: Parsed chapter JSON from the first client whose response
        parses, otherwise None.
    """
    if not self.fallback_llm_clients:
        return None
    if self._chapter_already_skipped(section):
        logger.info(f"[{run_id}] {section.title} 已标记为占位,不再触发跨引擎修复")
        return None
    section_payload = {
        "chapterId": section.chapter_id,
        "title": section.title,
        "slug": section.slug,
        "order": section.order,
        "number": section.number,
        "outline": section.outline,
    }
    repair_prompt = build_chapter_recovery_payload(
        section_payload,
        generation_payload,
        raw_text,
    )
    attempted_labels = self._rescue_attempted_labels.setdefault(section.chapter_id, set())
    for label, client in self.fallback_llm_clients:
        if label in attempted_labels:
            continue
        # Computed before the label is recorded, so the first attempt reports 1.
        attempt_index = len(attempted_labels) + 1
        attempted_labels.add(label)
        logger.info(
            f"[{run_id}] 章节 {section.title} 触发 {label} API JSON抢修(第{attempt_index}次尝试)"
        )
        try:
            response = client.invoke(
                SYSTEM_PROMPT_CHAPTER_JSON_RECOVERY,
                repair_prompt,
                temperature=0.0,
                top_p=0.05,
            )
        except Exception as exc:
            # A failing channel must not stop the remaining channels from being tried.
            logger.warning(f"{label} JSON修复调用失败: {exc}")
            continue
        if not response:
            continue
        try:
            repaired = self._parse_chapter(response)
        except Exception as exc:
            logger.warning(f"{label} JSON修复输出仍无法解析: {exc}")
            continue
        logger.warning(f"[{run_id}] {label} API已修复章节JSON")
        # The archived broken output is no longer needed once a repair succeeded.
        self._archived_failed_json.pop(section.chapter_id, None)
        return repaired
    return None


def _ensure_run_state(self, run_id: str):
    """Reset the per-run rescue bookkeeping when ``run_id`` differs from the active one."""
    if self._active_run_id == run_id:
        return
    self._active_run_id = run_id
    self._rescue_attempted_labels = {}
    self._skipped_placeholder_chapters = set()
    self._archived_failed_json = {}


def _archive_failed_output(self, section: "TemplateSection", raw_text: str):
    """Cache the raw failed output for this chapter; empty output is not stored."""
    if not raw_text:
        return
    self._archived_failed_json[section.chapter_id] = raw_text


def _get_archived_failed_output(self, section: "TemplateSection") -> Optional[str]:
    """Return the cached failed output for this chapter, or None when nothing is cached."""
    return self._archived_failed_json.get(section.chapter_id)


def _mark_chapter_skipped(self, section: "TemplateSection"):
    """Record that this chapter was downgraded to a placeholder."""
    self._skipped_placeholder_chapters.add(section.chapter_id)


def _chapter_already_skipped(self, section: "TemplateSection") -> bool:
    """Return True when this chapter has already been marked as a placeholder."""
    return section.chapter_id in self._skipped_placeholder_chapters


def _build_placeholder_chapter(
    self,
    section: "TemplateSection",
    raw_text: str,
    parse_error: Exception,
) -> Optional[Tuple[Dict[str, Any], List[str]]]:
    """
    Build a placeholder chapter after every repair attempt has failed.

    The broken output (the archived copy when present, otherwise ``raw_text``)
    is written to the error log directory and the placeholder points at it.

    Args:
        section: Template section being replaced by a placeholder.
        raw_text: Raw LLM output used when no archived copy exists.
        parse_error: The parse failure, stored alongside the raw output.

    Returns:
        ``(chapter_dict, notes)`` on success, where ``notes`` is a one-item list
        describing the downgrade; None when the error log could not be written.
    """
    snapshot = self._get_archived_failed_output(section) or raw_text
    log_ref = self._persist_error_payload(section, snapshot, parse_error)
    if not log_ref:
        logger.error(f"{section.title} 章节JSON完全损坏且无法写入日志")
        return None
    importance = "critical" if self._is_section_critical(section) else "standard"
    message = (
        f"LLM返回块解析错误,详情请见 {log_ref['relativeFile']} 的 {log_ref['entryId']} 记录。"
    )
    heading_block = {
        "type": "heading",
        "level": 2 if importance == "critical" else 3,
        "text": section.title,
        "anchor": section.slug,
    }
    callout_block = {
        "type": "callout",
        "tone": "danger" if importance == "critical" else "warning",
        "title": "LLM返回块解析错误",
        "blocks": [
            {
                "type": "paragraph",
                "inlines": [
                    {
                        "text": message,
                    }
                ],
            }
        ],
        "meta": {
            "errorLogRef": log_ref,
            # Only a bounded preview is embedded; the full text lives in the log file.
            "rawJsonPreview": (snapshot or "")[:2000],
            "errorMessage": message,
            "importance": importance,
        },
    }
    placeholder = {
        "chapterId": section.chapter_id,
        "title": section.title,
        "anchor": section.slug,
        "order": section.order,
        "blocks": [heading_block, callout_block],
        "errorPlaceholder": True,
    }
    errors = [
        f"{section.title} 章节JSON解析失败,已降级为占位。参考 {log_ref['relativeFile']}#{log_ref['entryId']}"
    ]
    self._mark_chapter_skipped(section)
    return placeholder, errors


def _persist_error_payload(
    self,
    section: "TemplateSection",
    raw_text: str,
    parse_error: Exception,
) -> Optional[Dict[str, str]]:
    """
    Write the unparseable output to a JSON file inside ``self.error_log_dir``.

    The slug is reduced to filename-safe characters before it is used in the
    file name, so a slug containing path separators or ``..`` can neither break
    the write nor place the file outside the log directory. The original slug
    is still stored unchanged inside the payload.

    Args:
        section: Template section the output belongs to.
        raw_text: The raw output to store.
        parse_error: The parse failure; its string form is stored.

    Returns:
        Dict with ``file``, ``relativeFile``, ``entryId`` and ``timestamp`` on
        success; None when anything fails (the failure is logged).
    """
    try:
        # Imported locally because the module only imports ``datetime`` itself.
        from datetime import timezone

        self._failed_block_counter += 1
        entry_id = f"E{self._failed_block_counter:04d}"
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the formatted value is the same UTC wall-clock string.
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
        raw_slug = str(section.slug or "section")
        # Keep word characters (including non-ASCII), dots and dashes; collapse the rest.
        slug = re.sub(r"[^\w.-]+", "-", raw_slug).strip("-.")[:80] or "section"
        filename = f"{timestamp}-{slug}-{entry_id}.json"
        # The directory may have been removed since construction.
        self.error_log_dir.mkdir(parents=True, exist_ok=True)
        file_path = self.error_log_dir / filename
        payload = {
            "chapterId": section.chapter_id,
            "title": section.title,
            "slug": section.slug,
            "order": section.order,
            "rawOutput": raw_text,
            "error": str(parse_error),
            "loggedAt": timestamp,
        }
        file_path.write_text(
            json.dumps(payload, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        try:
            relative_path = str(file_path.relative_to(Path.cwd()))
        except ValueError:
            # Not located under the current working directory; use the path as given.
            relative_path = str(file_path)
        return {
            "file": str(file_path),
            "relativeFile": relative_path,
            "entryId": entry_id,
            "timestamp": timestamp,
        }
    except Exception as exc:
        logger.error(f"记录章节JSON错误日志失败: {exc}")
        return None


def _is_section_critical(self, section: "TemplateSection") -> bool:
    """
    Decide whether a section counts as critical for the placeholder's emphasis.

    A section is critical when its ``depth`` is a number <= 2, or when its
    ``number`` is non-empty and contains at most one dot. A missing or
    non-numeric ``depth`` and a non-string ``number`` are tolerated, because
    this runs inside the last-resort fallback path and must not raise.
    """
    if not section:
        return False
    depth = getattr(section, "depth", None)
    if isinstance(depth, (int, float)) and depth <= 2:
        return True
    number = str(getattr(section, "number", None) or "")
    return bool(number) and number.count(".") <= 1
处理常见的LLM错误(如":=导致的非法JSON)。 diff --git a/ReportEngine/prompts/__init__.py b/ReportEngine/prompts/__init__.py index 8f1f904..ba0e12c 100644 --- a/ReportEngine/prompts/__init__.py +++ b/ReportEngine/prompts/__init__.py @@ -9,6 +9,7 @@ from .prompts import ( SYSTEM_PROMPT_HTML_GENERATION, SYSTEM_PROMPT_CHAPTER_JSON, SYSTEM_PROMPT_CHAPTER_JSON_REPAIR, + SYSTEM_PROMPT_CHAPTER_JSON_RECOVERY, SYSTEM_PROMPT_DOCUMENT_LAYOUT, SYSTEM_PROMPT_WORD_BUDGET, output_schema_template_selection, @@ -16,6 +17,7 @@ from .prompts import ( chapter_generation_input_schema, build_chapter_user_prompt, build_chapter_repair_prompt, + build_chapter_recovery_payload, build_document_layout_prompt, build_word_budget_prompt, ) @@ -27,11 +29,13 @@ __all__ = [ "SYSTEM_PROMPT_CHAPTER_JSON_REPAIR", "SYSTEM_PROMPT_DOCUMENT_LAYOUT", "SYSTEM_PROMPT_WORD_BUDGET", + "SYSTEM_PROMPT_CHAPTER_JSON_RECOVERY", "output_schema_template_selection", "input_schema_html_generation", "chapter_generation_input_schema", "build_chapter_user_prompt", "build_chapter_repair_prompt", + "build_chapter_recovery_payload", "build_document_layout_prompt", "build_word_budget_prompt", ] diff --git a/ReportEngine/prompts/prompts.py b/ReportEngine/prompts/prompts.py index f4a3582..eb2cac0 100644 --- a/ReportEngine/prompts/prompts.py +++ b/ReportEngine/prompts/prompts.py @@ -335,6 +335,24 @@ SYSTEM_PROMPT_CHAPTER_JSON_REPAIR = f""" 只返回JSON,不要添加注释或自然语言。 """ +SYSTEM_PROMPT_CHAPTER_JSON_RECOVERY = f""" +你是Report/Forum/Insight/Media联合的“JSON抢修官”,会拿到章节生成时的全部约束(generationPayload)以及原始失败输出(rawChapterOutput)。 + +请遵守: +1. 章节必须满足IR版本 {IR_VERSION} 规范,block.type 仅能使用:{', '.join(ALLOWED_BLOCK_TYPES)}; +2. paragraph.inlines中的marks仅可出现:{', '.join(ALLOWED_INLINE_MARKS)},并保留原始文字顺序; +3. 请以 generationPayload 中的 section 信息为主导,heading.text 与 anchor 必须与章节slug保持一致; +4. 仅对JSON语法/字段/嵌套做最小必要修复,不改写事实与结论; +5. 
def build_chapter_recovery_payload(
    section: dict,
    generation_payload: dict,
    raw_output: str,
    max_raw_chars: int = 8000,
) -> str:
    """
    Serialize the input for the cross-engine JSON rescue prompt.

    The payload bundles the chapter metadata, the original generation payload
    and the failed raw output. To keep the prompt bounded, only the trailing
    ``max_raw_chars`` characters of a string ``raw_output`` are kept.

    Args:
        section: Chapter metadata, stored under the ``section`` key.
        generation_payload: Original generation input, stored under
            ``generationPayload``.
        raw_output: The unparseable output. Non-string values are passed
            through unchanged.
        max_raw_chars: Maximum number of trailing characters of ``raw_output``
            to keep. A value <= 0 disables truncation.

    Returns:
        The payload as an indented JSON string with non-ASCII characters
        preserved.
    """
    if isinstance(raw_output, str) and max_raw_chars > 0:
        raw_excerpt = raw_output[-max_raw_chars:]
    else:
        # Non-string output, or truncation explicitly disabled.
        raw_excerpt = raw_output
    payload = {
        "section": section,
        "generationPayload": generation_payload,
        "rawChapterOutput": raw_excerpt,
    }
    return json.dumps(payload, ensure_ascii=False, indent=2)
Optional[str] = Field( + None, description="Insight Engine API Base URL" + ) + INSIGHT_ENGINE_MODEL_NAME: Optional[str] = Field( + None, description="Insight Engine LLM模型名称" + ) + MEDIA_ENGINE_API_KEY: Optional[str] = Field( + None, description="Media Engine LLM API密钥,用于跨引擎章节修复" + ) + MEDIA_ENGINE_BASE_URL: Optional[str] = Field( + None, description="Media Engine API Base URL" + ) + MEDIA_ENGINE_MODEL_NAME: Optional[str] = Field( + None, description="Media Engine LLM模型名称" + ) MAX_CONTENT_LENGTH: int = Field(200000, description="最大内容长度") OUTPUT_DIR: str = Field("final_reports", description="主输出目录") # 章节分块JSON会存储在该目录,便于溯源与断点续传 @@ -35,6 +63,9 @@ class Settings(BaseSettings): LOG_FILE: str = Field("logs/report.log", description="日志输出文件") ENABLE_PDF_EXPORT: bool = Field(True, description="是否允许导出PDF") CHART_STYLE: str = Field("modern", description="图表样式:modern/classic/") + JSON_ERROR_LOG_DIR: str = Field( + "logs/json_repair_failures", description="无法修复的JSON块落盘目录" + ) class Config: """Pydantic配置:允许从.env读取并兼容大小写"""