Files
rag_jrxml/jrxml_chunker.py
panda 4f475e9e36 feat: 添加Qwen3嵌入模型及JRXML报告相关文件
添加Qwen3-4B嵌入模型配置文件及权重文件
添加多个JRXML报告的数据查询和字段定义文件
添加PdfEncryptReport.jrxml示例报告文件
2026-05-11 08:34:03 +08:00

1176 lines
52 KiB
Python

"""
JRXML Semantic Chunking v3.0
Goal: Chunk JasperReports template files by domain semantics for LLM learning
Complete data source type support:
- SQL (JDBC database)
- HQL (Hibernate Query Language)
- XPath (XML data)
- JSON (JSON data)
- JSONQL (JSON Query Language)
- CSV (CSV data)
- Excel/XLSX via Data Adapter
- XML via Data Adapter
- HTTP Data Adapter (remote data)
- Bean Collection Data Source
- Empty Data Source
Complete element kinds:
- staticText, textField, line, rectangle, ellipse
- image, subreport, chart, crosstab
- frame, elementGroup, component, break, genericElement
"""
import xml.etree.ElementTree as ET
import json
import os
from typing import List, Dict, Set
from dataclasses import dataclass, field, asdict
@dataclass
class JRXMLChunk:
    """One semantic chunk cut out of a JRXML report template."""

    # Identifier assigned by the chunker while processing one report file.
    chunk_id: int
    # Category label such as "report_overview", "query" or "band_<name>".
    chunk_type: str
    # Natural-language summary of the chunk's content.
    human_description: str
    # XML text belonging to the chunk.
    raw_xml: str
    # Short label naming the report and section the chunk came from.
    context: str
    # Extra structured details; every instance gets its own fresh dict.
    metadata: Dict = field(default_factory=dict)
class JRXMLSemanticChunker:
"""JRXML Semantic Chunking v3.0

Breaks a parsed JRXML report into typed chunks, each carrying an English
description, the raw XML and a metadata dict.
"""
# Tag names of the report-level bands; used both to list bands in the
# overview chunk and to look bands up directly under the report root.
STANDARD_BANDS: Set[str] = {
"title", "pageHeader", "columnHeader", "detail", "columnFooter",
"pageFooter", "summary", "background", "noData", "lastPageFooter"
}
# Values of the `kind` attribute on <element> nodes.
# NOTE(review): not referenced in the part of the class reviewed here.
ELEMENT_KINDS: Set[str] = {
"staticText", "textField", "line", "rectangle", "ellipse",
"image", "subreport", "chart", "crosstab", "frame",
"elementGroup", "component", "break", "genericElement"
}
# Lower-case query languages; the data source analysis only records a
# query language when it appears in this set.
QUERY_LANGUAGES: Set[str] = {
"sql", "hql", "xpath", "json", "jsonql", "csv", "xml"
}
# Report <property> names that are treated as data source configuration.
DATA_SOURCE_PROPERTIES: Set[str] = {
"net.sf.jasperreports.data.adapter",
"net.sf.jasperreports.json.source",
"net.sf.jasperreports.csv.source",
"net.sf.jasperreports.json.schema",
"net.sf.jasperreports.csv.column.names",
"net.sf.jasperreports.csv.record.delimiter",
"com.jaspersoft.studio.data.defaultdataadapter",
}
# Maps a query language to the name of the field-level property holding
# that language's field expression.
# NOTE(review): not referenced in the part of the class reviewed here.
FIELD_EXPRESSION_PROPERTIES: Dict[str, str] = {
"json": "net.sf.jasperreports.json.field.expression",
"xpath": "net.sf.jasperreports.xpath.field.expression",
"jsonql": "net.sf.jasperreports.jsonql.field.expression",
}
def __init__(self, max_chunk_size: int = 2000):
self.max_chunk_size = max_chunk_size
def chunk_file(self, file_path: str) -> List[Dict]:
    """Parse one JRXML file and return all of its chunks as plain dicts.

    The overview chunk comes first, followed by the output of each
    extractor in a fixed order: imports, datasets, data source
    configuration, main query, parameters, fields, sort fields, filter
    expression, variables, styles, groups, standard bands, charts,
    crosstabs, subreports and components.

    Args:
        file_path: Path to the ``.jrxml`` file.

    Returns:
        A list of ``asdict``-converted chunks whose ``chunk_id`` equals the
        chunk's position in the list.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    root = ET.parse(file_path).getroot()
    report_name = root.attrib.get("name", "UnnamedReport")

    # The overview always occupies id 0.
    chunks = [asdict(self._create_overview_chunk(0, root, report_name))]

    # Order matters: it defines the order of chunks in the result.
    extractors = (
        self._extract_import_chunks,
        self._extract_dataset_chunks,
        self._extract_datasource_chunks,
        self._extract_query_chunks,
        self._extract_parameter_chunks,
        self._extract_field_chunks,
        self._extract_sortfield_chunks,
        self._extract_filter_chunks,
        self._extract_variable_chunks,
        self._extract_style_chunks,
        self._extract_group_chunks,
        self._extract_standard_band_chunks,
        self._extract_chart_chunks,
        self._extract_crosstab_chunks,
        self._extract_subreport_chunks,
        self._extract_component_chunks,
    )
    for extract in extractors:
        # len(chunks) is the next free id, handed to the extractor as its
        # starting id.
        for chunk in extract(len(chunks), root, report_name):
            record = asdict(chunk)
            # Re-stamp the id with the output position so ids stay unique
            # and sequential even if an extractor numbered its chunks
            # inconsistently.
            record["chunk_id"] = len(chunks)
            chunks.append(record)
    return chunks
def chunk_directory(self, dir_path: str, extensions: tuple = (".jrxml",)) -> List[Dict]:
    """Chunk every matching file found under ``dir_path`` (recursively).

    A file matches when its lower-cased name ends with one of
    ``extensions``. Files that fail to chunk are reported on stdout and
    skipped; they do not abort the walk.

    Returns:
        The chunks of all successfully processed files, concatenated.
    """
    collected = []
    processed = 0
    for folder, _subdirs, names in os.walk(dir_path):
        for name in names:
            if not name.lower().endswith(extensions):
                continue
            path = os.path.join(folder, name)
            try:
                file_chunks = self.chunk_file(path)
                collected.extend(file_chunks)
                processed += 1
                print(f"OK {path}: {len(file_chunks)} chunks")
            except Exception as exc:
                # Best effort: one unreadable report must not stop the batch.
                print(f"FAIL {path}: {exc}")
    print(f"\nTotal: {processed} files, {len(collected)} chunks")
    return collected
# ==================== Overview ====================
def _create_overview_chunk(self, chunk_id: int, root: ET.Element, report_name: str) -> JRXMLChunk:
    """Build the report-level overview chunk.

    The description summarises page size, the standard bands that have
    children, the names of fields/parameters/variables/groups, the data
    source analysis and the number of chart, crosstab and subreport
    elements anywhere in the report.

    Args:
        chunk_id: Id to give the chunk.
        root: Root element of the parsed report.
        report_name: Report name used in the description and context.

    Returns:
        A ``report_overview`` chunk whose ``raw_xml`` is an empty
        ``<jasperReport .../>`` element carrying the root's attributes.
    """
    attrs = dict(root.attrib)
    bands_with_content = [
        child.tag for child in root
        if child.tag in self.STANDARD_BANDS and len(child) > 0
    ]

    def names_of(tag: str) -> List[str]:
        # "name" attribute of every direct child with the given tag.
        return [node.attrib.get("name", "") for node in root.findall(tag)]

    def listing(names: List[str]) -> str:
        return ", ".join(names) if names else "none"

    field_names = names_of("field")
    param_names = names_of("parameter")
    variable_names = names_of("variable")
    group_names = names_of("group")

    datasource_info = self._analyze_datasource(root)

    # These searches cover the whole tree, not only direct children.
    chart_count = len(root.findall(".//element[@kind='chart']"))
    crosstab_count = len(root.findall(".//element[@kind='crosstab']"))
    subreport_count = len(root.findall(".//element[@kind='subreport']"))

    desc_parts = [
        f"This is a JasperReports template overview for report '{report_name}'.",
        f"Page size: {attrs.get('pageWidth', 'N/A')} x {attrs.get('pageHeight', 'N/A')} {attrs.get('orientation', 'portrait')}.",
        f"Contains {len(bands_with_content)} standard bands with content: {listing(bands_with_content)}.",
        f"Defines {len(field_names)} fields: {listing(field_names)}.",
        f"Defines {len(param_names)} parameters: {listing(param_names)}.",
        f"Defines {len(variable_names)} variables: {listing(variable_names)}.",
        f"Defines {len(group_names)} groups: {listing(group_names)}.",
    ]
    # Source and query language are only mentioned when a type was detected.
    if datasource_info["type"]:
        desc_parts.append(f"Data source type: {datasource_info['type']}.")
        if datasource_info["source"]:
            desc_parts.append(f"Source: {datasource_info['source']}.")
        if datasource_info["query_language"]:
            desc_parts.append(f"Query language: {datasource_info['query_language']}.")
    if chart_count:
        desc_parts.append(f"Contains {chart_count} charts.")
    if crosstab_count:
        desc_parts.append(f"Contains {crosstab_count} crosstabs.")
    if subreport_count:
        desc_parts.append(f"Contains {subreport_count} subreports.")

    # Serialise through ElementTree so attribute values containing &, < or
    # double quotes are escaped and raw_xml stays well-formed.
    root_attrs_xml = ET.tostring(ET.Element("jasperReport", attrs), encoding="unicode")

    return JRXMLChunk(
        chunk_id=chunk_id,
        chunk_type="report_overview",
        human_description=" ".join(desc_parts),
        raw_xml=root_attrs_xml,
        context=f"Report '{report_name}' overall structure overview",
        metadata={
            "report_name": report_name,
            "bands": bands_with_content,
            "field_count": len(field_names),
            "parameter_count": len(param_names),
            "variable_count": len(variable_names),
            "group_count": len(group_names),
            "chart_count": chart_count,
            "crosstab_count": crosstab_count,
            "subreport_count": subreport_count,
            "datasource": datasource_info,
            "attributes": attrs,
        },
    )
def _analyze_datasource(self, root: ET.Element) -> Dict:
"""Analyze the data source configuration of a report"""
info = {
"type": None,
"source": None,
"query_language": None,
"properties": {}
}
# Check for query language
query_elem = root.find("query")
if query_elem is not None:
lang = query_elem.attrib.get("language", "").lower()
if lang in self.QUERY_LANGUAGES:
info["query_language"] = lang
info["type"] = self._get_datasource_type_by_query_lang(lang)
# Check for data adapter property
for prop in root.findall("property"):
name = prop.attrib.get("name", "")
value = prop.attrib.get("value", "")
if name == "net.sf.jasperreports.data.adapter":
info["source"] = value
if not info["type"]:
info["type"] = "DataAdapter"
elif name == "net.sf.jasperreports.json.source":
info["source"] = value
if not info["type"]:
info["type"] = "JSON"
elif name == "net.sf.jasperreports.csv.source":
info["source"] = value
if not info["type"]:
info["type"] = "CSV"
elif name == "com.jaspersoft.studio.data.defaultdataadapter":
if value and value != "NO_DATA_ADAPTER":
info["source"] = value
if not info["type"]:
info["type"] = "DataAdapter"
if name in self.DATA_SOURCE_PROPERTIES or name.startswith("net.sf.jasperreports."):
info["properties"][name] = value
return info
def _get_datasource_type_by_query_lang(self, lang: str) -> str:
"""Map query language to data source type"""
mapping = {
"sql": "JDBC/SQL",
"hql": "Hibernate/HQL",
"xpath": "XML/XPath",
"json": "JSON",
"jsonql": "JSONQL",
"csv": "CSV",
"xml": "XML"
}
return mapping.get(lang, lang.upper())
# ==================== Imports ====================
def _extract_import_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Collect the report's ``<import value="..."/>`` entries into one chunk.

    Imports with a missing or empty ``value`` attribute are ignored.

    Returns:
        A single-element list, or an empty list when there is nothing to
        report.
    """
    values = (node.attrib.get("value", "") for node in root.findall("import"))
    import_list = [value for value in values if value]
    if not import_list:
        return []

    rebuilt_lines = [f'<import value="{imp}" />' for imp in import_list]
    return [
        JRXMLChunk(
            chunk_id=chunk_id,
            chunk_type="imports",
            human_description=f"These are Java imports for report '{report_name}': {', '.join(import_list)}.",
            raw_xml="\n".join(rebuilt_lines),
            context=f"Report '{report_name}' Java imports",
            metadata={"imports": import_list, "count": len(import_list)},
        )
    ]
# ==================== Data Source ====================
def _extract_datasource_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Collect data-source related report properties into one chunk.

    A direct ``<property>`` child qualifies when its name is listed in
    ``DATA_SOURCE_PROPERTIES`` or starts with one of the data source
    prefixes below.

    Returns:
        A single ``datasource_config`` chunk, or an empty list when no
        property qualifies.
    """
    # Imported locally so this method does not depend on a file-level import.
    from xml.sax.saxutils import escape

    prefixes = (
        "net.sf.jasperreports.data",
        "net.sf.jasperreports.json",
        "net.sf.jasperreports.csv",
        "com.jaspersoft.studio.data",
    )
    datasource_props = {}
    for prop in root.findall("property"):
        name = prop.attrib.get("name", "")
        if name in self.DATA_SOURCE_PROPERTIES or name.startswith(prefixes):
            datasource_props[name] = prop.attrib.get("value", "")
    if not datasource_props:
        return []

    def quoted(text: str) -> str:
        # Escape &, <, > and double quotes so the rebuilt attribute stays
        # well-formed (values such as URLs commonly contain '&').
        return escape(text, {'"': "&quot;"})

    props_xml = "\n".join(
        f'<property name="{quoted(name)}" value="{quoted(value)}"/>'
        for name, value in datasource_props.items()
    )

    description_parts = [f"These are data source configuration properties for report '{report_name}'."]
    for name, value in datasource_props.items():
        lowered = name.lower()
        # "adapter" takes precedence over "source" when a name has both.
        if "adapter" in lowered:
            description_parts.append(f"Data adapter: {value}.")
        elif "source" in lowered:
            description_parts.append(f"Data source: {value}.")

    return [
        JRXMLChunk(
            chunk_id=chunk_id,
            chunk_type="datasource_config",
            human_description=" ".join(description_parts),
            raw_xml=props_xml,
            context=f"Report '{report_name}' data source configuration",
            metadata={"properties": datasource_props},
        )
    ]
# ==================== Query ====================
def _extract_query_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Turn the report's main ``<query>`` element into a chunk.

    The description carries at most the first 300 characters of the query;
    the full text is kept in the metadata.

    Returns:
        A single ``query`` chunk, or an empty list when the report has no
        ``<query>`` child.
    """
    query_elem = root.find("query")
    if query_elem is None:
        return []

    lang = query_elem.attrib.get("language", "SQL").lower()
    query_text = (query_elem.text or "").strip()

    preview = query_text[:300]
    if len(query_text) > 300:
        preview += "..."

    return [
        JRXMLChunk(
            chunk_id=chunk_id,
            chunk_type="query",
            human_description=(
                f"This is the data query for report '{report_name}'. "
                f"Language: {lang.upper()}. Query: {preview}"
            ),
            raw_xml=ET.tostring(query_elem, encoding="unicode"),
            context=f"Report '{report_name}' data query",
            metadata={"query_language": lang, "full_sql": query_text},
        )
    ]
# ==================== Parameters ====================
def _extract_parameter_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Bundle all report-level ``<parameter>`` definitions into one chunk.

    The metadata records each parameter's name, its class (defaulting to
    ``java.lang.String``) and, where present, the stripped text of its
    ``<defaultValueExpression>``.

    Returns:
        A single ``parameters`` chunk, or an empty list when the report
        declares no parameters.
    """
    params = root.findall("parameter")
    if not params:
        return []

    param_names = []
    param_types = {}
    default_values = {}
    xml_parts = []
    for node in params:
        name = node.attrib.get("name", "")
        param_names.append(name)
        param_types[name] = node.attrib.get("class", "java.lang.String")
        default_node = node.find("defaultValueExpression")
        if default_node is not None and default_node.text:
            default_values[name] = default_node.text.strip()
        xml_parts.append(ET.tostring(node, encoding="unicode"))

    param_list = ", ".join(f"{name}({param_types[name]})" for name in param_names)
    return [
        JRXMLChunk(
            chunk_id=chunk_id,
            chunk_type="parameters",
            human_description=(
                f"These are all parameter definitions for report '{report_name}', "
                f"total {len(params)} parameters. Parameters: {param_list}."
            ),
            raw_xml="\n".join(xml_parts),
            context=f"Report '{report_name}' parameter definitions",
            metadata={
                "parameter_names": param_names,
                "parameter_types": param_types,
                "default_values": default_values,
                "count": len(params),
            },
        )
    ]
# ==================== Fields ====================
def _extract_field_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Create chunks for the report-level ``<field>`` definitions.

    Up to five fields are bundled into a single ``fields`` chunk. With more
    than five, every field gets its own ``field`` chunk, numbered
    consecutively from ``chunk_id``.

    Returns:
        The field chunks, or an empty list when the report has no fields.
    """
    fields = root.findall("field")
    if not fields:
        return []

    def expression_props(node: ET.Element) -> List[Dict]:
        # Every <property> of the field whose name mentions "field.expression".
        return [
            {"property": prop.attrib.get("name", ""), "value": prop.attrib.get("value", "")}
            for prop in node.findall("property")
            if "field.expression" in prop.attrib.get("name", "")
        ]

    if len(fields) > 5:
        per_field = []
        for offset, node in enumerate(fields):
            field_name = node.attrib.get("name", f"field_{offset}")
            field_class = node.attrib.get("class", "java.lang.String")
            matches = expression_props(node)
            # Per-field chunks keep the first matching property.
            field_expr = matches[0] if matches else None

            desc = f"Field definition for report '{report_name}': {field_name}, type: {field_class}"
            if field_expr:
                desc += f", expression: {field_expr['value']}"
            per_field.append(JRXMLChunk(
                chunk_id=chunk_id + offset,
                chunk_type="field",
                human_description=desc,
                raw_xml=ET.tostring(node, encoding="unicode"),
                context=f"Report '{report_name}' field '{field_name}'",
                metadata={"field_name": field_name, "field_class": field_class, "field_expression": field_expr},
            ))
        return per_field

    field_names = []
    field_types = {}
    field_exprs = {}
    xml_parts = []
    for node in fields:
        fname = node.attrib.get("name", "")
        field_names.append(fname)
        field_types[fname] = node.attrib.get("class", "java.lang.String")
        matches = expression_props(node)
        if matches:
            # The grouped chunk keeps the last matching property.
            field_exprs[fname] = matches[-1]
        xml_parts.append(ET.tostring(node, encoding="unicode"))

    field_list = ", ".join(f"{name}({field_types[name]})" for name in field_names)
    return [
        JRXMLChunk(
            chunk_id=chunk_id,
            chunk_type="fields",
            human_description=(
                f"These are all field definitions for report '{report_name}', "
                f"total {len(fields)} fields. Fields: {field_list}."
            ),
            raw_xml="\n".join(xml_parts),
            context=f"Report '{report_name}' field definitions",
            metadata={
                "field_names": field_names,
                "field_types": field_types,
                "field_expressions": field_exprs,
                "count": len(fields),
            },
        )
    ]
# ==================== Sort Fields ====================
def _extract_sortfield_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Bundle the report's ``<sortField>`` definitions into one chunk.

    A sort field without an ``order`` attribute is recorded as
    ``Ascending``.

    Returns:
        A single ``sortFields`` chunk, or an empty list when the report has
        no sort fields.
    """
    sortfields = root.findall("sortField")
    if not sortfields:
        return []

    sortfield_info = [
        {"name": node.attrib.get("name", ""), "order": node.attrib.get("order", "Ascending")}
        for node in sortfields
    ]
    sorts_text = ", ".join(f"{entry['name']} ({entry['order']})" for entry in sortfield_info)
    xml_parts = [ET.tostring(node, encoding="unicode") for node in sortfields]

    return [
        JRXMLChunk(
            chunk_id=chunk_id,
            chunk_type="sortFields",
            human_description=(
                f"These are sort field definitions for report '{report_name}', "
                f"total {len(sortfields)} fields. Sorts: {sorts_text}."
            ),
            raw_xml="\n".join(xml_parts),
            context=f"Report '{report_name}' sort field definitions",
            metadata={"sortFields": sortfield_info, "count": len(sortfields)},
        )
    ]
# ==================== Filter Expression ====================
def _extract_filter_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Turn the report's ``<filterExpression>`` into a chunk.

    The description shows at most the first 200 characters of the
    expression; the full stripped text is kept in the metadata.

    Returns:
        A single ``filterExpression`` chunk, or an empty list when the
        report has no filter expression.
    """
    filter_elem = root.find("filterExpression")
    if filter_elem is None:
        return []

    filter_text = ""
    if filter_elem.text:
        filter_text = filter_elem.text.strip()
    ellipsis = "..." if len(filter_text) > 200 else ""

    return [
        JRXMLChunk(
            chunk_id=chunk_id,
            chunk_type="filterExpression",
            human_description=f"This is the filter expression for report '{report_name}': {filter_text[:200]}{ellipsis}",
            raw_xml=ET.tostring(filter_elem, encoding="unicode"),
            context=f"Report '{report_name}' filter expression",
            metadata={"filter_expression": filter_text},
        )
    ]
# ==================== Variables ====================
def _extract_variable_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Create one chunk per ``resetType`` for the report's variables.

    Variables are grouped by their ``resetType`` attribute (``Report`` when
    absent), in order of first appearance. Each group becomes a
    ``variables_<resettype>`` chunk, and the groups are numbered
    consecutively starting at ``chunk_id``.

    Returns:
        The variable chunks, or an empty list when the report declares no
        variables.
    """
    variables = root.findall("variable")
    if not variables:
        return []

    variables_by_reset: Dict[str, List[ET.Element]] = {}
    for var in variables:
        variables_by_reset.setdefault(var.attrib.get("resetType", "Report"), []).append(var)

    chunks = []
    # Each group needs its own id; reusing chunk_id for every group would
    # produce duplicate ids as soon as two reset types exist.
    for offset, (reset_type, vars_list) in enumerate(variables_by_reset.items()):
        var_names = [v.attrib.get("name", "") for v in vars_list]
        var_types = {v.attrib.get("name", ""): v.attrib.get("class", "java.lang.Object") for v in vars_list}
        var_calcs = {v.attrib.get("name", ""): v.attrib.get("calculation", "Nothing") for v in vars_list}

        expressions = {}
        for v in vars_list:
            name = v.attrib.get("name", "")
            # Prefer the main expression; fall back to the initial value.
            for tag, label in (("expression", "expression"), ("initialValueExpression", "initialValue")):
                node = v.find(tag)
                if node is not None and node.text:
                    expressions[name] = {"type": label, "value": node.text.strip()}
                    break

        var_list = ", ".join(f"{n}({var_types[n]}, {var_calcs[n]})" for n in var_names)
        chunks.append(JRXMLChunk(
            chunk_id=chunk_id + offset,
            chunk_type=f"variables_{reset_type.lower()}",
            human_description=(
                f"These are variable definitions for report '{report_name}' (resetType={reset_type}), "
                f"total {len(vars_list)} variables. Variables: {var_list}."
            ),
            raw_xml="\n".join(ET.tostring(v, encoding="unicode") for v in vars_list),
            context=f"Report '{report_name}' variable definitions ({reset_type} level reset)",
            metadata={
                "variable_names": var_names,
                "variable_types": var_types,
                "variable_calculations": var_calcs,
                "reset_type": reset_type,
                "expressions": expressions,
                "count": len(vars_list),
            },
        ))
    return chunks
# ==================== Styles ====================
def _extract_style_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Bundle the report's ``<style>`` definitions into one chunk.

    The description names the first style marked ``default="true"`` and
    notes whether any style has a ``<conditionalStyle>`` child.

    Returns:
        A single ``styles`` chunk, or an empty list when the report defines
        no styles.
    """
    styles = root.findall("style")
    if not styles:
        return []

    style_names = []
    default_names = []
    has_conditional = False
    xml_parts = []
    for node in styles:
        name = node.attrib.get("name", "")
        style_names.append(name)
        if node.attrib.get("default") == "true":
            default_names.append(name)
        if node.find("conditionalStyle") is not None:
            has_conditional = True
        xml_parts.append(ET.tostring(node, encoding="unicode"))
    first_default = default_names[0] if default_names else None

    desc_parts = [
        f"These are style definitions for report '{report_name}', total {len(styles)} styles.",
        f"Styles: {', '.join(style_names)}.",
    ]
    if default_names:
        desc_parts.append(f"Default style: {first_default}.")
    if has_conditional:
        desc_parts.append("Contains conditional styles.")

    return [
        JRXMLChunk(
            chunk_id=chunk_id,
            chunk_type="styles",
            human_description=" ".join(desc_parts),
            raw_xml="\n".join(xml_parts),
            context=f"Report '{report_name}' style definitions",
            metadata={
                "style_names": style_names,
                "default_style": first_default,
                "has_conditional_styles": has_conditional,
                "count": len(styles),
            },
        )
    ]
# ==================== Dataset ====================
def _extract_dataset_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Create one ``dataset`` chunk per report-level ``<dataset>``.

    Each chunk lists the dataset's field and parameter names, a preview of
    its query (first 150 characters) and any dataset property whose name
    mentions "adapter" or "source". Chunks are numbered consecutively from
    ``chunk_id``.
    """
    def names_of(nodes: List[ET.Element]) -> List[str]:
        return [node.attrib.get("name", "") for node in nodes]

    def listing(names: List[str]) -> str:
        return ", ".join(names) if names else "none"

    chunks = []
    for offset, dataset in enumerate(root.findall("dataset")):
        dataset_name = dataset.attrib.get("name", f"dataset_{offset}")
        ds_fields = dataset.findall("field")
        ds_params = dataset.findall("parameter")
        field_names = names_of(ds_fields)
        param_names = names_of(ds_params)

        query_lang = ""
        query_text = ""
        query_node = dataset.find("query")
        if query_node is not None:
            query_lang = query_node.attrib.get("language", "").lower()
            query_text = (query_node.text or "").strip()

        ds_props = {}
        for prop in dataset.findall("property"):
            pname = prop.attrib.get("name", "")
            lowered = pname.lower()
            if "adapter" in lowered or "source" in lowered:
                ds_props[pname] = prop.attrib.get("value", "")

        if query_text:
            preview = query_text[:150] + ("..." if len(query_text) > 150 else "")
            query_part = f"Query ({query_lang}): {preview}"
        elif query_lang:
            query_part = f"Query language: {query_lang}"
        else:
            query_part = "No query."

        description = " ".join([
            f"This is dataset '{dataset_name}' definition for report '{report_name}'.",
            f"Contains {len(ds_fields)} fields: {listing(field_names)}.",
            f"Contains {len(ds_params)} parameters: {listing(param_names)}.",
            query_part,
        ])
        chunks.append(JRXMLChunk(
            chunk_id=chunk_id + offset,
            chunk_type="dataset",
            human_description=description,
            raw_xml=ET.tostring(dataset, encoding="unicode"),
            context=f"Report '{report_name}' dataset '{dataset_name}'",
            metadata={
                "dataset_name": dataset_name,
                "field_names": field_names,
                "parameter_names": param_names,
                "query": query_text,
                "query_language": query_lang,
                "properties": ds_props,
            },
        ))
    return chunks
# ==================== Groups ====================
def _extract_group_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Create one ``group`` chunk per report-level ``<group>``.

    Each chunk records the group expression, whether a ``<groupHeader>`` /
    ``<groupFooter>`` child exists and three layout attributes. Chunks are
    numbered consecutively from ``chunk_id``.
    """
    chunks = []
    for offset, group in enumerate(root.findall("group")):
        attrib = group.attrib
        group_name = attrib.get("name", f"group_{offset}")

        expr_elem = group.find("expression")
        expr_text = ""
        if expr_elem is not None and expr_elem.text:
            expr_text = expr_elem.text.strip()

        has_header = group.find("groupHeader") is not None
        has_footer = group.find("groupFooter") is not None
        header_word = "Yes" if has_header else "No"
        footer_word = "Yes" if has_footer else "No"

        min_height = attrib.get("minHeightToStartNewPage", "0")
        start_new_column = attrib.get("startNewColumn", "false")
        reprint_header = attrib.get("reprintHeaderOnEachPage", "false")

        description = " ".join([
            f"This is group '{group_name}' definition for report '{report_name}'.",
            f"Group expression: {expr_text}.",
            f"Has groupHeader: {header_word}, has groupFooter: {footer_word}.",
            f"Min height: {min_height}, start new column: {start_new_column}, reprint header: {reprint_header}.",
        ])
        chunks.append(JRXMLChunk(
            chunk_id=chunk_id + offset,
            chunk_type="group",
            human_description=description,
            raw_xml=ET.tostring(group, encoding="unicode"),
            context=f"Report '{report_name}' group '{group_name}'",
            metadata={
                "group_name": group_name,
                "expression": expr_text,
                "has_header": has_header,
                "has_footer": has_footer,
                "minHeightToStartNewPage": min_height,
                "startNewColumn": start_new_column,
                "reprintHeaderOnEachPage": reprint_header,
            },
        ))
    return chunks
# ==================== Bands ====================
def _extract_standard_band_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
chunks = []
for band_name in self.STANDARD_BANDS:
band_elem = root.find(band_name)
current_id = chunk_id # 使用局部变量跟踪ID
for band_name in self.STANDARD_BANDS:
band_elem = root.find(band_name)
if band_elem is None:
continue
band_height = band_elem.attrib.get("height", "0")
band_xml = ET.tostring(band_elem, encoding="unicode")
# Get all direct children (elements with kind attribute, frames, etc.)
elements = band_elem.findall("element")
element_kinds = {}
visible_texts = []
for elem in elements:
kind = elem.attrib.get("kind", "unknown")
element_kinds[kind] = element_kinds.get(kind, 0) + 1
if kind == "staticText":
text_node = elem.find("text")
if text_node is not None and text_node.text:
visible_texts.append(text_node.text.strip())
elif kind == "textField":
expr_node = elem.find("expression")
if expr_node is not None and expr_node.text:
visible_texts.append("${" + expr_node.text.strip() + "}")
# Check for nested band elements (detail bands can have nested bands)
nested_bands = band_elem.findall("band")
frames = band_elem.findall("frame")
element_groups = band_elem.findall("elementGroup")
# Check for splitType
split_type = band_elem.attrib.get("splitType", "Stretch")
elem_counts_str = ", ".join(f"{count}x {kind}" for kind, count in element_kinds.items())
desc_parts = [f"This is the '{band_name}' band of report '{report_name}', height: {band_height} pixels, splitType: {split_type}.", f"Contains {len(elements)} elements: {elem_counts_str}."]
# Add nested band info
if nested_bands:
nested_info = ", ".join(f"nested band (h={b.attrib.get('height', '0')})" for b in nested_bands)
desc_parts.append(f"Contains {len(nested_bands)} nested bands: {nested_info}.")
if visible_texts:
preview = "; ".join(visible_texts[:3])
if len(visible_texts) > 3:
preview += f" ... and {len(visible_texts)} more texts"
desc_parts.append(f"Visible text samples: {preview}")
description = " ".join(desc_parts)
if len(band_xml) > self.max_chunk_size:
sub_chunks = self._split_band_elements(band_elem, band_name, report_name, chunk_id)
chunks.extend(sub_chunks)
chunk_id += len(sub_chunks)
else:
chunks.append(JRXMLChunk(
chunk_id=chunk_id,
chunk_type=f"band_{band_name}",
human_description=description,
raw_xml=band_xml,
context=f"Report '{report_name}' {band_name} band",
metadata={"band_name": band_name, "band_height": band_height, "split_type": split_type, "element_counts": element_kinds, "element_count": len(elements), "nested_band_count": len(nested_bands), "frame_count": len(frames), "element_group_count": len(element_groups), "visible_texts": visible_texts[:10]}
))
current_id += 1
return chunks
def _split_band_elements(self, band: ET.Element, band_name: str, report_name: str, start_id: int) -> List[JRXMLChunk]:
    """Produce one chunk for every direct child of ``band``.

    Children are classified by tag: ``element`` (described according to its
    ``kind`` attribute), ``frame``/``elementGroup`` containers, nested
    ``band`` elements, and everything else. The chunk for the child at
    position ``i`` gets id ``start_id + i``.
    """
    lead = f"'{band_name}' band of report '{report_name}': "
    band_context = f"Report '{report_name}' {band_name} band"

    def clip(text: str, limit: int) -> str:
        # First `limit` characters, with an ellipsis when something was cut.
        return text[:limit] + ("..." if len(text) > limit else "")

    def child_text(node: ET.Element, tag: str) -> str:
        # Text of the first `tag` child, or "" when missing or empty.
        child = node.find(tag)
        if child is None or not child.text:
            return ""
        return child.text

    def geometry(node: ET.Element) -> str:
        get = node.attrib.get
        return f"Position: ({get('x', '0')}, {get('y', '0')}), Size: {get('width', '0')}x{get('height', '0')}"

    def describe_element(node: ET.Element, kind: str) -> str:
        parts = [f"{lead}{kind} element"]
        if kind == "staticText":
            shown = clip(child_text(node, "text"), 50)
            parts.append(f"Text: '{shown}'")
        elif kind == "textField":
            shown = clip(child_text(node, "expression"), 80)
            parts.append(f"Expression: {shown}")
        elif kind == "image":
            target = child_text(node, "expression")
            if target:
                parts.append(f"Image: {target[:50]}")
        elif kind == "subreport":
            target = child_text(node, "expression")
            if target:
                parts.append(f"Subreport: {target[:50]}")
        elif kind == "chart":
            chart_type = node.attrib.get("chartType", "unknown")
            parts.append(f"Chart type: {chart_type}")
        elif kind == "crosstab":
            parts.append("Crosstab")
        parts.append(geometry(node))
        return ", ".join(parts) + "."

    chunks = []
    for offset, child in enumerate(band):
        tag = child.tag
        if tag == "element":
            kind = child.attrib.get("kind", "unknown")
            chunk_type = f"element_{kind}"
            description = describe_element(child, kind)
            metadata = {"band_name": band_name, "element_kind": kind, "attributes": dict(child.attrib)}
        elif tag in ("frame", "elementGroup"):
            chunk_type = f"container_{tag}"
            description = f"{lead}{tag} container, {geometry(child)}."
            metadata = {"band_name": band_name, "container_type": tag, "attributes": dict(child.attrib)}
        elif tag == "band":
            nested_height = child.attrib.get("height", "0")
            chunk_type = "nested_band"
            description = f"{lead}nested band element, Height: {nested_height}."
            metadata = {"band_name": band_name, "nested_height": nested_height, "element_tag": tag}
        else:
            # Any other child tag is labelled as a "style element".
            chunk_type = f"style_element_{tag}"
            description = f"{lead}{tag} style element."
            metadata = {"band_name": band_name, "element_tag": tag}

        chunks.append(JRXMLChunk(
            chunk_id=start_id + offset,
            chunk_type=chunk_type,
            human_description=description,
            raw_xml=ET.tostring(child, encoding="unicode"),
            context=band_context,
            metadata=metadata,
        ))
    return chunks
# ==================== Charts ====================
def _extract_chart_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Produce one ``chart`` chunk per ``<element kind="chart">`` below ``root``.

    Args:
        chunk_id: Id assigned to the first chart found; the n-th chart
            (0-based) receives ``chunk_id + n``.
        root: Element whose descendants are searched.
        report_name: Report name interpolated into each description and
            context string.

    Returns:
        One ``JRXMLChunk`` per matching element; empty when none match.
    """

    def text_of(node) -> str:
        # An absent node and a node without text both yield "".
        if node is None or not node.text:
            return ""
        return node.text.strip()

    found = []
    for offset, chart_node in enumerate(root.findall(".//element[@kind='chart']")):
        kind_of_chart = chart_node.attrib.get("chartType", "unknown")

        # Dataset details are recorded only when a direct <dataset> child exists.
        dataset_meta = {}
        dataset_node = chart_node.find("dataset")
        if dataset_node is not None:
            dataset_meta["kind"] = dataset_node.attrib.get("kind", "unknown")
            dataset_meta["series"] = [
                {
                    "key": text_of(series_node.find("keyExpression")),
                    "value": text_of(series_node.find("valueExpression")),
                }
                for series_node in dataset_node.findall(".//series")
            ]

        # Plot formatting attributes default to "" when the <plot> child is present.
        plot_meta = {}
        plot_node = chart_node.find("plot")
        if plot_node is not None:
            for attr_name in ("labelFormat", "legendLabelFormat"):
                plot_meta[attr_name] = plot_node.attrib.get(attr_name, "")

        dataset_kind = dataset_meta.get("kind", "unknown")
        label_format = plot_meta.get("labelFormat", "N/A")
        summary = (
            f"This is a chart element in report '{report_name}', type: {kind_of_chart}. "
            f"Dataset kind: {dataset_kind}. Label format: {label_format}."
        )

        found.append(
            JRXMLChunk(
                chunk_id=chunk_id + offset,
                chunk_type="chart",
                human_description=summary,
                raw_xml=ET.tostring(chart_node, encoding="unicode"),
                context=f"Report '{report_name}' chart",
                metadata={
                    "chart_type": kind_of_chart,
                    "dataset": dataset_meta,
                    "plot": plot_meta,
                    "attributes": dict(chart_node.attrib),
                },
            )
        )
    return found
# ==================== Crosstabs ====================
def _extract_crosstab_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Produce one ``crosstab`` chunk per ``<element kind="crosstab">`` below ``root``.

    Each chunk's metadata lists the crosstab's direct ``rowGroup``,
    ``columnGroup`` and ``measure`` children.

    Args:
        chunk_id: Id assigned to the first crosstab found; the n-th crosstab
            (0-based) receives ``chunk_id + n``.
        root: Element whose descendants are searched.
        report_name: Report name interpolated into each description and
            context string.

    Returns:
        One ``JRXMLChunk`` per matching element; empty when none match.
    """

    def text_of(node) -> str:
        # An absent node and a node without text both yield "".
        if node is None or not node.text:
            return ""
        return node.text.strip()

    def read_groups(parent, tag: str, size_attr: str) -> list:
        # Row groups carry a "width", column groups a "height"; the rest is shared.
        return [
            {
                "name": group.attrib.get("name", ""),
                size_attr: group.attrib.get(size_attr, ""),
                "totalPosition": group.attrib.get("totalPosition", "None"),
                "bucket": text_of(group.find("bucket/expression")),
            }
            for group in parent.findall(tag)
        ]

    def names_of(items: list) -> str:
        return ", ".join(item["name"] for item in items)

    found = []
    for offset, crosstab_node in enumerate(root.findall(".//element[@kind='crosstab']")):
        rows = read_groups(crosstab_node, "rowGroup", "width")
        columns = read_groups(crosstab_node, "columnGroup", "height")
        measure_list = [
            {
                "name": measure.attrib.get("name", ""),
                "calculation": measure.attrib.get("calculation", "Nothing"),
                "class": measure.attrib.get("class", "java.lang.Object"),
                "expression": text_of(measure.find("expression")),
            }
            for measure in crosstab_node.findall("measure")
        ]

        sentences = (
            f"This is a crosstab element in report '{report_name}'.",
            f"Row groups: {len(rows)} - {names_of(rows)}.",
            f"Column groups: {len(columns)} - {names_of(columns)}.",
            f"Measures: {len(measure_list)} - {names_of(measure_list)}.",
        )

        found.append(
            JRXMLChunk(
                chunk_id=chunk_id + offset,
                chunk_type="crosstab",
                human_description=" ".join(sentences),
                raw_xml=ET.tostring(crosstab_node, encoding="unicode"),
                context=f"Report '{report_name}' crosstab",
                metadata={
                    "row_groups": rows,
                    "column_groups": columns,
                    "measures": measure_list,
                    "attributes": dict(crosstab_node.attrib),
                },
            )
        )
    return found
# ==================== Subreports ====================
def _extract_subreport_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Produce one ``subreport`` chunk per ``<element kind="subreport">`` below ``root``.

    The metadata records the subreport expression, connection expression,
    data source expression, return-value mappings and parameters read from
    the element's direct children.

    Args:
        chunk_id: Id assigned to the first subreport found; the n-th
            subreport (0-based) receives ``chunk_id + n``.
        root: Element whose descendants are searched.
        report_name: Report name interpolated into each description and
            context string.

    Returns:
        One ``JRXMLChunk`` per matching element; empty when none match.
    """

    def text_of(node) -> str:
        # An absent node and a node without text both yield "".
        if node is None or not node.text:
            return ""
        return node.text.strip()

    found = []
    for offset, sub_node in enumerate(root.findall(".//element[@kind='subreport']")):
        template_expr = text_of(sub_node.find("expression"))
        connection_expr = text_of(sub_node.find("connectionExpression"))
        datasource_expr = text_of(sub_node.find("dataSourceExpression"))

        returned = [
            {
                "toVariable": rv.attrib.get("toVariable", ""),
                "subreportVariable": rv.attrib.get("subreportVariable", ""),
                "calculation": rv.attrib.get("calculation", "Nothing"),
            }
            for rv in sub_node.findall("returnValue")
        ]
        passed_params = [
            {"name": param.attrib.get("name", ""), "expression": text_of(param.find("expression"))}
            for param in sub_node.findall("parameter")
        ]

        sentences = [
            f"This is a subreport element in report '{report_name}'.",
            f"Subreport: {template_expr}.",
        ]
        joined_param_names = ", ".join(entry["name"] for entry in passed_params)
        # Optional sentences, emitted in this order only when their source is non-empty.
        # The data source expression is cut to its first 80 characters in the description.
        optional_sentences = (
            (connection_expr, f"Connection: {connection_expr}."),
            (datasource_expr, f"Data source expression: {datasource_expr[:80]}."),
            (returned, f"Return value mappings: {len(returned)}."),
            (passed_params, f"Subreport parameters: {len(passed_params)} - {joined_param_names}."),
        )
        sentences.extend(sentence for present, sentence in optional_sentences if present)

        found.append(
            JRXMLChunk(
                chunk_id=chunk_id + offset,
                chunk_type="subreport",
                human_description=" ".join(sentences),
                raw_xml=ET.tostring(sub_node, encoding="unicode"),
                context=f"Report '{report_name}' subreport",
                metadata={
                    "expression": template_expr,
                    "connectionExpression": connection_expr,
                    "dataSourceExpression": datasource_expr,
                    "returnValues": returned,
                    "parameters": passed_params,
                    "attributes": dict(sub_node.attrib),
                },
            )
        )
    return found
# ==================== Components ====================
def _extract_component_chunks(self, chunk_id: int, root: ET.Element, report_name: str) -> List[JRXMLChunk]:
    """Produce one ``component`` chunk per ``<element kind="component">`` that has a ``<component>`` child.

    Elements without a direct ``<component>`` child produce no chunk, but
    still consume an id offset, so returned ids may have gaps.

    Args:
        chunk_id: Id corresponding to the first matching element; the n-th
            matching element (0-based) uses ``chunk_id + n``.
        root: Element whose descendants are searched.
        report_name: Report name interpolated into each description and
            context string.

    Returns:
        One ``JRXMLChunk`` per element with a nested component definition.
    """
    chunks = []
    for i, component in enumerate(root.findall(".//element[@kind='component']")):
        # The component kind is read from the nested <component> child only.
        # XML attribute values are plain strings, so the outer element's
        # "component" attribute must not be treated as a mapping.
        nested_component = component.find("component")
        if nested_component is None:
            continue

        component_xml = ET.tostring(component, encoding="unicode")
        nested_kind = nested_component.attrib.get("kind", "")

        # Collect the sub-dataset name and data source expression of each datasetRun.
        dataset_run_info = []
        for dr in nested_component.findall("datasetRun"):
            ds_expr = dr.find("dataSourceExpression")
            ds_text = ds_expr.text.strip() if ds_expr is not None and ds_expr.text else ""
            dataset_run_info.append({"subDataset": dr.attrib.get("subDataset", ""), "dataSourceExpression": ds_text})

        # Content dimensions fall back to "0" when <contents> or the attribute is absent.
        contents = nested_component.find("contents")
        contents_height = contents.attrib.get("height", "0") if contents is not None else "0"
        contents_width = contents.attrib.get("width", "0") if contents is not None else "0"

        desc_parts = [
            f"This is a component element in report '{report_name}'.",
            f"Component kind: {nested_kind}.",
            f"Contains {len(dataset_run_info)} datasetRun(s).",
        ]
        desc_parts.extend(
            f"SubDataset: {dr_info['subDataset']}."
            for dr_info in dataset_run_info
            if dr_info["subDataset"]
        )
        desc_parts.append(f"Content size: {contents_width}x{contents_height}.")

        chunks.append(JRXMLChunk(
            chunk_id=chunk_id + i,
            chunk_type="component",
            human_description=" ".join(desc_parts),
            raw_xml=component_xml,
            context=f"Report '{report_name}' component",
            metadata={
                "component_kind": nested_kind,
                "dataset_runs": dataset_run_info,
                "contents_height": contents_height,
                "contents_width": contents_width,
                "attributes": dict(component.attrib),
            },
        ))
    return chunks
# =====================================================
# Utility Functions
# =====================================================
def save_chunks_to_json(chunks: List[Dict], output_path: str):
    """Write ``chunks`` to ``output_path`` as UTF-8 JSON and report the count on stdout.

    Non-ASCII characters are written as-is (``ensure_ascii=False``) and the
    output is indented by two spaces.

    Args:
        chunks: Chunk dictionaries to serialise.
        output_path: Destination file; opened in write mode, so existing
            content is replaced.
    """
    with open(output_path, mode="w", encoding="utf-8") as sink:
        json.dump(chunks, sink, indent=2, ensure_ascii=False)
    total = len(chunks)
    print(f"Saved {total} chunks to {output_path}")
def chunks_to_langchain_documents(chunks: List[Dict]):
    """Convert chunk dictionaries into LangChain ``Document`` objects.

    The human description becomes the page content. The document metadata
    holds ``chunk_id``, ``chunk_type``, ``raw_xml`` and ``context``, followed
    by the entries of the chunk's own ``metadata`` dict (which therefore win
    on a key clash).

    Args:
        chunks: Chunk dictionaries containing the keys listed above.

    Returns:
        A list with one ``Document`` per chunk, in input order.

    Raises:
        ImportError: If ``langchain`` is not installed (imported lazily here).
    """
    from langchain.schema import Document

    def to_document(entry: Dict):
        text = entry["human_description"]
        merged_meta = {
            "chunk_id": entry["chunk_id"],
            "chunk_type": entry["chunk_type"],
            "raw_xml": entry["raw_xml"],
            "context": entry["context"],
            **entry.get("metadata", {}),
        }
        return Document(page_content=text, metadata=merged_meta)

    return [to_document(entry) for entry in chunks]
def print_chunk_summary(chunks: List[Dict]):
    """Print how many chunks exist for each ``chunk_type``, sorted by type name.

    Args:
        chunks: Chunk dictionaries; each must contain a ``chunk_type`` key.
    """
    tally: Dict = {}
    for kind in (entry["chunk_type"] for entry in chunks):
        tally[kind] = tally.get(kind, 0) + 1

    print("\nChunk Type Summary:")
    for kind in sorted(tally):
        print(f" {kind}: {tally[kind]}")
# =====================================================
# Main Entry Point
# =====================================================
if __name__ == "__main__":
    # Command-line entry point: chunk a single JRXML file or a directory and
    # save the resulting chunks as JSON alongside the input.
    import sys

    chunker = JRXMLSemanticChunker(max_chunk_size=2000)
    if len(sys.argv) > 1:
        path = sys.argv[1]
        if os.path.isdir(path):
            all_chunks = chunker.chunk_directory(path)
            # Strip trailing separators once so dirname and basename agree on
            # the directory's own name; the file lands beside the directory.
            normalized = path.rstrip("/\\")
            output_path = os.path.join(
                os.path.dirname(normalized) or ".",
                os.path.basename(normalized) + "_chunks.json",
            )
            save_chunks_to_json(all_chunks, output_path)
            print_chunk_summary(all_chunks)
        else:
            chunks = chunker.chunk_file(path)
            # Derive the output name from the extension-less path. A plain
            # str.replace(".jrxml", ...) leaves the path unchanged for inputs
            # such as "report.JRXML" or "report.xml", which would overwrite
            # the input file with the JSON dump.
            output_path = os.path.splitext(path)[0] + "_chunks.json"
            save_chunks_to_json(chunks, output_path)

            print(f"\n{'='*60}")
            print("Chunking Results Preview")
            print(f"{'='*60}")
            # Preview only the first ten chunks to keep console output short.
            for chunk in chunks[:10]:
                print(f"\n[Chunk {chunk['chunk_id']}] Type: {chunk['chunk_type']}")
                print(f"Description: {chunk['human_description'][:120]}...")
                print(f"XML length: {len(chunk['raw_xml'])} chars")
            if len(chunks) > 10:
                print(f"\n... and {len(chunks) - 10} more chunks")
            print_chunk_summary(chunks)

            # LangChain is optional; the conversion is skipped when it is missing.
            try:
                langchain_docs = chunks_to_langchain_documents(chunks)
                print(f"\nGenerated {len(langchain_docs)} LangChain Documents")
            except ImportError:
                print("\nNote: langchain not installed, skipping Document conversion")
    else:
        print("=" * 60)
        print("JRXML Semantic Chunking v3.0")
        print("=" * 60)
        print("\nUsage: python jrxml_chunker.py <jrxml_file_or_directory>")
        print("\nData source types supported:")
        print(" - SQL/JDBC, HQL/Hibernate, XPath/XML")
        print(" - JSON, JSONQL, CSV")
        print(" - Data Adapters (Excel, XML, HTTP)")
        print(" - Bean Collection, Empty Data Source")