Enhance Repair Capabilities

This commit is contained in:
马一丁
2025-11-15 15:22:31 +08:00
parent fa1ebc07ec
commit 90d12a092d
5 changed files with 365 additions and 7 deletions
+251 -5
View File
@@ -8,9 +8,10 @@
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
import re
from typing import Any, Dict, List, Tuple, Callable, Optional
from typing import Any, Dict, List, Tuple, Callable, Optional, Set
from loguru import logger
@@ -19,7 +20,9 @@ from ..ir import ALLOWED_BLOCK_TYPES, ALLOWED_INLINE_MARKS, IRValidator
from ..prompts import (
SYSTEM_PROMPT_CHAPTER_JSON,
SYSTEM_PROMPT_CHAPTER_JSON_REPAIR,
SYSTEM_PROMPT_CHAPTER_JSON_RECOVERY,
build_chapter_repair_prompt,
build_chapter_recovery_payload,
build_chapter_user_prompt,
)
from .base_node import BaseNode
@@ -96,7 +99,14 @@ class ChapterGenerationNode(BaseNode):
_PARAGRAPH_FRAGMENT_NO_TERMINATOR_MAX_CHARS = 240
_TERMINATION_PUNCTUATION = set("。!?!?;……")
def __init__(self, llm_client, validator: IRValidator, storage: ChapterStorage):
def __init__(
self,
llm_client,
validator: IRValidator,
storage: ChapterStorage,
fallback_llm_clients: Optional[List[Tuple[str, Any]]] = None,
error_log_dir: Optional[str | Path] = None,
):
"""
记录LLM客户端/校验器/章节存储器,便于run方法调度。
@@ -108,6 +118,17 @@ class ChapterGenerationNode(BaseNode):
super().__init__(llm_client, "ChapterGenerationNode")
self.validator = validator
self.storage = storage
self.fallback_llm_clients: List[Tuple[str, Any]] = fallback_llm_clients or [
("report_engine", llm_client)
]
error_dir = Path(error_log_dir or "logs/json_repair_failures")
error_dir.mkdir(parents=True, exist_ok=True)
self.error_log_dir = error_dir
self._failed_block_counter = 0
self._active_run_id: Optional[str] = None
self._rescue_attempted_labels: Dict[str, Set[str]] = {}
self._skipped_placeholder_chapters: Set[str] = set()
self._archived_failed_json: Dict[str, str] = {}
def run(
self,
@@ -141,6 +162,8 @@ class ChapterGenerationNode(BaseNode):
"order": section.order,
}
chapter_dir = self.storage.begin_chapter(run_dir, chapter_meta)
run_id = run_dir.name
self._ensure_run_state(run_id)
llm_payload = self._build_payload(section, context)
user_message = build_chapter_user_prompt(llm_payload)
@@ -151,7 +174,30 @@ class ChapterGenerationNode(BaseNode):
section_meta=chapter_meta,
**kwargs,
)
chapter_json = self._parse_chapter(raw_text)
parse_context: List[str] = []
placeholder_created = False
try:
chapter_json = self._parse_chapter(raw_text)
except ChapterJsonParseError as parse_error:
logger.warning(f"{section.title} 章节JSON解析失败,尝试跨引擎修复: {parse_error}")
parse_context.append(str(parse_error))
self._archive_failed_output(section, raw_text)
recovered = self._attempt_cross_engine_json_rescue(
section,
llm_payload,
raw_text,
run_id,
)
if recovered:
chapter_json = recovered
logger.info(f"{section.title} 章节JSON已通过跨引擎修复")
else:
placeholder = self._build_placeholder_chapter(section, raw_text, parse_error)
if not placeholder:
raise
chapter_json, placeholder_notes = placeholder
parse_context.extend(placeholder_notes)
placeholder_created = True
# 自动补全关键字段后再校验
chapter_json.setdefault("chapterId", section.chapter_id)
@@ -176,13 +222,13 @@ class ChapterGenerationNode(BaseNode):
self._sanitize_chapter_blocks(chapter_json)
valid, errors = self.validator.validate_chapter(chapter_json)
content_error: ChapterContentError | None = None
if valid:
if valid and not placeholder_created:
try:
self._ensure_content_density(chapter_json)
except ChapterContentError as exc:
content_error = exc
error_messages: List[str] = []
error_messages: List[str] = parse_context.copy()
if not valid and errors:
error_messages.extend(errors)
if content_error:
@@ -314,6 +360,154 @@ class ChapterGenerationNode(BaseNode):
logger.warning(f"章节流式回调失败: {callback_error}")
return "".join(chunks)
def _attempt_cross_engine_json_rescue(
self,
section: TemplateSection,
generation_payload: Dict[str, Any],
raw_text: str,
run_id: str,
) -> Optional[Dict[str, Any]]:
"""
依次调用Report/Forum/Insight/Media四套API尝试修复无法解析的JSON。
Returns:
dict | None: 成功修复时返回章节JSON,否则为None。
"""
if not self.fallback_llm_clients:
return None
if self._chapter_already_skipped(section):
logger.info(f"[{run_id}] {section.title} 已标记为占位,不再触发跨引擎修复")
return None
section_payload = {
"chapterId": section.chapter_id,
"title": section.title,
"slug": section.slug,
"order": section.order,
"number": section.number,
"outline": section.outline,
}
repair_prompt = build_chapter_recovery_payload(
section_payload,
generation_payload,
raw_text,
)
attempted_labels = self._rescue_attempted_labels.setdefault(section.chapter_id, set())
for label, client in self.fallback_llm_clients:
if label in attempted_labels:
continue
attempt_index = len(attempted_labels) + 1
attempted_labels.add(label)
logger.info(
f"[{run_id}] 章节 {section.title} 触发 {label} API JSON抢修(第{attempt_index}次尝试)"
)
try:
response = client.invoke(
SYSTEM_PROMPT_CHAPTER_JSON_RECOVERY,
repair_prompt,
temperature=0.0,
top_p=0.05,
)
except Exception as exc:
logger.warning(f"{label} JSON修复调用失败: {exc}")
continue
if not response:
continue
try:
repaired = self._parse_chapter(response)
except Exception as exc:
logger.warning(f"{label} JSON修复输出仍无法解析: {exc}")
continue
logger.warning(f"[{run_id}] {label} API已修复章节JSON")
self._archived_failed_json.pop(section.chapter_id, None)
return repaired
return None
def _ensure_run_state(self, run_id: str):
"""确保每次报告运行时的修复状态隔离,防止上一份任务的记录影响新任务。"""
if self._active_run_id == run_id:
return
self._active_run_id = run_id
self._rescue_attempted_labels = {}
self._skipped_placeholder_chapters = set()
self._archived_failed_json = {}
def _archive_failed_output(self, section: TemplateSection, raw_text: str):
"""缓存当前章节的原始错误JSON,以便后续占位或人工使用。"""
if not raw_text:
return
self._archived_failed_json[section.chapter_id] = raw_text
def _get_archived_failed_output(self, section: TemplateSection) -> Optional[str]:
"""获取章节最近一次失败的原始输出。"""
return self._archived_failed_json.get(section.chapter_id)
def _mark_chapter_skipped(self, section: TemplateSection):
"""记录该章节已经降级为占位,避免重复触发跨引擎修复。"""
self._skipped_placeholder_chapters.add(section.chapter_id)
def _chapter_already_skipped(self, section: TemplateSection) -> bool:
"""判断章节是否已经被标记为占位。"""
return section.chapter_id in self._skipped_placeholder_chapters
def _build_placeholder_chapter(
self,
section: TemplateSection,
raw_text: str,
parse_error: Exception,
) -> Optional[Tuple[Dict[str, Any], List[str]]]:
"""
在所有修复失败时构造可渲染的占位章节,并记录日志文件供后续排查。
"""
snapshot = self._get_archived_failed_output(section) or raw_text
log_ref = self._persist_error_payload(section, snapshot, parse_error)
if not log_ref:
logger.error(f"{section.title} 章节JSON完全损坏且无法写入日志")
return None
importance = "critical" if self._is_section_critical(section) else "standard"
message = (
f"LLM返回块解析错误,详情请见 {log_ref['relativeFile']}{log_ref['entryId']} 记录。"
)
heading_block = {
"type": "heading",
"level": 2 if importance == "critical" else 3,
"text": section.title,
"anchor": section.slug,
}
callout_block = {
"type": "callout",
"tone": "danger" if importance == "critical" else "warning",
"title": "LLM返回块解析错误",
"blocks": [
{
"type": "paragraph",
"inlines": [
{
"text": message,
}
],
}
],
"meta": {
"errorLogRef": log_ref,
"rawJsonPreview": (snapshot or "")[:2000],
"errorMessage": message,
"importance": importance,
},
}
placeholder = {
"chapterId": section.chapter_id,
"title": section.title,
"anchor": section.slug,
"order": section.order,
"blocks": [heading_block, callout_block],
"errorPlaceholder": True,
}
errors = [
f"{section.title} 章节JSON解析失败,已降级为占位。参考 {log_ref['relativeFile']}#{log_ref['entryId']}"
]
self._mark_chapter_skipped(section)
return placeholder, errors
def _parse_chapter(self, raw_text: str) -> Dict[str, Any]:
"""
清洗LLM输出并解析JSON。
@@ -375,6 +569,58 @@ class ChapterGenerationNode(BaseNode):
return item
raise ValueError("章节JSON缺少chapter字段")
def _persist_error_payload(
self,
section: TemplateSection,
raw_text: str,
parse_error: Exception,
) -> Optional[Dict[str, str]]:
"""将无法解析的JSON文本落盘,便于在HTML中指向具体文件。"""
try:
self._failed_block_counter += 1
entry_id = f"E{self._failed_block_counter:04d}"
timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S")
slug = section.slug or "section"
filename = f"{timestamp}-{slug}-{entry_id}.json"
file_path = self.error_log_dir / filename
payload = {
"chapterId": section.chapter_id,
"title": section.title,
"slug": section.slug,
"order": section.order,
"rawOutput": raw_text,
"error": str(parse_error),
"loggedAt": timestamp,
}
file_path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
try:
relative_path = str(file_path.relative_to(Path.cwd()))
except ValueError:
relative_path = str(file_path)
return {
"file": str(file_path),
"relativeFile": relative_path,
"entryId": entry_id,
"timestamp": timestamp,
}
except Exception as exc:
logger.error(f"记录章节JSON错误日志失败: {exc}")
return None
def _is_section_critical(self, section: TemplateSection) -> bool:
"""基于章节深度/编号判断是否会影响目录,从而决定提示强度。"""
if not section:
return False
if section.depth <= 2:
return True
number = section.number or ""
if number and number.count(".") <= 1:
return True
return False
def _repair_llm_json(self, text: str) -> str:
"""
处理常见的LLM错误(如":=导致的非法JSON)。