""".strip()
def _render_tagline(self) -> str:
"""
Render the tagline shown below the title.

Returns:
str: the tagline fragment, with the text passed through
``self._escape_html``; an empty string when ``metadata`` has no
(or an empty) ``tagline``.
"""
tagline = self.metadata.get("tagline")
if not tagline:
return ""
return f'
{self._escape_html(tagline)}
'
def _render_cover(self) -> str:
"""
Render the cover section at the start of the article.

The cover combines a fixed overview hint, the title and the subtitle.

Returns:
str: the cover fragment with surrounding whitespace stripped.
"""
# Fall back to a default report title when metadata carries none.
title = self.metadata.get("title") or "智能舆情报告"
# The subtitle falls back to the template name, then to an empty string.
subtitle = self.metadata.get("subtitle") or self.metadata.get("templateName") or ""
overview_hint = "文章总览"
return f"""
{overview_hint}
{self._escape_html(title)}
{self._escape_html(subtitle)}
""".strip()
def _render_hero(self) -> str:
"""
Render the hero area (summary, highlights, actions and KPI cards).

All data comes from ``metadata["hero"]``.

Returns:
str: the hero fragment, or an empty string when ``hero`` is missing
or empty.
"""
hero = self.metadata.get("hero") or {}
if not hero:
return ""
summary = hero.get("summary")
# The summary fragment is only emitted when a summary is present.
summary_html = f'
{self._escape_html(summary)}
' if summary else ""
highlights = hero.get("highlights") or []
# One fragment per highlight, concatenated without a separator.
highlight_html = "".join(
f'
{self._escape_html(text)}
'
for text in highlights
)
actions = hero.get("actions") or []
# One fragment per action, concatenated without a separator.
actions_html = "".join(
f''
for text in actions
)
kpi_cards = ""
for item in hero.get("kpis", []):
delta = item.get("delta")
# NOTE(review): ``tone`` is not referenced again in the code visible here — confirm it is still needed.
tone = item.get("tone") or "neutral"
delta_html = f'{self._escape_html(delta)}' if delta else ""
kpi_cards += f"""
{self._escape_html(item.get("label"))}
{self._escape_html(item.get("value"))}
{delta_html}
"""
return f"""
{summary_html}
{highlight_html}
{actions_html}
{kpi_cards}
""".strip()
def _render_meta_panel(self) -> str:
"""当前需求不展示元信息,保留方法便于后续扩展"""
return ""
def _render_toc_section(self) -> str:
"""
Build the table-of-contents section.

Returns:
str: the TOC fragment; an empty string when there are no TOC
entries or when the TOC has already been rendered once.
"""
if not self.toc_entries:
return ""
# ``toc_rendered`` is set below, so a second call returns nothing.
if self.toc_rendered:
return ""
toc_config = self.metadata.get("toc") or {}
toc_title = toc_config.get("title") or "📚 目录"
toc_items = "".join(
self._format_toc_entry(entry)
for entry in self.toc_entries
)
self.toc_rendered = True
# NOTE(review): ``toc_title`` and ``toc_items`` are not interpolated into the template as it appears here — confirm the template body is intact.
return f"""
""".strip()
def _collect_toc_entries(self, chapters: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
根据metadata中的tocPlan或章节heading收集目录项。
参数:
chapters: Document IR中的章节数组。
返回:
list[dict]: 规范化后的目录条目,包含level/text/anchor。
"""
metadata = self.metadata
toc_config = metadata.get("toc") or {}
custom_entries = toc_config.get("customEntries")
entries: List[Dict[str, Any]] = []
if custom_entries:
for entry in custom_entries:
anchor = entry.get("anchor") or self.chapter_anchor_map.get(entry.get("chapterId"))
if not anchor:
continue
entries.append(
{
"level": entry.get("level", 2),
"text": entry.get("display") or entry.get("title") or "",
"anchor": anchor,
"description": entry.get("description"),
}
)
return entries
for chapter in chapters or []:
for block in chapter.get("blocks", []):
if block.get("type") == "heading":
anchor = block.get("anchor") or chapter.get("anchor") or ""
if not anchor:
continue
mapped = self.heading_label_map.get(anchor, {})
entries.append(
{
"level": block.get("level", 2),
"text": mapped.get("display") or block.get("text", ""),
"anchor": anchor,
"description": mapped.get("description"),
}
)
return entries
def _prepare_chapters(self, chapters: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""复制章节并展开其中序列化的block,避免渲染缺失"""
prepared: List[Dict[str, Any]] = []
for chapter in chapters or []:
chapter_copy = copy.deepcopy(chapter)
chapter_copy["blocks"] = self._expand_blocks_in_place(chapter_copy.get("blocks", []))
prepared.append(chapter_copy)
return prepared
def _expand_blocks_in_place(self, blocks: List[Dict[str, Any]] | None) -> List[Dict[str, Any]]:
"""遍历block列表,将内嵌JSON串拆解为独立block"""
expanded: List[Dict[str, Any]] = []
for block in blocks or []:
extras = self._extract_embedded_blocks(block)
expanded.append(block)
if extras:
expanded.extend(self._expand_blocks_in_place(extras))
return expanded
def _extract_embedded_blocks(self, block: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
在block内部查找被误写成字符串的block列表,并返回补充的block
"""
extracted: List[Dict[str, Any]] = []
def traverse(node: Any) -> None:
"""递归遍历block树,识别text字段内潜在的嵌套block JSON"""
if isinstance(node, dict):
for key, value in list(node.items()):
if key == "text" and isinstance(value, str):
decoded = self._decode_embedded_block_payload(value)
if decoded:
node[key] = ""
extracted.extend(decoded)
continue
traverse(value)
elif isinstance(node, list):
for item in node:
traverse(item)
traverse(block)
return extracted
def _decode_embedded_block_payload(self, raw: str) -> List[Dict[str, Any]] | None:
"""
将字符串形式的block描述恢复为结构化列表。
"""
if not isinstance(raw, str):
return None
stripped = raw.strip()
if not stripped or stripped[0] not in "{[":
return None
payload: Any | None = None
decode_targets = [stripped]
if stripped and stripped[0] != "[":
decode_targets.append(f"[{stripped}]")
for candidate in decode_targets:
try:
payload = json.loads(candidate)
break
except json.JSONDecodeError:
continue
if payload is None:
for candidate in decode_targets:
try:
payload = ast.literal_eval(candidate)
break
except (ValueError, SyntaxError):
continue
if payload is None:
return None
blocks = self._collect_blocks_from_payload(payload)
return blocks or None
@staticmethod
def _looks_like_block(payload: Dict[str, Any]) -> bool:
"""粗略判断dict是否符合block结构"""
if not isinstance(payload, dict):
return False
if "type" in payload and isinstance(payload["type"], str):
return True
structural_keys = {"blocks", "rows", "items", "widgetId", "widgetType", "data"}
return any(key in payload for key in structural_keys)
def _collect_blocks_from_payload(self, payload: Any) -> List[Dict[str, Any]]:
"""递归收集payload中的block节点"""
collected: List[Dict[str, Any]] = []
if isinstance(payload, dict):
block_list = payload.get("blocks")
block_type = payload.get("type")
if isinstance(block_list, list) and not block_type:
for candidate in block_list:
collected.extend(self._collect_blocks_from_payload(candidate))
return collected
if payload.get("cells") and not block_type:
for cell in payload["cells"]:
collected.extend(self._collect_blocks_from_payload(cell.get("blocks")))
return collected
if payload.get("items") and not block_type:
for item in payload["items"]:
collected.extend(self._collect_blocks_from_payload(item))
return collected
appended = False
if block_type or payload.get("widgetId") or payload.get("rows"):
coerced = self._coerce_block_dict(payload)
if coerced:
collected.append(coerced)
appended = True
items = payload.get("items")
if isinstance(items, list) and not block_type:
for item in items:
collected.extend(self._collect_blocks_from_payload(item))
return collected
if appended:
return collected
elif isinstance(payload, list):
for item in payload:
collected.extend(self._collect_blocks_from_payload(item))
elif payload is None:
return collected
return collected
def _coerce_block_dict(self, payload: Any) -> Dict[str, Any] | None:
"""尝试将dict补充为合法block结构"""
if not isinstance(payload, dict):
return None
block = copy.deepcopy(payload)
block_type = block.get("type")
if not block_type:
if "widgetId" in block:
block_type = block["type"] = "widget"
elif "rows" in block or "cells" in block:
block_type = block["type"] = "table"
if "rows" not in block and isinstance(block.get("cells"), list):
block["rows"] = [{"cells": block.pop("cells")}]
elif "items" in block:
block_type = block["type"] = "list"
return block if block.get("type") else None
def _format_toc_entry(self, entry: Dict[str, Any]) -> str:
"""
Turn one table-of-contents entry into a line of HTML.

Args:
entry: TOC entry dict; ``description`` and ``level`` are read here.

Returns:
str: the HTML fragment for this entry.
"""
desc = entry.get("description")
# The description fragment is only emitted when a description is present.
desc_html = f'
{self._escape_html(desc)}
' if desc else ""
level = entry.get("level", 2)
# Levels up to 2 map to CSS level 1; deeper levels are capped at 4.
css_level = 1 if level <= 2 else min(level, 4)
return f'
'
def _normalize_table_rows(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
检测并修正仅有单列的竖排表,转换为标准网格。
参数:
rows: 原始表格行。
返回:
list[dict]: 若检测到竖排表则返回转置后的行,否则原样返回。
"""
if not rows:
return []
if not all(len((row.get("cells") or [])) == 1 for row in rows):
return rows
texts = [self._extract_row_text(row) for row in rows]
header_span = self._detect_transposed_header_span(rows, texts)
if not header_span:
return rows
normalized = self._transpose_single_cell_table(rows, header_span)
return normalized or rows
def _detect_transposed_header_span(self, rows: List[Dict[str, Any]], texts: List[str]) -> int:
"""推断竖排表头的行数,用于后续转置"""
max_fields = min(8, len(rows) // 2)
header_span = 0
for idx, text in enumerate(texts):
if idx >= max_fields:
break
if self._is_potential_table_header(text):
header_span += 1
else:
break
if header_span < 2:
return 0
remainder = texts[header_span:]
if not remainder or (len(rows) - header_span) % header_span != 0:
return 0
if not any(self._looks_like_table_value(txt) for txt in remainder):
return 0
return header_span
def _is_potential_table_header(self, text: str) -> bool:
"""根据长度与字符特征判断是否像表头字段"""
if not text:
return False
stripped = text.strip()
if not stripped or len(stripped) > 12:
return False
return not any(ch.isdigit() or ch in self.TABLE_COMPLEX_CHARS for ch in stripped)
def _looks_like_table_value(self, text: str) -> bool:
"""判断该文本是否更像数据值,用于辅助判断转置"""
if not text:
return False
stripped = text.strip()
if len(stripped) >= 12:
return True
return any(ch.isdigit() or ch in self.TABLE_COMPLEX_CHARS for ch in stripped)
def _transpose_single_cell_table(self, rows: List[Dict[str, Any]], span: int) -> List[Dict[str, Any]]:
"""将单列多行的表格转换为标准表头 + 若干数据行"""
total = len(rows)
if total <= span or (total - span) % span != 0:
return []
header_rows = rows[:span]
data_rows = rows[span:]
normalized: List[Dict[str, Any]] = []
header_cells = []
for row in header_rows:
cell = copy.deepcopy((row.get("cells") or [{}])[0])
cell["header"] = True
header_cells.append(cell)
normalized.append({"cells": header_cells})
for start in range(0, len(data_rows), span):
group = data_rows[start : start + span]
if len(group) < span:
break
normalized.append(
{
"cells": [
copy.deepcopy((item.get("cells") or [{}])[0])
for item in group
]
}
)
return normalized
def _extract_row_text(self, row: Dict[str, Any]) -> str:
"""提取表格行中的纯文本,方便启发式分析"""
cells = row.get("cells") or []
if not cells:
return ""
cell = cells[0]
texts: List[str] = []
for block in cell.get("blocks", []):
if isinstance(block, dict):
if block.get("type") == "paragraph":
for inline in block.get("inlines") or []:
if isinstance(inline, dict):
value = inline.get("text")
else:
value = inline
if value is None:
continue
texts.append(str(value))
return "".join(texts)
def _render_blockquote(self, block: Dict[str, Any]) -> str:
"""
Render a blockquote, which may nest other blocks.

Args:
block: blockquote block; its ``blocks`` list is rendered via
``self._render_blocks``.

Returns:
str: the blockquote fragment wrapping the rendered inner blocks.
"""
inner = self._render_blocks(block.get("blocks", []))
return f"
{inner}
"
def _render_code(self, block: Dict[str, Any]) -> str:
"""
Render a code block together with its language information.

Args:
block: code block; ``lang`` (default "") and ``content`` (default "")
are read.

Returns:
str: the code fragment.
"""
lang = block.get("lang") or ""
# The content is passed through ``self._escape_html`` before it is embedded.
content = self._escape_html(block.get("content", ""))
return f'
'
def _merge_dicts(
self, base: Dict[str, Any] | None, override: Dict[str, Any] | None
) -> Dict[str, Any]:
"""
递归合并两个字典,override覆盖base,均为新副本,避免副作用。
"""
result = copy.deepcopy(base) if isinstance(base, dict) else {}
if not isinstance(override, dict):
return result
for key, value in override.items():
if isinstance(value, dict) and isinstance(result.get(key), dict):
result[key] = self._merge_dicts(result[key], value)
else:
result[key] = copy.deepcopy(value)
return result
def _looks_like_chart_dataset(self, candidate: Any) -> bool:
"""启发式判断对象是否包含Chart.js常见的labels/datasets结构"""
if not isinstance(candidate, dict):
return False
labels = candidate.get("labels")
datasets = candidate.get("datasets")
return isinstance(labels, list) or isinstance(datasets, list)
def _coerce_chart_data_structure(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
兼容LLM输出的Chart.js完整配置(含type/data/options)。
若data中嵌套一个真正的labels/datasets结构,则提取并返回该结构。
"""
if not isinstance(data, dict):
return {}
if self._looks_like_chart_dataset(data):
return data
for key in ("data", "chartData", "payload"):
nested = data.get(key)
if self._looks_like_chart_dataset(nested):
return copy.deepcopy(nested)
return data
def _prepare_widget_payload(
self, block: Dict[str, Any]
) -> tuple[Dict[str, Any], Dict[str, Any]]:
"""
预处理widget数据,兼容部分block将Chart.js配置写入data字段的情况。
返回:
tuple(props, data): 归一化后的props与chart数据
"""
props = copy.deepcopy(block.get("props") or {})
raw_data = block.get("data")
data_copy = copy.deepcopy(raw_data) if isinstance(raw_data, dict) else raw_data
widget_type = block.get("widgetType") or ""
chart_like = isinstance(widget_type, str) and widget_type.startswith("chart.js")
if chart_like and isinstance(data_copy, dict):
inline_options = data_copy.pop("options", None)
inline_type = data_copy.pop("type", None)
normalized_data = self._coerce_chart_data_structure(data_copy)
if isinstance(inline_options, dict):
props["options"] = self._merge_dicts(props.get("options"), inline_options)
if isinstance(inline_type, str) and inline_type and not props.get("type"):
props["type"] = inline_type
elif isinstance(data_copy, dict):
normalized_data = data_copy
else:
normalized_data = {}
return props, normalized_data
def _render_widget(self, block: Dict[str, Any]) -> str:
"""
Render the placeholder container for a widget and record its config JSON.

Each call increments ``self.chart_counter`` and appends one entry to
``self.widget_scripts``.

Args:
block: widget block; ``widgetId``, ``widgetType``, ``dataRef``,
``props`` and ``data`` are read.

Returns:
str: the widget fragment, including the optional title and the
fallback view produced by ``self._render_widget_fallback``.
"""
self.chart_counter += 1
# Both ids are derived from the running counter.
canvas_id = f"chart-{self.chart_counter}"
config_id = f"chart-config-{self.chart_counter}"
props, normalized_data = self._prepare_widget_payload(block)
payload = {
"widgetId": block.get("widgetId"),
"widgetType": block.get("widgetType"),
"props": props,
"data": normalized_data,
"dataRef": block.get("dataRef"),
}
# NOTE(review): the search string below is empty as written; the replacement
# value suggests the intent is to neutralize "</" sequences in the JSON — confirm.
config_json = json.dumps(payload, ensure_ascii=False).replace("", "<\\/")
self.widget_scripts.append(
f''
)
title = props.get("title")
# The title fragment is only emitted when props carry a title.
title_html = f'
{self._escape_html(title)}
' if title else ""
fallback_html = self._render_widget_fallback(normalized_data)
return f"""
{title_html}
{fallback_html}
"""
def _render_widget_fallback(self, data: Dict[str, Any]) -> str:
"""渲染图表数据的文本兜底视图,避免Chart.js加载失败时出现空白"""
if not isinstance(data, dict):
return ""
labels = data.get("labels") or []
datasets = data.get("datasets") or []
if not labels or not datasets:
return ""
header_cells = "".join(
f"
{self._escape_html(ds.get('label') or f'系列{idx + 1}')}
"
for idx, ds in enumerate(datasets)
)
body_rows = ""
for idx, label in enumerate(labels):
row_cells = [f"
{self._escape_html(label)}
"]
for ds in datasets:
series = ds.get("data") or []
value = series[idx] if idx < len(series) else ""
row_cells.append(f"