Blocked HTML

This commit is contained in:
马一丁
2025-11-13 10:56:28 +08:00
parent 403dbbd296
commit 4846b1f758
20 changed files with 3660 additions and 367 deletions
+16
View File
@@ -0,0 +1,16 @@
"""
Report Engine核心工具集合。
包含模板切片、章节存储等基础能力,供agent流水线复用。
"""
from .template_parser import TemplateSection, parse_template_sections
from .chapter_storage import ChapterStorage
from .stitcher import DocumentComposer
__all__ = [
"TemplateSection",
"parse_template_sections",
"ChapterStorage",
"DocumentComposer",
]
+209
View File
@@ -0,0 +1,209 @@
"""
章节JSON的落盘与清单管理。
每一章在流式生成时会立即写入raw文件,完成校验后再写入
格式化的chapter.json,并在manifest中记录元数据,便于后续装订。
"""
from __future__ import annotations
import json
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, Generator, List, Optional
@dataclass
class ChapterRecord:
"""manifest中记录的章节元数据"""
chapter_id: str
slug: str
title: str
order: int
status: str
files: Dict[str, str] = field(default_factory=dict)
errors: List[str] = field(default_factory=list)
updated_at: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
def to_dict(self) -> Dict[str, object]:
return {
"chapterId": self.chapter_id,
"slug": self.slug,
"title": self.title,
"order": self.order,
"status": self.status,
"files": self.files,
"errors": self.errors,
"updatedAt": self.updated_at,
}
class ChapterStorage:
"""
章节JSON写入与manifest管理器。
用法:
run_dir = storage.start_session(report_id, {...})
chapter_dir = storage.begin_chapter(run_dir, meta)
with storage.capture_stream(chapter_dir) as fp:
fp.write(chunk)
storage.persist_chapter(run_dir, meta, payload, errors)
"""
def __init__(self, base_dir: str):
self.base_dir = Path(base_dir)
self.base_dir.mkdir(parents=True, exist_ok=True)
self._manifests: Dict[str, Dict[str, object]] = {}
# ======== 会话 & manifest ========
def start_session(self, report_id: str, metadata: Dict[str, object]) -> Path:
"""为本次报告创建独立的章节输出目录与manifest"""
run_dir = self.base_dir / report_id
run_dir.mkdir(parents=True, exist_ok=True)
manifest = {
"reportId": report_id,
"createdAt": datetime.utcnow().isoformat() + "Z",
"metadata": metadata,
"chapters": [],
}
self._manifests[self._key(run_dir)] = manifest
self._write_manifest(run_dir, manifest)
return run_dir
def begin_chapter(self, run_dir: Path, chapter_meta: Dict[str, object]) -> Path:
"""创建章节子目录并在manifest中标记为streaming状态"""
slug_value = str(
chapter_meta.get("slug") or chapter_meta.get("chapterId") or "section"
)
chapter_dir = self._chapter_dir(
run_dir,
slug_value,
int(chapter_meta.get("order", 0)),
)
record = ChapterRecord(
chapter_id=str(chapter_meta.get("chapterId")),
slug=slug_value,
title=str(chapter_meta.get("title")),
order=int(chapter_meta.get("order", 0)),
status="streaming",
files={"raw": str(self._raw_stream_path(chapter_dir).relative_to(run_dir))},
)
self._upsert_record(run_dir, record)
return chapter_dir
def persist_chapter(
self,
run_dir: Path,
chapter_meta: Dict[str, object],
payload: Dict[str, object],
errors: Optional[List[str]] = None,
) -> Path:
"""章节流式生成完毕后写入最终JSON并更新manifest状态"""
slug_value = str(
chapter_meta.get("slug") or chapter_meta.get("chapterId") or "section"
)
chapter_dir = self._chapter_dir(
run_dir,
slug_value,
int(chapter_meta.get("order", 0)),
)
final_path = chapter_dir / "chapter.json"
final_path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
record = ChapterRecord(
chapter_id=str(chapter_meta.get("chapterId")),
slug=slug_value,
title=str(chapter_meta.get("title")),
order=int(chapter_meta.get("order", 0)),
status="ready" if not errors else "invalid",
files={
"raw": str(self._raw_stream_path(chapter_dir).relative_to(run_dir)),
"json": str(final_path.relative_to(run_dir)),
},
errors=errors or [],
)
self._upsert_record(run_dir, record)
return final_path
def load_chapters(self, run_dir: Path) -> List[Dict[str, object]]:
payloads: List[Dict[str, object]] = []
for child in sorted(run_dir.iterdir()):
if not child.is_dir():
continue
chapter_path = child / "chapter.json"
if not chapter_path.exists():
continue
try:
payload = json.loads(chapter_path.read_text(encoding="utf-8"))
payloads.append(payload)
except json.JSONDecodeError:
continue
payloads.sort(key=lambda x: x.get("order", 0))
return payloads
# ======== 文件操作 ========
@contextmanager
def capture_stream(self, chapter_dir: Path) -> Generator:
"""将流式输出实时写入raw文件"""
raw_path = self._raw_stream_path(chapter_dir)
raw_path.parent.mkdir(parents=True, exist_ok=True)
with raw_path.open("w", encoding="utf-8") as fp:
yield fp
# ======== 内部工具 ========
def _chapter_dir(self, run_dir: Path, slug: str, order: int) -> Path:
safe_slug = self._safe_slug(slug)
folder = f"{order:03d}-{safe_slug}"
path = run_dir / folder
path.mkdir(parents=True, exist_ok=True)
return path
def _safe_slug(self, slug: str) -> str:
slug = slug.replace(" ", "-").replace("/", "-")
return slug or "section"
def _raw_stream_path(self, chapter_dir: Path) -> Path:
return chapter_dir / "stream.raw"
def _key(self, run_dir: Path) -> str:
return str(run_dir.resolve())
def _manifest_path(self, run_dir: Path) -> Path:
return run_dir / "manifest.json"
def _write_manifest(self, run_dir: Path, manifest: Dict[str, object]):
self._manifest_path(run_dir).write_text(
json.dumps(manifest, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def _read_manifest(self, run_dir: Path) -> Dict[str, object]:
manifest_path = self._manifest_path(run_dir)
if manifest_path.exists():
return json.loads(manifest_path.read_text(encoding="utf-8"))
return {"reportId": run_dir.name, "chapters": []}
def _upsert_record(self, run_dir: Path, record: ChapterRecord):
"""更新或追加manifest中的章节记录,保证顺序一致"""
key = self._key(run_dir)
manifest = self._manifests.get(key) or self._read_manifest(run_dir)
chapters: List[Dict[str, object]] = manifest.get("chapters", [])
chapters = [c for c in chapters if c.get("chapterId") != record.chapter_id]
chapters.append(record.to_dict())
chapters.sort(key=lambda x: x.get("order", 0))
manifest["chapters"] = chapters
manifest.setdefault("updatedAt", datetime.utcnow().isoformat() + "Z")
self._manifests[key] = manifest
self._write_manifest(run_dir, manifest)
__all__ = ["ChapterStorage", "ChapterRecord"]
+60
View File
@@ -0,0 +1,60 @@
"""
章节装订器:负责把多个章节JSON合并为整本IR。
"""
from __future__ import annotations
from datetime import datetime
from typing import Dict, List, Set
from ..ir import IR_VERSION
class DocumentComposer:
"""
将章节拼接成Document IR的简单装订器。
"""
def __init__(self):
self._seen_anchors: Set[str] = set()
def build_document(
self,
report_id: str,
metadata: Dict[str, object],
chapters: List[Dict[str, object]],
) -> Dict[str, object]:
"""把所有章节按order排序并注入唯一锚点,形成整本IR"""
ordered = sorted(chapters, key=lambda c: c.get("order", 0))
for idx, chapter in enumerate(ordered, start=1):
chapter.setdefault("chapterId", f"S{idx}")
anchor = chapter.get("anchor") or f"section-{idx}"
chapter["anchor"] = self._ensure_unique_anchor(anchor)
chapter.setdefault("order", idx * 10)
document = {
"version": IR_VERSION,
"reportId": report_id,
"metadata": {
**metadata,
"generatedAt": metadata.get("generatedAt")
or datetime.utcnow().isoformat() + "Z",
},
"themeTokens": metadata.get("themeTokens", {}),
"chapters": ordered,
"assets": metadata.get("assets", {}),
}
return document
def _ensure_unique_anchor(self, anchor: str) -> str:
"""若存在重复锚点则追加序号,确保全局唯一"""
base = anchor
counter = 2
while anchor in self._seen_anchors:
anchor = f"{base}-{counter}"
counter += 1
self._seen_anchors.add(anchor)
return anchor
__all__ = ["DocumentComposer"]
+208
View File
@@ -0,0 +1,208 @@
"""
Markdown模板切片工具。
LLM需要“按章调用”,因此必须把Markdown模板解析为结构化章节队列。
这里通过轻量正则和缩进启发式,兼容“# 标题”与
“- **1.0 标题** / - 1.1 子标题”等多种写法。
"""
from __future__ import annotations
import re
import unicodedata
from dataclasses import dataclass, field
from typing import List, Optional
SECTION_ORDER_STEP = 10
@dataclass
class TemplateSection:
"""模板章节实体"""
title: str
slug: str
order: int
depth: int
raw_title: str
number: str = ""
chapter_id: str = ""
outline: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"title": self.title,
"slug": self.slug,
"order": self.order,
"depth": self.depth,
"number": self.number,
"chapterId": self.chapter_id,
"outline": self.outline,
}
heading_pattern = re.compile(r"^(#{1,6})\s+(.*)$")
bullet_pattern = re.compile(r"^[-*+]\s+(.*)$")
number_pattern = re.compile(r"^(?P<num>\d+(?:\.\d+)*)(?:[\s、:.-]+(?P<label>.*))?$")
def parse_template_sections(template_md: str) -> List[TemplateSection]:
"""
将Markdown模板切分成章节列表(按大标题)。
返回的每个TemplateSection都携带slug/order/章节号,
方便后续分章调用与锚点生成。
"""
sections: List[TemplateSection] = []
current: Optional[TemplateSection] = None
order = SECTION_ORDER_STEP
used_slugs = set()
for raw_line in template_md.splitlines():
if not raw_line.strip():
continue
indent = len(raw_line) - len(raw_line.lstrip(" "))
stripped = raw_line.strip()
meta = _classify_line(stripped, indent)
if not meta:
continue
if meta["is_section"]:
slug = _ensure_unique_slug(meta["slug"], used_slugs)
section = TemplateSection(
title=meta["title"],
slug=slug,
order=order,
depth=meta["depth"],
raw_title=meta["raw"],
number=meta["number"],
)
sections.append(section)
current = section
order += SECTION_ORDER_STEP
continue
# outline
if current:
current.outline.append(meta["title"])
for idx, section in enumerate(sections, start=1):
# 为每个章节生成稳定的chapter_id,便于后续引用
section.chapter_id = f"S{idx}"
return sections
def _classify_line(stripped: str, indent: int) -> Optional[dict]:
"""根据缩进与符号分类行"""
heading_match = heading_pattern.match(stripped)
if heading_match:
level = len(heading_match.group(1))
payload = _strip_markup(heading_match.group(2).strip())
title_info = _split_number(payload)
slug = _build_slug(title_info["number"], title_info["title"])
return {
"is_section": level <= 2,
"depth": level,
"title": title_info["display"],
"raw": payload,
"number": title_info["number"],
"slug": slug,
}
bullet_match = bullet_pattern.match(stripped)
if bullet_match:
payload = _strip_markup(bullet_match.group(1).strip())
title_info = _split_number(payload)
slug = _build_slug(title_info["number"], title_info["title"])
is_section = indent <= 1
depth = 1 if indent <= 1 else 2
return {
"is_section": is_section,
"depth": depth,
"title": title_info["display"],
"raw": payload,
"number": title_info["number"],
"slug": slug,
}
# 兼容“1.1 ...”没有前缀符号的行
number_match = number_pattern.match(stripped)
if number_match and number_match.group("label"):
payload = stripped
title = number_match.group("label").strip()
number = number_match.group("num")
slug = _build_slug(number, title)
is_section = indent == 0 and number.count(".") <= 1
depth = 1 if is_section else 2
display = f"{number} {title}" if title else number
return {
"is_section": is_section,
"depth": depth,
"title": display,
"raw": payload,
"number": number,
"slug": slug,
}
return None
def _strip_markup(text: str) -> str:
"""去除包裹的**、__等简单强调标记"""
if text.startswith(("**", "__")) and text.endswith(("**", "__")) and len(text) > 4:
return text[2:-2].strip()
return text
def _split_number(payload: str) -> dict:
"""拆分编号与标题"""
match = number_pattern.match(payload)
number = match.group("num") if match else ""
label = match.group("label") if match else payload
label = (label or "").strip()
display = f"{number} {label}".strip() if number else label or payload
title_core = label or payload
return {
"number": number,
"title": title_core,
"display": display,
}
def _build_slug(number: str, title: str) -> str:
"""根据编号/标题生成锚点"""
if number:
token = number.replace(".", "-")
else:
token = _slugify_text(title)
token = token or "section"
return f"section-{token}"
def _slugify_text(text: str) -> str:
text = unicodedata.normalize("NFKD", text)
text = text.replace("·", "-").replace(" ", "-")
text = re.sub(r"[^0-9a-zA-Z\u4e00-\u9fff-]+", "-", text)
text = re.sub(r"-{2,}", "-", text)
return text.strip("-").lower()
def _ensure_unique_slug(slug: str, used: set) -> str:
if slug not in used:
used.add(slug)
return slug
base = slug
idx = 2
while slug in used:
slug = f"{base}-{idx}"
idx += 1
used.add(slug)
return slug
__all__ = ["TemplateSection", "parse_template_sections"]