Files
rag_jrxml/jrxml_banch_chunker.py
T
panda 4f475e9e36 feat: 添加Qwen3嵌入模型及JRXML报告相关文件
添加Qwen3-4B嵌入模型配置文件及权重文件
添加多个JRXML报告的数据查询和字段定义文件
添加PdfEncryptReport.jrxml示例报告文件
2026-05-11 08:34:03 +08:00

245 lines
8.2 KiB
Python

"""
JRXML Batch Chunker
批量处理JRXML模板文件的入口脚本
"""
import os
import sys
import json
import time
from pathlib import Path
from datetime import datetime
from collections import defaultdict
from jrxml_chunker import JRXMLSemanticChunker, save_chunks_to_json, print_chunk_summary
# Characters that are illegal or unsafe in file names on common filesystems
# (path separators, Windows-reserved punctuation, ASCII control characters).
_UNSAFE_FILENAME_CHARS = str.maketrans(
    {ch: "_" for ch in '\\/:*?"<>|' + "".join(map(chr, range(32)))}
)


def _discover_jrxml_files(root):
    """Return every ``.jrxml`` file below *root*, sorted, matched case-insensitively."""
    # One suffix comparison replaces two case-specific globs: those return each
    # file twice on case-insensitive filesystems and miss extensions such as
    # ``.Jrxml`` on case-sensitive ones.
    return sorted(
        path
        for path in root.rglob("*")
        if path.suffix.lower() == ".jrxml" and path.is_file()
    )


def _context_to_file_stem(context):
    """Turn a chunk ``context`` value into a filesystem-safe file stem."""
    # ``context`` may be missing or None; treat both as an empty string.
    stem = str(context or "").replace("'", "").replace(" ", "_").replace("Report_", "")
    # The translation is one-to-one, so the 100-character cap keeps its meaning.
    stem = stem.translate(_UNSAFE_FILENAME_CHARS)[:100]
    return stem or "unknown"


def batch_chunk_with_report(input_dir: str, output_dir: str = None, max_chunk_size: int = 2000):
    """
    Chunk every JRXML file under a directory and write a processing report.

    Three artefacts are written to the output directory: ``all_chunks.json``
    (every chunk), ``processing_stats.json`` (counters, timings, failures) and
    a ``per_file/`` folder with one JSON file per distinct chunk ``context``.

    Args:
        input_dir: Directory that is searched recursively for ``.jrxml`` files
            (extension matched case-insensitively).
        output_dir: Destination directory. When omitted, a sibling of
            ``input_dir`` named ``<input name>_chunked_<YYYYmmdd_HHMMSS>`` is used.
        max_chunk_size: Size limit forwarded to ``JRXMLSemanticChunker``.

    Returns:
        A dict with keys ``chunks`` (all chunks), ``stats`` (the statistics
        dict) and ``output_path`` (str), or ``None`` when ``input_dir`` does
        not exist, is not a directory, or contains no JRXML files.
    """
    input_path = Path(input_dir).resolve()
    if not input_path.exists():
        print(f"❌ 目录不存在: {input_path}")
        return None
    if not input_path.is_dir():
        print(f"❌ 不是目录: {input_path}")
        return None

    if output_dir is None:
        output_dir = input_path.parent / f"{input_path.name}_chunked_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    output_path = Path(output_dir)

    print(f"\n{'='*60}")
    print(f"JRXML 语义分块 v3.0 - 批量处理")
    print(f"{'='*60}")
    print(f"输入目录: {input_path}")
    print(f"输出目录: {output_path}")
    print(f"{'='*60}\n")

    # Discover the work before creating anything, so a run that finds no files
    # does not leave an empty (timestamped) output directory behind.
    jrxml_files = _discover_jrxml_files(input_path)
    total_files = len(jrxml_files)
    print(f"找到 {total_files} 个JRXML文件\n")
    if total_files == 0:
        print("⚠️ 未找到JRXML文件")
        return None

    output_path.mkdir(parents=True, exist_ok=True)
    chunker = JRXMLSemanticChunker(max_chunk_size=max_chunk_size)

    all_chunks = []
    stats = {
        "total_files": total_files,
        "success": 0,
        "failed": 0,
        "total_chunks": 0,
        "failed_files": [],
        "chunks_per_file": defaultdict(int),
        "chunk_types": defaultdict(int),
        "started_at": datetime.now().isoformat()
    }
    start_time = time.time()

    for i, jrxml_file in enumerate(jrxml_files, 1):
        relative_path = jrxml_file.relative_to(input_path)
        try:
            file_start = time.time()
            chunks = chunker.chunk_file(str(jrxml_file))
            file_duration = time.time() - file_start
            # Tally into scratch values first: if a chunk is malformed the file
            # must be recorded as failed only, never as success *and* failure.
            chunk_count = len(chunks)
            file_types = defaultdict(int)
            for chunk in chunks:
                file_types[chunk["chunk_type"]] += 1
        except Exception as e:
            # Deliberate per-file boundary: one bad template must not abort the batch.
            stats["failed"] += 1
            stats["failed_files"].append({"file": str(relative_path), "error": str(e)})
            print(f"[{i}/{total_files}] ❌ {relative_path}: {e}")
            continue

        all_chunks.extend(chunks)
        stats["success"] += 1
        stats["total_chunks"] += chunk_count
        stats["chunks_per_file"][str(relative_path)] = chunk_count
        for chunk_type, count in file_types.items():
            stats["chunk_types"][chunk_type] += count
        print(f"[{i}/{total_files}] ✅ {relative_path}: {chunk_count} chunks ({file_duration:.2f}s)")

    total_duration = time.time() - start_time
    stats["processing_time"] = round(total_duration, 2)
    stats["finished_at"] = datetime.now().isoformat()

    # Combined output of every chunk.
    all_chunks_path = output_path / "all_chunks.json"
    save_chunks_to_json(all_chunks, str(all_chunks_path))

    # Statistics report.
    stats_path = output_path / "processing_stats.json"
    with open(stats_path, "w", encoding="utf-8") as f:
        json.dump(stats, f, ensure_ascii=False, indent=2)

    # One JSON file per chunk context. Grouping by the sanitised stem (rather
    # than the raw context) merges contexts that map to the same file name
    # instead of letting the later one overwrite the earlier one.
    per_file_dir = output_path / "per_file"
    per_file_dir.mkdir(exist_ok=True)
    chunks_by_stem = defaultdict(list)
    for chunk in all_chunks:
        chunks_by_stem[_context_to_file_stem(chunk.get("context", ""))].append(chunk)
    for stem, file_chunks in chunks_by_stem.items():
        with open(per_file_dir / f"{stem}.json", "w", encoding="utf-8") as f:
            json.dump(file_chunks, f, ensure_ascii=False, indent=2)

    # Console summary.
    print(f"\n{'='*60}")
    print(f"处理完成!")
    print(f"{'='*60}")
    print(f"✅ 成功: {stats['success']} 文件")
    print(f"❌ 失败: {stats['failed']} 文件")
    print(f"📦 总Chunks: {stats['total_chunks']}")
    print(f"⏱️ 总耗时: {total_duration:.2f}s")
    print(f"📂 输出目录: {output_path}")
    print(f"\n主要文件:")
    print(f" - {all_chunks_path}")
    print(f" - {stats_path}")
    print(f" - {per_file_dir}/ (按文件分类的chunks)")
    print(f"\nChunk类型分布:")
    print_chunk_summary(all_chunks)
    if stats["failed_files"]:
        print(f"\n⚠️ 失败文件详情:")
        for fail in stats["failed_files"]:
            print(f" - {fail['file']}: {fail['error']}")

    return {
        "chunks": all_chunks,
        "stats": stats,
        "output_path": str(output_path)
    }
def chunk_single_file_with_report(file_path: str, output_dir: str = None, max_chunk_size: int = 2000):
    """
    Chunk one JRXML file and write its chunks plus a human-readable summary.

    Two files are written to the output directory: ``<stem>_chunks.json`` and
    ``<stem>_summary.txt`` (one entry per chunk with its id, type, the first
    200 characters of its description, raw XML length and context).

    Args:
        file_path: Path of the JRXML file to process.
        output_dir: Destination directory. When omitted, ``<stem>_chunks`` next
            to the input file is used.
        max_chunk_size: Size limit forwarded to ``JRXMLSemanticChunker``
            (defaults to 2000, the value that used to be hard-coded).

    Returns:
        The list of chunks produced by the chunker, or ``None`` when
        ``file_path`` does not exist or is not a regular file.
    """
    file_path = Path(file_path).resolve()
    if not file_path.exists():
        print(f"❌ 文件不存在: {file_path}")
        return None
    # Reject directories up front, before an output directory is created for them.
    if not file_path.is_file():
        print(f"❌ 不是文件: {file_path}")
        return None

    if output_dir is None:
        output_dir = file_path.parent / f"{file_path.stem}_chunks"
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    print(f"\n处理文件: {file_path.name}")
    print(f"输出目录: {output_path}\n")

    chunker = JRXMLSemanticChunker(max_chunk_size=max_chunk_size)
    start_time = time.time()
    chunks = chunker.chunk_file(str(file_path))
    duration = time.time() - start_time

    # Machine-readable result.
    chunks_path = output_path / f"{file_path.stem}_chunks.json"
    save_chunks_to_json(chunks, str(chunks_path))

    # Human-readable summary, assembled in memory and written in one call.
    summary_lines = [
        f"JRXML Chunking Report: {file_path.name}\n",
        f"{'='*60}\n",
        f"Processing time: {duration:.2f}s\n",
        f"Total chunks: {len(chunks)}\n\n",
    ]
    for chunk in chunks:
        summary_lines.append(f"[Chunk {chunk['chunk_id']}] {chunk['chunk_type']}\n")
        summary_lines.append(f" Description: {chunk['human_description'][:200]}\n")
        summary_lines.append(f" XML Length: {len(chunk['raw_xml'])} chars\n")
        summary_lines.append(f" Context: {chunk.get('context', 'N/A')}\n\n")
    summary_path = output_path / f"{file_path.stem}_summary.txt"
    with open(summary_path, "w", encoding="utf-8") as f:
        f.writelines(summary_lines)

    print(f"✅ 生成 {len(chunks)} chunks")
    print(f"📄 Chunks JSON: {chunks_path}")
    print(f"📄 可读摘要: {summary_path}")
    print(f"⏱️ 耗时: {duration:.2f}s")
    print_chunk_summary(chunks)
    return chunks
if __name__ == "__main__":
    # Command-line entry point: <path> [--output <dir>], where <path> is either
    # a directory (batch mode) or a single JRXML file.
    # Show the real script name in the usage text instead of a hard-coded one.
    script_name = Path(sys.argv[0]).name
    cli_args = sys.argv[1:]

    if not cli_args:
        print("=" * 60)
        print("JRXML Semantic Chunking v3.0 - 批量处理工具")
        print("=" * 60)
        print("\n用法:")
        print(f" python {script_name} <目录路径>")
        print(f" python {script_name} <文件路径>")
        print("\n参数:")
        print(" <路径> JRXML文件所在目录 或 单个JRXML文件路径")
        print(" --output <目录> 指定输出目录 (可选)")
        print("\n示例:")
        print(f" python {script_name} ./jasper_reports")
        print(f" python {script_name} ./jasper_reports --output ./chunks")
        print(f" python {script_name} report.jrxml")
        sys.exit(0)

    # Separate the --output option from positional arguments so the option may
    # appear before or after the input path.
    output_dir = None
    positional = []
    pending = iter(cli_args)
    for arg in pending:
        if arg == "--output":
            output_dir = next(pending, None)
            if output_dir is None:
                # A dangling --output used to be ignored silently.
                print("❌ --output 需要指定目录")
                sys.exit(2)
        else:
            positional.append(arg)

    if not positional:
        print("❌ 缺少输入路径")
        sys.exit(2)
    input_path = positional[0]

    if os.path.isdir(input_path):
        # Batch mode: process every JRXML file below the directory.
        result = batch_chunk_with_report(input_path, output_dir)
    elif os.path.isfile(input_path):
        # Single-file mode.
        result = chunk_single_file_with_report(input_path, output_dir)
    else:
        print(f"❌ 路径无效: {input_path}")
        result = None

    # Both helpers return None on failure; surface that through the exit status.
    sys.exit(0 if result is not None else 1)