"""JRXML 窗口化拆解与重组工具。 用于 3 阶段生成管道的 refine_layout 和 map_fields 节点: - 将大段 JRXML 按 band 拆解为独立窗口 - 每个窗口独立发送给 LLM 进行坐标精调 - 重组所有窗口 + 校验元素完整性 调用者: agent/nodes.py (refine_layout, map_fields) """ from __future__ import annotations import re from dataclasses import dataclass from typing import Optional import defusedxml.ElementTree as ET from backend.logger import get_logger _windower_log = get_logger("jrxml.windower") # 需要按 section 拆解的 band 容器标签 _SECTION_TAGS = { "title", "pageHeader", "columnHeader", "detail", "columnFooter", "pageFooter", "lastPageFooter", "summary", "noData", "background", } # 不发给 LLM 的 header 元素(原样保留) _HEADER_TAGS = { "property", "propertyExpression", "import", "template", "reportFont", "style", "subDataset", "scriptlet", "parameter", "queryString", "field", "sortField", "variable", "filterExpression", "group", } @dataclass class BandInfo: """单个 band 的拆解信息。""" section_name: str # 所属 section 名,如 "title", "detail" band_index: int # 在该 section 中的序号(0-based) band_xml: str # 完整 ... 原始 XML element_count: int # textField + staticText 数量 char_length: int # 字符数 @property def label(self) -> str: """用于日志和 prompt 的标识。""" if self.band_index > 0: return f"{self.section_name}_band{self.band_index}" return self.section_name @dataclass class JRXMLParts: """JRXML 拆解结果。""" declaration: str # (如有) root_open: str # header_xml: str # fields/params/queryString 等(不发给 LLM) bands: list[BandInfo] # 按出现顺序 footer: str # @property def band_count(self) -> int: return len(self.bands) @property def total_elements(self) -> int: return sum(b.element_count for b in self.bands) # ── 拆解 ────────────────────────────────────────────────────────── def decompose_jrxml(jrxml: str) -> Optional[JRXMLParts]: """将 JRXML 字符串拆解为 header + bands + footer 三部分。 使用 defusedxml.ElementTree 进行安全解析。 返回 None 表示解析失败。 """ try: root = ET.fromstring(jrxml) except ET.ParseError as e: _windower_log.error("JRXML 解析失败: %s", e) return None tag = _local_tag(root.tag) if tag != "jasperReport": _windower_log.error("根元素不是 jasperReport: %s", tag) return None # 提取 XML 声明 declaration = "" if jrxml.strip().startswith("") if decl_end != -1: declaration = jrxml[:decl_end + 2] # 提取根元素属性来重建 root_open root_open = _build_root_open(jrxml, root) # 分离 header 子元素和 section 子元素 header_children = [] section_children = [] # (section_tag, child_elem) for child in root: child_tag = _local_tag(child.tag) if child_tag in _HEADER_TAGS: header_children.append(child) elif child_tag in _SECTION_TAGS: section_children.append((child_tag, child)) # 构建 header_xml:序列化所有 header 子元素 header_parts = [] for child in header_children: header_parts.append(_elem_to_string(child)) header_xml = "\n".join(header_parts) # 提取 bands:每个 section 内可能有多个 bands = [] for sec_tag, sec_elem in section_children: for bi, band_elem in enumerate(sec_elem): band_local = _local_tag(band_elem.tag) if band_local != "band": continue band_xml = _elem_to_string(band_elem) ec = _count_elements_in_text(band_xml) bands.append(BandInfo( section_name=sec_tag, band_index=bi, band_xml=band_xml, element_count=ec, char_length=len(band_xml), )) # 提取 footer: 闭合标签 footer = _extract_footer(jrxml) parts = JRXMLParts( declaration=declaration, root_open=root_open, header_xml=header_xml, bands=bands, footer=footer, ) _windower_log.info( "JRXML 拆解完成: %d bands, %d 个元素, header %d 字符", len(bands), parts.total_elements, len(header_xml), ) return parts # ── 窗口切分 ────────────────────────────────────────────────────── # 安全的元素边界:在这些闭合标签后切分 _SAFE_SPLIT_CLOSING = re.compile( r"\s*" ) def split_band_into_windows(band: BandInfo, max_chars: int = 4000) -> list[str]: """将一个 band 的 XML 在元素边界处切分为多个窗口。 每个窗口是合法的 XML 片段(完整的 ...), 大小不超过 max_chars。 """ if band.char_length <= max_chars: return [band.band_xml] inner = _extract_band_inner(band.band_xml) if not inner: return [band.band_xml] segments = _split_at_boundaries(inner, _SAFE_SPLIT_CLOSING) if len(segments) <= 1: return [band.band_xml] windows = _greedy_aggregate(segments, band.band_xml, max_chars) return windows # ── 重组 ────────────────────────────────────────────────────────── def _recalc_band_height(band_xml: str, margin: int = 20) -> str: """根据波段内所有子元素的 y + height 重新计算波段 height。""" max_bottom = 0 for m in re.finditer(r']*)/>', band_xml): attrs = m.group(1) ym = re.search(r'\sy\s*=\s*"(\d+)"', attrs) hm = re.search(r'\sheight\s*=\s*"(\d+)"', attrs) if ym and hm: bottom = int(ym.group(1)) + int(hm.group(1)) if bottom > max_bottom: max_bottom = bottom if max_bottom == 0: return band_xml new_height = max_bottom + margin return re.sub( r'(]*\sheight\s*=\s*)"(\d+)"', rf'\g<1>"{new_height}"', band_xml, count=1, ) def reassemble_band_windows(modified_windows: list[str]) -> str: """将多个窗口的修改结果重新合并为一个 band XML。 策略:取第一个窗口的开头(band 标签)和最后一个窗口的结尾(/band 标签), 中间拼接所有窗口内部的元素内容。 """ if len(modified_windows) == 1: return _recalc_band_height(modified_windows[0]) first = modified_windows[0] band_open_end = first.find(">") if band_open_end == -1: return _recalc_band_height("\n".join(modified_windows)) band_open = first[:band_open_end + 1] last = modified_windows[-1] band_close = _extract_band_close(last) inner_parts = [] for win in modified_windows: inner = _extract_band_inner(win) if inner: inner_parts.append(inner) return _recalc_band_height(band_open + "\n" + "\n".join(inner_parts) + "\n" + band_close) def reassemble_jrxml(parts: JRXMLParts, modified_bands: dict[str, str]) -> str: """将修改后的 bands 与 header/footer 重新组装为完整 JRXML。 modified_bands 的 key 格式为 "{section_name}_band{index}" 或 "{section_name}"(index=0 时)。 """ result = [] if parts.declaration: result.append(parts.declaration) result.append(parts.root_open) if parts.header_xml.strip(): result.append(parts.header_xml) current_section = None for band in parts.bands: if band.section_name != current_section: if current_section is not None: result.append(f"") current_section = band.section_name result.append(f"<{current_section}>") modified = modified_bands.get(band.label, band.band_xml) result.append(modified) if current_section is not None: result.append(f"") result.append(parts.footer) return "\n".join(result) # ── 元素计数与校验 ──────────────────────────────────────────────── _ELEMENT_RE = re.compile(r"<(?:[\w:]+:)?(textField|staticText|field)\b", re.IGNORECASE) def count_elements(jrxml: str) -> int: """正则计数 JRXML 中的 textField + staticText + field 声明。""" return len(_ELEMENT_RE.findall(jrxml)) def validate_element_count(original: str, modified: str, stage: str) -> dict: """校验修改前后的元素数变化。 返回: {"ok": bool, "original": int, "modified": int, "change_pct": float} 变化 > 10% 时 ok=False,调用方应回退。 """ orig = count_elements(original) mod = count_elements(modified) if orig == 0: return {"ok": True, "original": 0, "modified": mod, "change_pct": 0} change = abs(mod - orig) / orig ok = change <= 0.10 if not ok: _windower_log.error( "%s 元素数变化过大: %d → %d (%.1f%%)", stage, orig, mod, change * 100, ) elif change > 0.05: _windower_log.warning( "%s 元素数有差异: %d → %d (%.1f%%)", stage, orig, mod, change * 100, ) return {"ok": ok, "original": orig, "modified": mod, "change_pct": round(change, 4)} # ── 内部工具函数 ────────────────────────────────────────────────── def _local_tag(tag: str) -> str: """去除 XML 命名空间前缀。""" return tag.split("}")[-1] if "}" in tag else tag def _elem_to_string(elem: ET.Element) -> str: """将 ElementTree 元素序列化为字符串(使用 defusedxml 的 tostring)。""" raw = ET.tostring(elem, encoding="unicode") return raw.strip() def _build_root_open(jrxml: str, root: ET.Element) -> str: """从原始文本重建 开头标签。""" m = re.search(r"]*>", jrxml, re.IGNORECASE) if m: return m.group(0) attrs = [] for k, v in root.attrib.items(): attrs.append(f'{k}="{v}"') return "" def _extract_footer(jrxml: str) -> str: """提取 闭合标签。""" m = re.search(r"\s*$", jrxml, re.IGNORECASE) if m: return m.group(0).rstrip() return "" _BAND_CLOSE_RE = re.compile(r"\s*$", re.IGNORECASE) def _extract_band_close(band_xml: str) -> str: """提取 band 的闭合标签(兼容命名空间前缀),如 '' 或 ''。""" m = _BAND_CLOSE_RE.search(band_xml) return m.group(0).rstrip() if m else "" def _extract_band_inner(band_xml: str) -> str: """提取 和 之间的内容(兼容命名空间前缀)。""" tag_end = band_xml.find(">") if tag_end == -1: return "" close_m = _BAND_CLOSE_RE.search(band_xml) if not close_m: return band_xml[tag_end + 1:].strip() return band_xml[tag_end + 1:close_m.start()].strip() def _split_at_boundaries(text: str, boundary_re: re.Pattern) -> list[str]: """在正则匹配的闭合标签处切分文本。 返回切分后的片段列表(分隔符附加到前一个片段末尾)。 """ segments = [] last_end = 0 for m in boundary_re.finditer(text): end = m.end() segments.append(text[last_end:end]) last_end = end if last_end < len(text): segments.append(text[last_end:]) elif not segments: segments.append(text) return segments def _greedy_aggregate(segments: list[str], band_xml: str, max_chars: int) -> list[str]: """贪心聚合:将片段组合成不超过 max_chars 的窗口。 每个窗口包上 标签。 """ tag_end = band_xml.find(">") band_open = band_xml[:tag_end + 1] if tag_end != -1 else "" band_close = _extract_band_close(band_xml) overhead = len(band_open) + len(band_close) + 1 # +1 for \n windows = [] current = [] current_len = overhead for seg in segments: seg_len = len(seg) if current and current_len + seg_len > max_chars: windows.append(band_open + "\n" + "".join(current) + "\n" + band_close) current = [seg] current_len = overhead + seg_len else: current.append(seg) current_len += seg_len if current: windows.append(band_open + "\n" + "".join(current) + "\n" + band_close) return windows def _count_elements_in_text(xml_text: str) -> int: """统计 XML 文本中的 textField + staticText 数量。""" return len(_ELEMENT_RE.findall(xml_text))