""".strip()
def _render_tagline(self) -> str:
"""
Render the tagline shown beneath the title.

The value is read from ``self.metadata["tagline"]`` and passed through
``self._escape_html`` before being interpolated into the returned template.

Returns:
str: The tagline fragment, or an empty string when no tagline is set.
"""
tagline = self.metadata.get("tagline")
if not tagline:
return ""
return f'
{self._escape_html(tagline)}
'
def _render_cover(self) -> str:
"""
Render the cover area at the start of the article: the "文章总览" hint,
the title and the subtitle.

The title falls back to "智能舆情报告". The subtitle falls back to
``metadata["templateName"]`` and then to an empty string. Title and
subtitle are passed through ``self._escape_html``.

Returns:
str: The cover section fragment with surrounding whitespace stripped.
"""
title = self.metadata.get("title") or "智能舆情报告"
subtitle = self.metadata.get("subtitle") or self.metadata.get("templateName") or ""
overview_hint = "文章总览"
return f"""
{overview_hint}
{self._escape_html(title)}
{self._escape_html(subtitle)}
""".strip()
def _render_hero(self) -> str:
"""
Render the summary / highlights / actions / KPI area from ``metadata["hero"]``.

Per the original note: in the newer layout the title and the overview are
merged into this area and the oval background is removed.

Returns:
str: The hero fragment, or an empty string when ``hero`` is missing or empty.
"""
hero = self.metadata.get("hero") or {}
if not hero:
return ""
# Resolve the title and subtitle (same fallbacks as the cover section)
title = self.metadata.get("title") or "智能舆情报告"
subtitle = self.metadata.get("subtitle") or self.metadata.get("templateName") or ""
summary = hero.get("summary")
summary_html = f'
{self._escape_html(summary)}
' if summary else ""
highlights = hero.get("highlights") or []
highlight_html = "".join(
f'
{self._escape_html(text)}
'
for text in highlights
)
actions = hero.get("actions") or []
# NOTE(review): the per-action template is an empty literal as it appears here — confirm it is complete.
actions_html = "".join(
f''
for text in actions
)
kpi_cards = ""
for item in hero.get("kpis", []):
delta = item.get("delta")
# NOTE(review): ``tone`` is not referenced by the card template as it appears here.
tone = item.get("tone") or "neutral"
delta_html = f'{self._escape_html(delta)}' if delta else ""
kpi_cards += f"""
{self._escape_html(item.get("label"))}
{self._escape_html(item.get("value"))}
{delta_html}
"""
return f"""
文章总览
{self._escape_html(title)}
{self._escape_html(subtitle)}
{summary_html}
{highlight_html}
{actions_html}
{kpi_cards}
""".strip()
def _render_meta_panel(self) -> str:
"""当前需求不展示元信息,保留方法便于后续扩展"""
return ""
def _render_toc_section(self) -> str:
"""
Build the table-of-contents module.

Returns an empty string when there are no ``self.toc_entries`` or when the
TOC has already been rendered (``self.toc_rendered``). Otherwise every entry
is formatted with ``self._format_toc_entry`` and ``self.toc_rendered`` is
set to True so the module is emitted at most once.

Returns:
str: The TOC fragment with surrounding whitespace stripped.
"""
if not self.toc_entries:
return ""
if self.toc_rendered:
return ""
toc_config = self.metadata.get("toc") or {}
toc_title = toc_config.get("title") or "📚 目录"
toc_items = "".join(
self._format_toc_entry(entry)
for entry in self.toc_entries
)
self.toc_rendered = True
# NOTE(review): ``toc_title`` and ``toc_items`` are not referenced by the template
# as it appears here — confirm the template is complete.
return f"""
""".strip()
def _collect_toc_entries(self, chapters: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
根据metadata中的tocPlan或章节heading收集目录项。
参数:
chapters: Document IR中的章节数组。
返回:
list[dict]: 规范化后的目录条目,包含level/text/anchor/description。
"""
metadata = self.metadata
toc_config = metadata.get("toc") or {}
custom_entries = toc_config.get("customEntries")
entries: List[Dict[str, Any]] = []
if custom_entries:
for entry in custom_entries:
anchor = entry.get("anchor") or self.chapter_anchor_map.get(entry.get("chapterId"))
# 验证anchor是否有效
if not anchor:
logger.warning(
f"目录项 '{entry.get('display') or entry.get('title')}' "
f"缺少有效的anchor,已跳过"
)
continue
# 验证anchor是否在chapter_anchor_map中或在chapters的blocks中
anchor_valid = self._validate_toc_anchor(anchor, chapters)
if not anchor_valid:
logger.warning(
f"目录项 '{entry.get('display') or entry.get('title')}' "
f"的anchor '{anchor}' 在文档中未找到对应的章节"
)
# 清理描述文本
description = entry.get("description")
if description:
description = self._clean_text_from_json_artifacts(description)
entries.append(
{
"level": entry.get("level", 2),
"text": entry.get("display") or entry.get("title") or "",
"anchor": anchor,
"description": description,
}
)
return entries
for chapter in chapters or []:
for block in chapter.get("blocks", []):
if block.get("type") == "heading":
anchor = block.get("anchor") or chapter.get("anchor") or ""
if not anchor:
continue
mapped = self.heading_label_map.get(anchor, {})
# 清理描述文本
description = mapped.get("description")
if description:
description = self._clean_text_from_json_artifacts(description)
entries.append(
{
"level": block.get("level", 2),
"text": mapped.get("display") or block.get("text", ""),
"anchor": anchor,
"description": description,
}
)
return entries
def _validate_toc_anchor(self, anchor: str, chapters: List[Dict[str, Any]]) -> bool:
"""
验证目录anchor是否在文档中存在对应的章节或heading。
参数:
anchor: 需要验证的anchor
chapters: Document IR中的章节数组
返回:
bool: anchor是否有效
"""
# 检查是否是章节anchor
if anchor in self.chapter_anchor_map.values():
return True
# 检查是否在heading_label_map中
if anchor in self.heading_label_map:
return True
# 检查章节的blocks中是否有这个anchor
for chapter in chapters or []:
chapter_anchor = chapter.get("anchor")
if chapter_anchor == anchor:
return True
for block in chapter.get("blocks", []):
block_anchor = block.get("anchor")
if block_anchor == anchor:
return True
return False
def _prepare_chapters(self, chapters: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""复制章节并展开其中序列化的block,避免渲染缺失"""
prepared: List[Dict[str, Any]] = []
for chapter in chapters or []:
chapter_copy = copy.deepcopy(chapter)
chapter_copy["blocks"] = self._expand_blocks_in_place(chapter_copy.get("blocks", []))
prepared.append(chapter_copy)
return prepared
def _expand_blocks_in_place(self, blocks: List[Dict[str, Any]] | None) -> List[Dict[str, Any]]:
"""遍历block列表,将内嵌JSON串拆解为独立block"""
expanded: List[Dict[str, Any]] = []
for block in blocks or []:
extras = self._extract_embedded_blocks(block)
expanded.append(block)
if extras:
expanded.extend(self._expand_blocks_in_place(extras))
return expanded
def _extract_embedded_blocks(self, block: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
在block内部查找被误写成字符串的block列表,并返回补充的block
"""
extracted: List[Dict[str, Any]] = []
def traverse(node: Any) -> None:
"""递归遍历block树,识别text字段内潜在的嵌套block JSON"""
if isinstance(node, dict):
for key, value in list(node.items()):
if key == "text" and isinstance(value, str):
decoded = self._decode_embedded_block_payload(value)
if decoded:
node[key] = ""
extracted.extend(decoded)
continue
traverse(value)
elif isinstance(node, list):
for item in node:
traverse(item)
traverse(block)
return extracted
def _decode_embedded_block_payload(self, raw: str) -> List[Dict[str, Any]] | None:
"""
将字符串形式的block描述恢复为结构化列表。
"""
if not isinstance(raw, str):
return None
stripped = raw.strip()
if not stripped or stripped[0] not in "{[":
return None
payload: Any | None = None
decode_targets = [stripped]
if stripped and stripped[0] != "[":
decode_targets.append(f"[{stripped}]")
for candidate in decode_targets:
try:
payload = json.loads(candidate)
break
except json.JSONDecodeError:
continue
if payload is None:
for candidate in decode_targets:
try:
payload = ast.literal_eval(candidate)
break
except (ValueError, SyntaxError):
continue
if payload is None:
return None
blocks = self._collect_blocks_from_payload(payload)
return blocks or None
@staticmethod
def _looks_like_block(payload: Dict[str, Any]) -> bool:
"""粗略判断dict是否符合block结构"""
if not isinstance(payload, dict):
return False
if "type" in payload and isinstance(payload["type"], str):
return True
structural_keys = {"blocks", "rows", "items", "widgetId", "widgetType", "data"}
return any(key in payload for key in structural_keys)
def _collect_blocks_from_payload(self, payload: Any) -> List[Dict[str, Any]]:
"""递归收集payload中的block节点"""
collected: List[Dict[str, Any]] = []
if isinstance(payload, dict):
block_list = payload.get("blocks")
block_type = payload.get("type")
if isinstance(block_list, list) and not block_type:
for candidate in block_list:
collected.extend(self._collect_blocks_from_payload(candidate))
return collected
if payload.get("cells") and not block_type:
for cell in payload["cells"]:
collected.extend(self._collect_blocks_from_payload(cell.get("blocks")))
return collected
if payload.get("items") and not block_type:
for item in payload["items"]:
collected.extend(self._collect_blocks_from_payload(item))
return collected
appended = False
if block_type or payload.get("widgetId") or payload.get("rows"):
coerced = self._coerce_block_dict(payload)
if coerced:
collected.append(coerced)
appended = True
items = payload.get("items")
if isinstance(items, list) and not block_type:
for item in items:
collected.extend(self._collect_blocks_from_payload(item))
return collected
if appended:
return collected
elif isinstance(payload, list):
for item in payload:
collected.extend(self._collect_blocks_from_payload(item))
elif payload is None:
return collected
return collected
def _coerce_block_dict(self, payload: Any) -> Dict[str, Any] | None:
"""尝试将dict补充为合法block结构"""
if not isinstance(payload, dict):
return None
block = copy.deepcopy(payload)
block_type = block.get("type")
if not block_type:
if "widgetId" in block:
block_type = block["type"] = "widget"
elif "rows" in block or "cells" in block:
block_type = block["type"] = "table"
if "rows" not in block and isinstance(block.get("cells"), list):
block["rows"] = [{"cells": block.pop("cells")}]
elif "items" in block:
block_type = block["type"] = "list"
return block if block.get("type") else None
def _format_toc_entry(self, entry: Dict[str, Any]) -> str:
"""
Format a single table-of-contents entry as an HTML row with an optional description.

Args:
entry: TOC entry; expected to carry `text` and `anchor`, optionally
`description` and `level` (defaults to 2).

Returns:
str: The HTML for this entry.
"""
desc = entry.get("description")
# Strip JSON fragments that leaked into the description text
if desc:
desc = self._clean_text_from_json_artifacts(desc)
desc_html = f'
{self._escape_html(desc)}
' if desc else ""
level = entry.get("level", 2)
# Levels 1 and 2 share CSS level 1; deeper levels are capped at 4
css_level = 1 if level <= 2 else min(level, 4)
# NOTE(review): ``desc_html`` and ``css_level`` are not referenced by the template
# as it appears here — confirm the template is complete.
return f'
'
def _normalize_table_rows(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
检测并修正仅有单列的竖排表,转换为标准网格。
参数:
rows: 原始表格行。
返回:
list[dict]: 若检测到竖排表则返回转置后的行,否则原样返回。
"""
if not rows:
return []
if not all(len((row.get("cells") or [])) == 1 for row in rows):
return rows
texts = [self._extract_row_text(row) for row in rows]
header_span = self._detect_transposed_header_span(rows, texts)
if not header_span:
return rows
normalized = self._transpose_single_cell_table(rows, header_span)
return normalized or rows
def _detect_transposed_header_span(self, rows: List[Dict[str, Any]], texts: List[str]) -> int:
"""推断竖排表头的行数,用于后续转置"""
max_fields = min(8, len(rows) // 2)
header_span = 0
for idx, text in enumerate(texts):
if idx >= max_fields:
break
if self._is_potential_table_header(text):
header_span += 1
else:
break
if header_span < 2:
return 0
remainder = texts[header_span:]
if not remainder or (len(rows) - header_span) % header_span != 0:
return 0
if not any(self._looks_like_table_value(txt) for txt in remainder):
return 0
return header_span
def _is_potential_table_header(self, text: str) -> bool:
"""根据长度与字符特征判断是否像表头字段"""
if not text:
return False
stripped = text.strip()
if not stripped or len(stripped) > 12:
return False
return not any(ch.isdigit() or ch in self.TABLE_COMPLEX_CHARS for ch in stripped)
def _looks_like_table_value(self, text: str) -> bool:
"""判断该文本是否更像数据值,用于辅助判断转置"""
if not text:
return False
stripped = text.strip()
if len(stripped) >= 12:
return True
return any(ch.isdigit() or ch in self.TABLE_COMPLEX_CHARS for ch in stripped)
def _transpose_single_cell_table(self, rows: List[Dict[str, Any]], span: int) -> List[Dict[str, Any]]:
"""将单列多行的表格转换为标准表头 + 若干数据行"""
total = len(rows)
if total <= span or (total - span) % span != 0:
return []
header_rows = rows[:span]
data_rows = rows[span:]
normalized: List[Dict[str, Any]] = []
header_cells = []
for row in header_rows:
cell = copy.deepcopy((row.get("cells") or [{}])[0])
cell["header"] = True
header_cells.append(cell)
normalized.append({"cells": header_cells})
for start in range(0, len(data_rows), span):
group = data_rows[start : start + span]
if len(group) < span:
break
normalized.append(
{
"cells": [
copy.deepcopy((item.get("cells") or [{}])[0])
for item in group
]
}
)
return normalized
def _extract_row_text(self, row: Dict[str, Any]) -> str:
"""提取表格行中的纯文本,方便启发式分析"""
cells = row.get("cells") or []
if not cells:
return ""
cell = cells[0]
texts: List[str] = []
for block in cell.get("blocks", []):
if isinstance(block, dict):
if block.get("type") == "paragraph":
for inline in block.get("inlines") or []:
if isinstance(inline, dict):
value = inline.get("text")
else:
value = inline
if value is None:
continue
texts.append(str(value))
return "".join(texts)
def _render_blockquote(self, block: Dict[str, Any]) -> str:
"""
Render a blockquote, which may nest other blocks.

The nested ``blocks`` are rendered with ``self._render_blocks`` and the
result is interpolated into the returned template.
"""
inner = self._render_blocks(block.get("blocks", []))
return f"
{inner}
"
def _render_code(self, block: Dict[str, Any]) -> str:
"""
Render a code block together with its language information.

Reads ``lang`` (defaults to "") and ``content`` (defaults to "", passed
through ``self._escape_html``) from the block.
"""
lang = block.get("lang") or ""
content = self._escape_html(block.get("content", ""))
# NOTE(review): ``lang`` and ``content`` are not referenced by the template
# as it appears here — confirm the template is complete.
return f'
'
def _merge_dicts(
self, base: Dict[str, Any] | None, override: Dict[str, Any] | None
) -> Dict[str, Any]:
"""
递归合并两个字典,override覆盖base,均为新副本,避免副作用。
"""
result = copy.deepcopy(base) if isinstance(base, dict) else {}
if not isinstance(override, dict):
return result
for key, value in override.items():
if isinstance(value, dict) and isinstance(result.get(key), dict):
result[key] = self._merge_dicts(result[key], value)
else:
result[key] = copy.deepcopy(value)
return result
def _looks_like_chart_dataset(self, candidate: Any) -> bool:
"""启发式判断对象是否包含Chart.js常见的labels/datasets结构"""
if not isinstance(candidate, dict):
return False
labels = candidate.get("labels")
datasets = candidate.get("datasets")
return isinstance(labels, list) or isinstance(datasets, list)
def _coerce_chart_data_structure(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
兼容LLM输出的Chart.js完整配置(含type/data/options)。
若data中嵌套一个真正的labels/datasets结构,则提取并返回该结构。
"""
if not isinstance(data, dict):
return {}
if self._looks_like_chart_dataset(data):
return data
for key in ("data", "chartData", "payload"):
nested = data.get(key)
if self._looks_like_chart_dataset(nested):
return copy.deepcopy(nested)
return data
def _prepare_widget_payload(
self, block: Dict[str, Any]
) -> tuple[Dict[str, Any], Dict[str, Any]]:
"""
预处理widget数据,兼容部分block将Chart.js配置写入data字段的情况。
返回:
tuple(props, data): 归一化后的props与chart数据
"""
props = copy.deepcopy(block.get("props") or {})
raw_data = block.get("data")
data_copy = copy.deepcopy(raw_data) if isinstance(raw_data, dict) else raw_data
widget_type = block.get("widgetType") or ""
chart_like = isinstance(widget_type, str) and widget_type.startswith("chart.js")
if chart_like and isinstance(data_copy, dict):
inline_options = data_copy.pop("options", None)
inline_type = data_copy.pop("type", None)
normalized_data = self._coerce_chart_data_structure(data_copy)
if isinstance(inline_options, dict):
props["options"] = self._merge_dicts(props.get("options"), inline_options)
if isinstance(inline_type, str) and inline_type and not props.get("type"):
props["type"] = inline_type
elif isinstance(data_copy, dict):
normalized_data = data_copy
else:
normalized_data = {}
return props, normalized_data
@staticmethod
def _is_chart_data_empty(data: Dict[str, Any] | None) -> bool:
"""检查图表数据是否为空或缺少有效datasets"""
if not isinstance(data, dict):
return True
datasets = data.get("datasets")
if not isinstance(datasets, list) or len(datasets) == 0:
return True
for ds in datasets:
if not isinstance(ds, dict):
continue
series = ds.get("data")
if isinstance(series, list) and len(series) > 0:
return False
return True
def _chart_cache_key(self, block: Dict[str, Any]) -> str:
"""使用修复器的缓存算法生成稳定的key,便于跨阶段共享结果"""
if hasattr(self, "chart_repairer") and block:
try:
return self.chart_repairer.build_cache_key(block)
except Exception:
pass
return str(id(block))
def _note_chart_failure(self, cache_key: str, reason: str) -> None:
"""记录修复失败原因,后续渲染直接使用占位提示"""
if not cache_key:
return
if not reason:
reason = "LLM返回的图表信息格式有误,无法正常显示"
self._chart_failure_notes[cache_key] = reason
def _record_chart_failure_stat(self, cache_key: str | None = None) -> None:
"""确保失败计数只统计一次"""
if cache_key and cache_key in self._chart_failure_recorded:
return
self.chart_validation_stats['failed'] += 1
if cache_key:
self._chart_failure_recorded.add(cache_key)
def _format_chart_error_reason(
    self,
    validation_result: ValidationResult | None = None,
    fallback_reason: str | None = None
) -> str:
    """
    Build a user-friendly message explaining why a chart cannot be shown.

    The detail is the first validation error, else the first validation
    warning, else ``fallback_reason``. With a detail, the message is the base
    text followed by the detail, cut to 180 characters plus "..." when longer.

    Args:
        validation_result: Validation outcome providing ``errors`` and
            ``warnings``.
        fallback_reason: Detail to use when validation supplies none.

    Returns:
        str: The message; just the base text when no detail is available.
    """
    base = "LLM返回的图表信息格式有误,已尝试本地与多模型修复但仍无法正常显示。"

    detail = None
    if validation_result:
        issues = validation_result.errors or validation_result.warnings
        if issues:
            detail = issues[0]
    detail = detail or fallback_reason
    if not detail:
        return base

    message = f"{base} 提示:{detail}"
    if len(message) > 180:
        return message[:180] + "..."
    return message
def _render_chart_error_placeholder(
self,
title: str | None,
reason: str,
widget_id: str | None = None
) -> str:
"""
Emit a compact placeholder for a chart that failed to render.

Per the original note, the placeholder is kept simple so that it does not
break the HTML/PDF layout. The title (default "图表未能展示") and the reason
are passed through ``self._escape_html``.

Args:
title: Chart title; a falsy value selects the default title.
reason: Failure description shown to the reader.
widget_id: Optional widget id used to build a ``data-widget-id`` attribute.

Returns:
str: The placeholder fragment.
"""
safe_title = self._escape_html(title or "图表未能展示")
safe_reason = self._escape_html(reason)
# NOTE(review): ``widget_attr`` is not referenced by the template as it appears here.
widget_attr = f' data-widget-id="{self._escape_attr(widget_id)}"' if widget_id else ""
return f"""
!
{safe_title}
{safe_reason}
"""
def _has_chart_failure(self, block: Dict[str, Any]) -> tuple[bool, str | None]:
"""检查是否已有修复失败记录"""
cache_key = self._chart_cache_key(block)
if block.get("_chart_renderable") is False:
return True, block.get("_chart_error_reason")
if cache_key in self._chart_failure_notes:
return True, self._chart_failure_notes.get(cache_key)
return False, None
def _normalize_chart_block(
self,
block: Dict[str, Any],
chapter_context: Dict[str, Any] | None = None,
) -> None:
"""
补全图表block中的缺失字段(如scales、datasets),提升容错性。
- 将错误挂在block顶层的scales合并进props.options。
- 当data缺失或datasets为空时,尝试使用章节级的data作为兜底。
"""
if not isinstance(block, dict):
return
if block.get("type") != "widget":
return
widget_type = block.get("widgetType", "")
if not (isinstance(widget_type, str) and widget_type.startswith("chart.js")):
return
# 确保props存在
props = block.get("props")
if not isinstance(props, dict):
block["props"] = {}
props = block["props"]
# 将顶层scales合并进options,避免配置丢失
scales = block.get("scales")
if isinstance(scales, dict):
options = props.get("options") if isinstance(props.get("options"), dict) else {}
props["options"] = self._merge_dicts(options, {"scales": scales})
# 确保data存在
data = block.get("data")
if not isinstance(data, dict):
data = {}
block["data"] = data
# 如果datasets为空,尝试使用章节级data填充
if chapter_context and self._is_chart_data_empty(data):
chapter_data = chapter_context.get("data") if isinstance(chapter_context, dict) else None
if isinstance(chapter_data, dict):
fallback_ds = chapter_data.get("datasets")
if isinstance(fallback_ds, list) and len(fallback_ds) > 0:
merged_data = copy.deepcopy(data)
merged_data["datasets"] = copy.deepcopy(fallback_ds)
if not merged_data.get("labels") and isinstance(chapter_data.get("labels"), list):
merged_data["labels"] = copy.deepcopy(chapter_data["labels"])
block["data"] = merged_data
# 若仍缺少labels且数据点包含x值,自动生成便于fallback和坐标刻度
data_ref = block.get("data")
if isinstance(data_ref, dict) and not data_ref.get("labels"):
datasets_ref = data_ref.get("datasets")
if isinstance(datasets_ref, list) and datasets_ref:
first_ds = datasets_ref[0]
ds_data = first_ds.get("data") if isinstance(first_ds, dict) else None
if isinstance(ds_data, list):
labels_from_data = []
for idx, point in enumerate(ds_data):
if isinstance(point, dict):
label_text = point.get("x") or point.get("label") or f"点{idx + 1}"
else:
label_text = f"点{idx + 1}"
labels_from_data.append(str(label_text))
if labels_from_data:
data_ref["labels"] = labels_from_data
def _render_widget(self, block: Dict[str, Any]) -> str:
"""
Render the placeholder container for an interactive widget (Chart.js etc.)
and record its configuration JSON in ``self.widget_scripts``.

Chart widgets (``widgetType`` starting with "chart.js") are validated and
repaired before rendering:
1. Validate the chart data with ``self.chart_validator``.
2. If invalid, ask ``self.chart_repairer`` to repair it (the statistics
distinguish 'local' and 'api' repairs).
3. If the repair fails, record the failure, emit an error placeholder and
skip any further repair attempts for this chart.

Args:
block: Widget block containing widgetId/props/data.

Returns:
str: The widget fragment (title plus fallback view), or the error
placeholder when the chart cannot be rendered.
"""
# First apply block-level tolerance fixes (scales, chapter-level data, etc.)
self._normalize_chart_block(block, getattr(self, "_current_chapter", None))
# Statistics
widget_type = block.get('widgetType', '')
is_chart = isinstance(widget_type, str) and widget_type.startswith('chart.js')
is_wordcloud = isinstance(widget_type, str) and 'wordcloud' in widget_type.lower()
widget_id = block.get('widgetId')
cache_key = self._chart_cache_key(block) if is_chart else ""
props_snapshot = block.get("props") if isinstance(block.get("props"), dict) else {}
display_title = props_snapshot.get("title") or block.get("title") or widget_id or "图表"
if is_chart:
self.chart_validation_stats['total'] += 1
# A failure is already on record: emit the placeholder directly instead of repairing again
has_failed, cached_reason = self._has_chart_failure(block)
if has_failed:
self._record_chart_failure_stat(cache_key)
reason = cached_reason or "LLM返回的图表信息格式有误,无法正常显示"
return self._render_chart_error_placeholder(display_title, reason, widget_id)
# Validate the chart data
validation_result = self.chart_validator.validate(block)
if not validation_result.is_valid:
logger.warning(
f"图表 {block.get('widgetId', 'unknown')} 验证失败: {validation_result.errors}"
)
# Attempt a repair
repair_result = self.chart_repairer.repair(block, validation_result)
if repair_result.success and repair_result.repaired_block:
# Repair succeeded: continue with the repaired block
block = repair_result.repaired_block
logger.info(
f"图表 {block.get('widgetId', 'unknown')} 修复成功 "
f"(方法: {repair_result.method}): {repair_result.changes}"
)
# Update the statistics
if repair_result.method == 'local':
self.chart_validation_stats['repaired_locally'] += 1
elif repair_result.method == 'api':
self.chart_validation_stats['repaired_api'] += 1
else:
# Repair failed: record the failure and emit the placeholder
fail_reason = self._format_chart_error_reason(validation_result)
block["_chart_renderable"] = False
block["_chart_error_reason"] = fail_reason
self._note_chart_failure(cache_key, fail_reason)
self._record_chart_failure_stat(cache_key)
logger.warning(
f"图表 {block.get('widgetId', 'unknown')} 修复失败,已跳过渲染: {fail_reason}"
)
return self._render_chart_error_placeholder(display_title, fail_reason, widget_id)
else:
# Validation passed
self.chart_validation_stats['valid'] += 1
if validation_result.warnings:
logger.info(
f"图表 {block.get('widgetId', 'unknown')} 验证通过,"
f"但有警告: {validation_result.warnings}"
)
# Render the chart HTML
self.chart_counter += 1
canvas_id = f"chart-{self.chart_counter}"
config_id = f"chart-config-{self.chart_counter}"
props, normalized_data = self._prepare_widget_payload(block)
payload = {
"widgetId": block.get("widgetId"),
"widgetType": block.get("widgetType"),
"props": props,
"data": normalized_data,
"dataRef": block.get("dataRef"),
}
# NOTE(review): presumably this escapes the serialized config for embedding in a
# script element; the search string is empty as it appears here — confirm.
config_json = json.dumps(payload, ensure_ascii=False).replace("", "<\\/")
self.widget_scripts.append(
f''
)
title = props.get("title")
title_html = f'
{self._escape_html(title)}
' if title else ""
# Text fallback so the page is not blank if the widget script fails to load
fallback_html = (
self._render_wordcloud_fallback(props, block.get("widgetId"))
if is_wordcloud
else self._render_widget_fallback(normalized_data, block.get("widgetId"))
)
return f"""
{title_html}
{fallback_html}
"""
def _render_widget_fallback(self, data: Dict[str, Any], widget_id: str | None = None) -> str:
"""
Render a textual fallback view of chart data so the page is not blank when
Chart.js fails to load.

One header cell is produced per dataset (its ``label``, or "系列N" when
missing) and one body row per label; each row holds the label followed by
the value at the same index in every dataset ("" when a series is shorter).
All values are passed through ``self._escape_html``.

Args:
data: Chart data with ``labels`` and ``datasets``.
widget_id: Optional widget id used to build a ``data-widget-id`` attribute.

Returns:
str: The fallback table, or "" when ``data`` is not a dict or has no
labels or no datasets.
"""
if not isinstance(data, dict):
return ""
labels = data.get("labels") or []
datasets = data.get("datasets") or []
if not labels or not datasets:
return ""
# NOTE(review): ``widget_attr`` is not referenced by the template as it appears here.
widget_attr = f' data-widget-id="{self._escape_attr(widget_id)}"' if widget_id else ""
header_cells = "".join(
f"
{self._escape_html(ds.get('label') or f'系列{idx + 1}')}
"
for idx, ds in enumerate(datasets)
)
body_rows = ""
for idx, label in enumerate(labels):
row_cells = [f"
{self._escape_html(label)}
"]
for ds in datasets:
series = ds.get("data") or []
# Series shorter than the label list are padded with empty cells
value = series[idx] if idx < len(series) else ""
row_cells.append(f"
{self._escape_html(value)}
")
body_rows += f"
{''.join(row_cells)}
"
table_html = f"""
类别
{header_cells}
{body_rows}
"""
return table_html
def _render_wordcloud_fallback(self, props: Dict[str, Any] | None, widget_id: str | None = None) -> str:
"""为词云提供表格兜底,避免WordCloud渲染失败后页面空白"""
words = []
if isinstance(props, dict):
raw = props.get("data")
if isinstance(raw, list):
for item in raw:
if not isinstance(item, dict):
continue
text = item.get("word") or item.get("text") or item.get("label")
weight = item.get("weight")
category = item.get("category") or ""
if text:
words.append({"word": str(text), "weight": weight, "category": str(category)})
if not words:
return ""
def _format_weight(value: Any) -> str:
if isinstance(value, (int, float)) and not isinstance(value, bool):
if 0 <= value <= 1.5:
return f"{value * 100:.1f}%"
return f"{value:.2f}".rstrip("0").rstrip(".")
return str(value)
widget_attr = f' data-widget-id="{self._escape_attr(widget_id)}"' if widget_id else ""
rows = "".join(
f"