Added Support for Word Cloud Association When Generating PDFs
This commit is contained in:
@@ -9,6 +9,7 @@ import base64
|
||||
import copy
|
||||
import os
|
||||
import sys
|
||||
import io
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
from datetime import datetime
|
||||
@@ -69,6 +70,12 @@ from .html_renderer import HTMLRenderer
|
||||
from .pdf_layout_optimizer import PDFLayoutOptimizer, PDFLayoutConfig
|
||||
from .chart_to_svg import create_chart_converter
|
||||
from .math_to_svg import MathToSVG
|
||||
try:
|
||||
from wordcloud import WordCloud
|
||||
WORDCLOUD_AVAILABLE = True
|
||||
except ImportError:
|
||||
WORDCLOUD_AVAILABLE = False
|
||||
logger = logger # ensure logger exists even before declaration
|
||||
|
||||
|
||||
class PDFRenderer:
|
||||
@@ -272,6 +279,26 @@ class PDFRenderer:
|
||||
logger.info(f"成功转换 {len(svg_map)} 个图表为SVG")
|
||||
return svg_map
|
||||
|
||||
def _convert_wordclouds_to_images(self, document_ir: Dict[str, Any]) -> Dict[str, str]:
|
||||
"""
|
||||
将document_ir中的词云widget转换为PNG并返回data URI映射
|
||||
"""
|
||||
img_map: Dict[str, str] = {}
|
||||
|
||||
if not WORDCLOUD_AVAILABLE:
|
||||
logger.debug("wordcloud库未安装,词云将使用表格兜底")
|
||||
return img_map
|
||||
|
||||
# 遍历所有章节
|
||||
chapters = document_ir.get('chapters', [])
|
||||
for chapter in chapters:
|
||||
blocks = chapter.get('blocks', [])
|
||||
self._extract_wordcloud_widgets(blocks, img_map)
|
||||
|
||||
if img_map:
|
||||
logger.info(f"成功转换 {len(img_map)} 个词云为图片")
|
||||
return img_map
|
||||
|
||||
def _extract_and_convert_widgets(
|
||||
self,
|
||||
blocks: list,
|
||||
@@ -334,6 +361,109 @@ class PDFRenderer:
|
||||
if isinstance(cell_blocks, list):
|
||||
self._extract_and_convert_widgets(cell_blocks, svg_map)
|
||||
|
||||
def _extract_wordcloud_widgets(
|
||||
self,
|
||||
blocks: list,
|
||||
img_map: Dict[str, str]
|
||||
) -> None:
|
||||
"""
|
||||
递归遍历blocks,找到词云widget并生成图片
|
||||
"""
|
||||
for block in blocks:
|
||||
if not isinstance(block, dict):
|
||||
continue
|
||||
|
||||
block_type = block.get('type')
|
||||
if block_type == 'widget':
|
||||
widget_id = block.get('widgetId')
|
||||
widget_type = block.get('widgetType', '')
|
||||
|
||||
if widget_id and isinstance(widget_type, str) and 'wordcloud' in widget_type.lower():
|
||||
try:
|
||||
data_uri = self._generate_wordcloud_image(block)
|
||||
if data_uri:
|
||||
img_map[widget_id] = data_uri
|
||||
logger.debug(f"词云 {widget_id} 转换为图片成功")
|
||||
except Exception as exc:
|
||||
logger.warning(f"生成词云图片失败 {widget_id}: {exc}")
|
||||
|
||||
nested_blocks = block.get('blocks')
|
||||
if isinstance(nested_blocks, list):
|
||||
self._extract_wordcloud_widgets(nested_blocks, img_map)
|
||||
|
||||
if block_type == 'list':
|
||||
items = block.get('items', [])
|
||||
for item in items:
|
||||
if isinstance(item, list):
|
||||
self._extract_wordcloud_widgets(item, img_map)
|
||||
|
||||
if block_type == 'table':
|
||||
rows = block.get('rows', [])
|
||||
for row in rows:
|
||||
cells = row.get('cells', [])
|
||||
for cell in cells:
|
||||
cell_blocks = cell.get('blocks', [])
|
||||
if isinstance(cell_blocks, list):
|
||||
self._extract_wordcloud_widgets(cell_blocks, img_map)
|
||||
|
||||
def _normalize_wordcloud_items(self, block: Dict[str, Any]) -> list:
|
||||
"""
|
||||
从widget block中提取词云数据
|
||||
"""
|
||||
props = block.get('props') or {}
|
||||
raw_items = props.get('data')
|
||||
if not isinstance(raw_items, list):
|
||||
return []
|
||||
normalized = []
|
||||
for item in raw_items:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
word = item.get('word') or item.get('text') or item.get('label')
|
||||
if not word:
|
||||
continue
|
||||
weight = item.get('weight')
|
||||
try:
|
||||
weight_val = float(weight)
|
||||
if weight_val <= 0:
|
||||
weight_val = 1.0
|
||||
except (TypeError, ValueError):
|
||||
weight_val = 1.0
|
||||
category = (item.get('category') or '').lower()
|
||||
normalized.append({'word': str(word), 'weight': weight_val, 'category': category})
|
||||
return normalized
|
||||
|
||||
def _generate_wordcloud_image(self, block: Dict[str, Any]) -> str | None:
|
||||
"""
|
||||
生成词云PNG并返回data URI
|
||||
"""
|
||||
items = self._normalize_wordcloud_items(block)
|
||||
if not items:
|
||||
return None
|
||||
|
||||
# 使用频次形式馈入wordcloud库
|
||||
frequencies = {}
|
||||
for item in items:
|
||||
weight = item['weight']
|
||||
# 兼容权重为0-1的小数,放大以体现差异
|
||||
freq = weight * 100 if 0 < weight <= 1.5 else weight
|
||||
frequencies[item['word']] = max(1, freq)
|
||||
|
||||
font_path = str(self._get_font_path())
|
||||
wc = WordCloud(
|
||||
width=900,
|
||||
height=520,
|
||||
background_color="white",
|
||||
font_path=font_path,
|
||||
prefer_horizontal=0.9,
|
||||
random_state=42,
|
||||
)
|
||||
wc.generate_from_frequencies(frequencies)
|
||||
|
||||
buffer = io.BytesIO()
|
||||
wc.to_image().save(buffer, format='PNG')
|
||||
encoded = base64.b64encode(buffer.getvalue()).decode('ascii')
|
||||
return f"data:image/png;base64,{encoded}"
|
||||
|
||||
def _convert_math_to_svg(self, document_ir: Dict[str, Any]) -> Dict[str, str]:
|
||||
"""
|
||||
将document_ir中的所有数学公式转换为SVG
|
||||
@@ -486,6 +616,46 @@ class PDFRenderer:
|
||||
|
||||
return html
|
||||
|
||||
def _inject_wordcloud_images(self, html: str, img_map: Dict[str, str]) -> str:
|
||||
"""
|
||||
将词云PNG data URI注入HTML,替换对应canvas
|
||||
"""
|
||||
if not img_map:
|
||||
return html
|
||||
|
||||
import re
|
||||
|
||||
for widget_id, data_uri in img_map.items():
|
||||
img_html = (
|
||||
f'<div class="chart-svg-container wordcloud-img">'
|
||||
f'<img src="{data_uri}" alt="词云" />'
|
||||
f'</div>'
|
||||
)
|
||||
|
||||
config_pattern = rf'<script[^>]+id="([^"]+)"[^>]*>\s*\{{[^}}]*"widgetId"\s*:\s*"{re.escape(widget_id)}"[^}}]*\}}'
|
||||
match = re.search(config_pattern, html, re.DOTALL)
|
||||
if not match:
|
||||
logger.debug(f"未找到词云 {widget_id} 的配置脚本,跳过注入")
|
||||
continue
|
||||
|
||||
config_id = match.group(1)
|
||||
canvas_pattern = rf'<canvas[^>]+data-config-id="{re.escape(config_id)}"[^>]*></canvas>'
|
||||
|
||||
html = re.sub(canvas_pattern, lambda m: img_html, html)
|
||||
logger.debug(f"已替换词云 {widget_id} 的canvas为PNG图片")
|
||||
|
||||
fallback_pattern = rf'<div class="chart-fallback"([^>]*data-widget-id="{re.escape(widget_id)}"[^>]*)>'
|
||||
|
||||
def _hide_fallback(m: re.Match) -> str:
|
||||
tag = m.group(0)
|
||||
if 'svg-hidden' in tag:
|
||||
return tag
|
||||
return tag.replace('chart-fallback"', 'chart-fallback svg-hidden"', 1)
|
||||
|
||||
html = re.sub(fallback_pattern, _hide_fallback, html, count=1)
|
||||
|
||||
return html
|
||||
|
||||
def _inject_math_svg_into_html(self, html: str, svg_map: Dict[str, str]) -> str:
|
||||
"""
|
||||
将数学公式SVG内容注入到HTML中
|
||||
@@ -580,6 +750,10 @@ class PDFRenderer:
|
||||
logger.info("开始转换图表为SVG矢量图形...")
|
||||
svg_map = self._convert_charts_to_svg(preprocessed_ir)
|
||||
|
||||
# 转换词云为PNG
|
||||
logger.info("开始转换词云为图片...")
|
||||
wordcloud_map = self._convert_wordclouds_to_images(preprocessed_ir)
|
||||
|
||||
# 转换数学公式为SVG
|
||||
logger.info("开始转换数学公式为SVG矢量图形...")
|
||||
math_svg_map = self._convert_math_to_svg(preprocessed_ir)
|
||||
@@ -594,6 +768,10 @@ class PDFRenderer:
|
||||
html = self._inject_svg_into_html(html, svg_map)
|
||||
logger.info(f"已注入 {len(svg_map)} 个SVG图表")
|
||||
|
||||
if wordcloud_map:
|
||||
html = self._inject_wordcloud_images(html, wordcloud_map)
|
||||
logger.info(f"已注入 {len(wordcloud_map)} 个词云图片")
|
||||
|
||||
# 注入数学公式SVG
|
||||
if math_svg_map:
|
||||
html = self._inject_math_svg_into_html(html, math_svg_map)
|
||||
@@ -652,6 +830,10 @@ body {{
|
||||
max-width: 100%;
|
||||
height: auto;
|
||||
}}
|
||||
.chart-svg-container img {{
|
||||
max-width: 100%;
|
||||
height: auto;
|
||||
}}
|
||||
|
||||
/* 数学公式SVG容器样式 */
|
||||
.math-svg-container {{
|
||||
|
||||
Reference in New Issue
Block a user