From 452640f8c51c5d2beb9084cdcc71a80ddfcf3e54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E4=B8=80=E4=B8=81?= <1769123563@qq.com> Date: Fri, 21 Nov 2025 05:51:51 +0800 Subject: [PATCH] Optimize the Method of Automatically Repairing Charts in PDF --- ReportEngine/renderers/chart_to_svg.py | 144 +++++++++++++++++++----- ReportEngine/renderers/html_renderer.py | 111 +++++++++++++++++- ReportEngine/renderers/pdf_renderer.py | 68 +++++++---- ReportEngine/utils/chart_validator.py | 37 +++++- 4 files changed, 303 insertions(+), 57 deletions(-) diff --git a/ReportEngine/renderers/chart_to_svg.py b/ReportEngine/renderers/chart_to_svg.py index 4963397..6dd4031 100644 --- a/ReportEngine/renderers/chart_to_svg.py +++ b/ReportEngine/renderers/chart_to_svg.py @@ -16,6 +16,7 @@ from __future__ import annotations import base64 import io import re +from datetime import datetime from typing import Any, Dict, List, Optional, Tuple from loguru import logger @@ -23,6 +24,7 @@ try: import matplotlib matplotlib.use('Agg') # 使用非GUI后端 import matplotlib.pyplot as plt + import matplotlib.dates as mdates import matplotlib.font_manager as fm from matplotlib.patches import Wedge, Rectangle import numpy as np @@ -70,6 +72,15 @@ class ChartToSVGConverter: 'var(--color-secondary)': '#95A5A6', # 浅灰色 } + # 支持解析 rgba(var(--color-primary-rgb), 0.5) 这类格式的兜底映射 + CSS_VAR_RGB_MAP = { + 'color-primary-rgb': (52, 152, 219), + 'color-tone-up-rgb': (80, 200, 120), + 'color-tone-down-rgb': (232, 93, 117), + 'color-accent-positive-rgb': (80, 200, 120), + 'color-accent-neutral-rgb': (149, 165, 166), + } + def __init__(self, font_path: Optional[str] = None): """ 初始化转换器 @@ -192,6 +203,25 @@ class ChartToSVGConverter: color = color.strip() + # 处理 rgba(var(--color-primary-rgb), 0.5) / rgb(var(--color-primary-rgb)) + var_rgba_pattern = r'rgba?\(var\(--([\w-]+)\)\s*(?:,\s*([\d.]+))?\)' + match = re.match(var_rgba_pattern, color) + if match: + var_name, alpha_str = match.groups() + rgb_tuple = self.CSS_VAR_RGB_MAP.get(var_name) + + # 兼容缺少 -rgb 后缀的写法 + if not rgb_tuple: + if var_name.endswith('-rgb'): + rgb_tuple = self.CSS_VAR_RGB_MAP.get(var_name[:-4]) + else: + rgb_tuple = self.CSS_VAR_RGB_MAP.get(f"{var_name}-rgb") + + if rgb_tuple: + r, g, b = rgb_tuple + alpha = float(alpha_str) if alpha_str is not None else 1.0 + return (r / 255, g / 255, b / 255, alpha) + # 【增强】处理CSS变量,例如 var(--color-accent) # 使用预定义的颜色映射表替代CSS变量,确保不同变量有不同的颜色 if color.startswith('var('): @@ -288,10 +318,17 @@ class ChartToSVGConverter: - 线条样式(tension曲线平滑) """ try: - labels = data.get('labels', []) - datasets = data.get('datasets', []) + labels = data.get('labels') or [] + datasets = data.get('datasets') or [] - if not labels or not datasets: + has_object_points = any( + isinstance(ds, dict) + and isinstance(ds.get('data'), list) + and any(isinstance(pt, dict) and ('x' in pt or 'y' in pt) for pt in ds.get('data')) + for ds in datasets + ) + + if (not datasets) or ((not labels) and not has_object_points): return None # 收集所有唯一的yAxisID @@ -312,6 +349,7 @@ class ChartToSVGConverter: title = props.get('title') options = props.get('options', {}) scales = options.get('scales', {}) + x_tick_labels = list(labels) if isinstance(labels, list) else [] # 创建图表和多个y轴 fig, ax1 = plt.subplots(figsize=(width/dpi, height/dpi), dpi=dpi) @@ -376,41 +414,90 @@ class ChartToSVGConverter: # 选择对应的坐标轴 ax = axes.get(y_axis_id, ax1) - # 绘制折线 - x_data = range(len(labels)) + is_object_data = isinstance(dataset_data, list) and any( + isinstance(point, dict) and ('x' in point or 'y' in point) + for point in dataset_data + ) - # 根据tension值决定是否平滑 - if tension > 0 and SCIPY_AVAILABLE: - # 使用样条插值平滑曲线(需要scipy) - if len(dataset_data) >= 4: # 至少需要4个点才能平滑 + if is_object_data: + x_data = [] + y_data = [] + annotations = [] + + for idx, point in enumerate(dataset_data): + if not isinstance(point, dict): + continue + + label_text = str(point.get('x', f"点{idx + 1}")) + if len(x_tick_labels) < len(dataset_data): + x_tick_labels.append(label_text) + + x_data.append(len(x_data)) + + y_val = point.get('y', 0) try: - x_smooth = np.linspace(0, len(labels)-1, len(labels)*3) - spl = make_interp_spline(x_data, dataset_data, k=min(3, len(dataset_data)-1)) - y_smooth = spl(x_smooth) - line, = ax.plot(x_smooth, y_smooth, label=label, color=border_color, linewidth=2) + y_val = float(y_val) + except (TypeError, ValueError): + y_val = 0 + y_data.append(y_val) + annotations.append(point.get('event')) - # 如果需要填充(使用极低透明度避免遮挡) - if fill: - ax.fill_between(x_smooth, y_smooth, alpha=0.08, color=background_color) - except: - # 如果平滑失败,使用普通折线 + if not x_data: + continue + + line, = ax.plot(x_data, y_data, marker='o', label=label, + color=border_color, linewidth=2, markersize=6) + + if fill: + ax.fill_between(x_data, y_data, alpha=0.08, color=background_color) + + for pos, y_val, text in zip(x_data, y_data, annotations): + if text: + ax.annotate( + text, + (pos, y_val), + textcoords='offset points', + xytext=(0, 8), + ha='center', + fontsize=8, + rotation=20 + ) + else: + # 绘制折线 + x_data = range(len(labels)) + + # 根据tension值决定是否平滑 + if tension > 0 and SCIPY_AVAILABLE: + # 使用样条插值平滑曲线(需要scipy) + if len(dataset_data) >= 4: # 至少需要4个点才能平滑 + try: + x_smooth = np.linspace(0, len(labels)-1, len(labels)*3) + spl = make_interp_spline(x_data, dataset_data, k=min(3, len(dataset_data)-1)) + y_smooth = spl(x_smooth) + line, = ax.plot(x_smooth, y_smooth, label=label, color=border_color, linewidth=2) + + # 如果需要填充(使用极低透明度避免遮挡) + if fill: + ax.fill_between(x_smooth, y_smooth, alpha=0.08, color=background_color) + except: + # 如果平滑失败,使用普通折线 + line, = ax.plot(x_data, dataset_data, marker='o', label=label, + color=border_color, linewidth=2, markersize=6) + if fill: + ax.fill_between(x_data, dataset_data, alpha=0.08, color=background_color) + else: line, = ax.plot(x_data, dataset_data, marker='o', label=label, color=border_color, linewidth=2, markersize=6) if fill: ax.fill_between(x_data, dataset_data, alpha=0.08, color=background_color) else: + # 直线连接(tension=0或scipy不可用) line, = ax.plot(x_data, dataset_data, marker='o', label=label, color=border_color, linewidth=2, markersize=6) + + # 如果需要填充(使用极低透明度避免遮挡) if fill: ax.fill_between(x_data, dataset_data, alpha=0.08, color=background_color) - else: - # 直线连接(tension=0或scipy不可用) - line, = ax.plot(x_data, dataset_data, marker='o', label=label, - color=border_color, linewidth=2, markersize=6) - - # 如果需要填充(使用极低透明度避免遮挡) - if fill: - ax.fill_between(x_data, dataset_data, alpha=0.08, color=background_color) # 记录这条线属于哪个轴 axis_lines[y_axis_id].append(line) @@ -430,8 +517,9 @@ class ChartToSVGConverter: legend_labels.append(label) # 设置x轴标签 - ax1.set_xticks(range(len(labels))) - ax1.set_xticklabels(labels, rotation=45, ha='right') + if x_tick_labels: + ax1.set_xticks(range(len(x_tick_labels))) + ax1.set_xticklabels(x_tick_labels, rotation=45, ha='right') # 设置y轴标签和标题 for y_axis_id, ax in axes.items(): diff --git a/ReportEngine/renderers/html_renderer.py b/ReportEngine/renderers/html_renderer.py index e27d402..9e31a58 100644 --- a/ReportEngine/renderers/html_renderer.py +++ b/ReportEngine/renderers/html_renderer.py @@ -79,6 +79,7 @@ class HTMLRenderer: self.secondary_heading_index = 0 self.toc_rendered = False self.hero_kpi_signature: tuple | None = None + self._current_chapter: Dict[str, Any] | None = None self._lib_cache: Dict[str, str] = {} self._pdf_font_base64: str | None = None @@ -967,7 +968,12 @@ class HTMLRenderer: str: section包裹的HTML。 """ section_id = self._escape_attr(chapter.get("anchor") or f"chapter-{chapter.get('chapterId', 'x')}") - blocks_html = self._render_blocks(chapter.get("blocks", [])) + prev_chapter = self._current_chapter + self._current_chapter = chapter + try: + blocks_html = self._render_blocks(chapter.get("blocks", [])) + finally: + self._current_chapter = prev_chapter return f'
\n{blocks_html}\n
' def _render_blocks(self, blocks: List[Dict[str, Any]]) -> str: @@ -1406,6 +1412,98 @@ class HTMLRenderer: return props, normalized_data + @staticmethod + def _is_chart_data_empty(data: Dict[str, Any] | None) -> bool: + """检查图表数据是否为空或缺少有效datasets""" + if not isinstance(data, dict): + return True + + datasets = data.get("datasets") + if not isinstance(datasets, list) or len(datasets) == 0: + return True + + for ds in datasets: + if not isinstance(ds, dict): + continue + series = ds.get("data") + if isinstance(series, list) and len(series) > 0: + return False + + return True + + def _normalize_chart_block( + self, + block: Dict[str, Any], + chapter_context: Dict[str, Any] | None = None, + ) -> None: + """ + 补全图表block中的缺失字段(如scales、datasets),提升容错性。 + + - 将错误挂在block顶层的scales合并进props.options。 + - 当data缺失或datasets为空时,尝试使用章节级的data作为兜底。 + """ + + if not isinstance(block, dict): + return + + if block.get("type") != "widget": + return + + widget_type = block.get("widgetType", "") + if not (isinstance(widget_type, str) and widget_type.startswith("chart.js")): + return + + # 确保props存在 + props = block.get("props") + if not isinstance(props, dict): + block["props"] = {} + props = block["props"] + + # 将顶层scales合并进options,避免配置丢失 + scales = block.get("scales") + if isinstance(scales, dict): + options = props.get("options") if isinstance(props.get("options"), dict) else {} + props["options"] = self._merge_dicts(options, {"scales": scales}) + + # 确保data存在 + data = block.get("data") + if not isinstance(data, dict): + data = {} + block["data"] = data + + # 如果datasets为空,尝试使用章节级data填充 + if chapter_context and self._is_chart_data_empty(data): + chapter_data = chapter_context.get("data") if isinstance(chapter_context, dict) else None + if isinstance(chapter_data, dict): + fallback_ds = chapter_data.get("datasets") + if isinstance(fallback_ds, list) and len(fallback_ds) > 0: + merged_data = copy.deepcopy(data) + merged_data["datasets"] = copy.deepcopy(fallback_ds) + + if not merged_data.get("labels") and isinstance(chapter_data.get("labels"), list): + merged_data["labels"] = copy.deepcopy(chapter_data["labels"]) + + block["data"] = merged_data + + # 若仍缺少labels且数据点包含x值,自动生成便于fallback和坐标刻度 + data_ref = block.get("data") + if isinstance(data_ref, dict) and not data_ref.get("labels"): + datasets_ref = data_ref.get("datasets") + if isinstance(datasets_ref, list) and datasets_ref: + first_ds = datasets_ref[0] + ds_data = first_ds.get("data") if isinstance(first_ds, dict) else None + if isinstance(ds_data, list): + labels_from_data = [] + for idx, point in enumerate(ds_data): + if isinstance(point, dict): + label_text = point.get("x") or point.get("label") or f"点{idx + 1}" + else: + label_text = f"点{idx + 1}" + labels_from_data.append(str(label_text)) + + if labels_from_data: + data_ref["labels"] = labels_from_data + def _render_widget(self, block: Dict[str, Any]) -> str: """ 渲染Chart.js等交互组件的占位容器,并记录配置JSON。 @@ -1422,6 +1520,9 @@ class HTMLRenderer: 返回: str: 含canvas与配置脚本的HTML。 """ + # 先在block层面做一次容错补全(scales、章节级数据等) + self._normalize_chart_block(block, getattr(self, "_current_chapter", None)) + # 统计 widget_type = block.get('widgetType', '') is_chart = isinstance(widget_type, str) and widget_type.startswith('chart.js') @@ -1489,7 +1590,7 @@ class HTMLRenderer: title = props.get("title") title_html = f'
{self._escape_html(title)}
' if title else "" - fallback_html = self._render_widget_fallback(normalized_data) + fallback_html = self._render_widget_fallback(normalized_data, block.get("widgetId")) return f"""
{title_html} @@ -1500,7 +1601,7 @@ class HTMLRenderer:
""" - def _render_widget_fallback(self, data: Dict[str, Any]) -> str: + def _render_widget_fallback(self, data: Dict[str, Any], widget_id: str | None = None) -> str: """渲染图表数据的文本兜底视图,避免Chart.js加载失败时出现空白""" if not isinstance(data, dict): return "" @@ -1508,6 +1609,8 @@ class HTMLRenderer: datasets = data.get("datasets") or [] if not labels or not datasets: return "" + + widget_attr = f' data-widget-id="{self._escape_attr(widget_id)}"' if widget_id else "" header_cells = "".join( f"{self._escape_html(ds.get('label') or f'系列{idx + 1}')}" for idx, ds in enumerate(datasets) @@ -1521,7 +1624,7 @@ class HTMLRenderer: row_cells.append(f"{self._escape_html(value)}") body_rows += f"{''.join(row_cells)}" table_html = f""" -
+
{header_cells} diff --git a/ReportEngine/renderers/pdf_renderer.py b/ReportEngine/renderers/pdf_renderer.py index b53f0c1..b3b08d9 100644 --- a/ReportEngine/renderers/pdf_renderer.py +++ b/ReportEngine/renderers/pdf_renderer.py @@ -7,11 +7,22 @@ from __future__ import annotations import base64 import copy +import os +import sys from pathlib import Path from typing import Any, Dict from datetime import datetime from loguru import logger +# 在导入WeasyPrint之前,尝试补充常见的macOS Homebrew动态库路径, +# 避免因未设置DYLD_LIBRARY_PATH而找不到pango/cairo等依赖。 +if sys.platform == 'darwin': + brew_lib = Path('/opt/homebrew/lib') + if brew_lib.exists(): + current = os.environ.get('DYLD_LIBRARY_PATH', '') + if str(brew_lib) not in current.split(':'): + os.environ['DYLD_LIBRARY_PATH'] = f"{brew_lib}{':' + current if current else ''}" + try: from weasyprint import HTML, CSS from weasyprint.text.fonts import FontConfiguration @@ -128,7 +139,7 @@ class PDFRenderer: 'failed': 0 } - def repair_widgets_in_blocks(blocks: list) -> None: + def repair_widgets_in_blocks(blocks: list, chapter_context: Dict[str, Any] | None = None) -> None: """递归修复blocks中的所有widget""" for block in blocks: if not isinstance(block, dict): @@ -136,6 +147,12 @@ class PDFRenderer: # 处理widget类型 if block.get('type') == 'widget': + # 先用HTML渲染器的容错逻辑补全字段 + try: + self.html_renderer._normalize_chart_block(block, chapter_context) + except Exception as exc: # 防御性处理,避免单个图表阻断流程 + logger.debug(f"预处理图表 {block.get('widgetId')} 时出错: {exc}") + widget_type = block.get('widgetType', '') if widget_type.startswith('chart.js'): repair_stats['total'] += 1 @@ -164,32 +181,32 @@ class PDFRenderer: ) # 递归处理嵌套的blocks - nested_blocks = block.get('blocks') - if isinstance(nested_blocks, list): - repair_widgets_in_blocks(nested_blocks) + nested_blocks = block.get('blocks') + if isinstance(nested_blocks, list): + repair_widgets_in_blocks(nested_blocks, chapter_context) # 处理列表项 - if block.get('type') == 'list': - items = block.get('items', []) - for item in items: - if isinstance(item, list): - repair_widgets_in_blocks(item) + if block.get('type') == 'list': + items = block.get('items', []) + for item in items: + if isinstance(item, list): + repair_widgets_in_blocks(item, chapter_context) # 处理表格单元格 - if block.get('type') == 'table': - rows = block.get('rows', []) - for row in rows: - cells = row.get('cells', []) - for cell in cells: - cell_blocks = cell.get('blocks', []) - if isinstance(cell_blocks, list): - repair_widgets_in_blocks(cell_blocks) + if block.get('type') == 'table': + rows = block.get('rows', []) + for row in rows: + cells = row.get('cells', []) + for cell in cells: + cell_blocks = cell.get('blocks', []) + if isinstance(cell_blocks, list): + repair_widgets_in_blocks(cell_blocks, chapter_context) # 处理所有章节 chapters = ir_copy.get('chapters', []) for chapter in chapters: blocks = chapter.get('blocks', []) - repair_widgets_in_blocks(blocks) + repair_widgets_in_blocks(blocks, chapter) # 输出统计信息 if repair_stats['total'] > 0: @@ -425,6 +442,17 @@ class PDFRenderer: # 【修复】替换canvas为SVG,使用lambda避免反斜杠转义问题 html = re.sub(canvas_pattern, lambda m: svg_html, html) logger.debug(f"已替换图表 {widget_id} 的canvas为SVG") + + # 将对应fallback标记为隐藏,避免PDF中出现重复表格 + fallback_pattern = rf'
]*data-widget-id="{re.escape(widget_id)}"[^>]*)>' + + def _hide_fallback(m: re.Match) -> str: + tag = m.group(0) + if 'svg-hidden' in tag: + return tag + return tag.replace('chart-fallback"', 'chart-fallback svg-hidden"', 1) + + html = re.sub(fallback_pattern, _hide_fallback, html, count=1) else: logger.warning(f"未找到图表 {widget_id} 对应的配置脚本") @@ -617,8 +645,8 @@ body {{ display: none !important; }} -/* 隐藏fallback表格(因为现在使用SVG) */ -.chart-fallback {{ +/* 当对应SVG成功注入时隐藏fallback表格,失败时继续显示兜底数据 */ +.chart-fallback.svg-hidden {{ display: none !important; }} diff --git a/ReportEngine/utils/chart_validator.py b/ReportEngine/utils/chart_validator.py index 53133e9..d17cf33 100644 --- a/ReportEngine/utils/chart_validator.py +++ b/ReportEngine/utils/chart_validator.py @@ -133,13 +133,28 @@ class ChartValidator: errors.append("data字段必须是字典类型") return ValidationResult(False, errors, warnings) + # 检测是否使用了{x, y}形式的数据点(通常用于时间轴/散点) + def contains_object_points(ds_list: List[Any] | None) -> bool: + if not isinstance(ds_list, list): + return False + for point in ds_list: + if isinstance(point, dict) and any(key in point for key in ('x', 'y', 't')): + return True + return False + + datasets_for_detection = data.get('datasets') or [] + uses_object_points = any( + isinstance(ds, dict) and contains_object_points(ds.get('data')) + for ds in datasets_for_detection + ) + # 6. 根据图表类型验证数据 if chart_type in self.SPECIAL_DATA_TYPES: # 特殊数据格式(scatter, bubble) self._validate_special_data(data, chart_type, errors, warnings) else: # 标准数据格式(labels + datasets) - self._validate_standard_data(data, chart_type, errors, warnings) + self._validate_standard_data(data, chart_type, errors, warnings, uses_object_points) # 7. 验证props props = widget_block.get('props') @@ -186,7 +201,8 @@ class ChartValidator: data: Dict[str, Any], chart_type: str, errors: List[str], - warnings: List[str] + warnings: List[str], + uses_object_points: bool = False ): """验证标准数据格式(labels + datasets)""" labels = data.get('labels') @@ -195,7 +211,12 @@ class ChartValidator: # 验证labels if chart_type in self.LABEL_REQUIRED_TYPES: if not labels: - errors.append(f"{chart_type}类型图表必须包含labels字段") + if uses_object_points: + warnings.append( + f"{chart_type}类型图表缺少labels,已根据数据点渲染(使用x值)" + ) + else: + errors.append(f"{chart_type}类型图表必须包含labels字段") elif not isinstance(labels, list): errors.append("labels必须是数组类型") elif len(labels) == 0: @@ -234,15 +255,21 @@ class ChartValidator: warnings.append(f"datasets[{idx}].data数组为空") continue + # 如果是{x, y}对象形式的数据点,默认允许跳过labels长度和数值校验 + object_points = any( + isinstance(value, dict) and any(key in value for key in ('x', 'y', 't')) + for value in ds_data + ) + # 验证数据长度一致性 - if labels and isinstance(labels, list): + if labels and isinstance(labels, list) and not object_points: if len(ds_data) != len(labels): warnings.append( f"datasets[{idx}].data长度({len(ds_data)})与labels长度({len(labels)})不匹配" ) # 验证数值类型 - if chart_type in self.NUMERIC_DATA_TYPES: + if chart_type in self.NUMERIC_DATA_TYPES and not object_points: for data_idx, value in enumerate(ds_data): if value is not None and not isinstance(value, (int, float)): errors.append(
类别