9d78a49625
- 新增config.py统一读取.env配置,移除硬编码路径和参数 - 重构collect_jrxml.py支持命令行参数和环境变量配置源目录 - 新增.env.example示例配置文件,整理所有可配置项 - 重构down_embedding_model.py、import_to_chroma.py等所有脚本使用统一配置 - 新增Windows一键部署脚本setup.bat - 修正jrxml_banch_chunker.py的文件名拼写错误
251 lines
8.5 KiB
Python
251 lines
8.5 KiB
Python
"""
|
|
JRXML Batch Chunker
|
|
批量处理JRXML模板文件的入口脚本
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
from jrxml_chunker import JRXMLSemanticChunker, save_chunks_to_json, print_chunk_summary
|
|
from config import JRXML_SOURCE_DIR, CHUNKER_OUTPUT_DIR, MAX_CHUNK_SIZE
|
|
|
|
|
|
# Characters that cannot appear in a file name on Windows (and path separators
# everywhere), plus ASCII control characters; each is mapped to "_".
_UNSAFE_FILENAME_CHARS = str.maketrans(
    {ch: "_" for ch in '\\/:*?"<>|'} | {chr(code): "_" for code in range(32)}
) if hasattr(dict, "__or__") else str.maketrans(
    dict([(ch, "_") for ch in '\\/:*?"<>|'] + [(chr(code), "_") for code in range(32)])
)


def _collect_jrxml_files(root):
    """Return all regular files below *root* with a .jrxml extension, sorted.

    The extension is compared case-insensitively in Python rather than by
    globbing for "*.jrxml" and "*.JRXML" separately: on platforms where glob
    matching is case-insensitive (e.g. Windows) the two patterns return the
    same files, so every report would be processed twice.
    """
    return sorted(
        candidate
        for candidate in root.rglob("*")
        if candidate.suffix.lower() == ".jrxml" and candidate.is_file()
    )


def _per_file_stem(context, used_stems):
    """Turn a chunk ``context`` value into a unique, filesystem-safe file stem.

    The original simplification (drop quotes, spaces -> "_", drop "Report_",
    truncate to 100 characters) is kept; on top of it unsafe characters are
    replaced, an empty result falls back to "unknown", and a numeric suffix is
    appended when a stem was already used (compared case-insensitively) so
    that one group never overwrites another.
    """
    stem = str(context).replace("'", "").replace(" ", "_").replace("Report_", "")[:100]
    stem = stem.translate(_UNSAFE_FILENAME_CHARS) or "unknown"

    candidate = stem
    serial = 2
    while candidate.lower() in used_stems:
        candidate = f"{stem}_{serial}"
        serial += 1
    used_stems.add(candidate.lower())
    return candidate


def batch_chunk_with_report(input_dir: str = None, output_dir: str = None, max_chunk_size: int = None):
    """
    Chunk every JRXML file under a directory and write a detailed report.

    Outputs written to the output directory:
      - all_chunks.json         every chunk from every successfully processed file
      - processing_stats.json   counters, per-file chunk counts, failures, timings
      - per_file/<name>.json    chunks grouped by their "context" value

    Args:
        input_dir: Directory searched recursively for JRXML files.
            Defaults to JRXML_SOURCE_DIR from config.
        output_dir: Directory that receives the output files (created if
            missing). Defaults to CHUNKER_OUTPUT_DIR from config.
        max_chunk_size: Maximum chunk size handed to JRXMLSemanticChunker.
            Defaults to MAX_CHUNK_SIZE from config.

    Returns:
        A dict with keys "chunks", "stats" and "output_path", or None when the
        input directory does not exist, is not a directory, or contains no
        JRXML files.
    """
    if input_dir is None:
        input_dir = str(JRXML_SOURCE_DIR)
    input_path = Path(input_dir).resolve()

    if not input_path.exists():
        print(f"❌ 目录不存在: {input_path}")
        return None

    if not input_path.is_dir():
        print(f"❌ 不是目录: {input_path}")
        return None

    if output_dir is None:
        output_dir = str(CHUNKER_OUTPUT_DIR)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    if max_chunk_size is None:
        max_chunk_size = MAX_CHUNK_SIZE

    print(f"\n{'='*60}")
    print(f"JRXML 语义分块 v3.0 - 批量处理")
    print(f"{'='*60}")
    print(f"输入目录: {input_path}")
    print(f"输出目录: {output_path}")
    print(f"{'='*60}\n")

    # Discover the input files (each file exactly once, in a stable order).
    jrxml_files = _collect_jrxml_files(input_path)
    total_files = len(jrxml_files)
    print(f"找到 {total_files} 个JRXML文件\n")

    if total_files == 0:
        print("⚠️ 未找到JRXML文件")
        return None

    # The chunker is only needed once there is something to process.
    chunker = JRXMLSemanticChunker(max_chunk_size=max_chunk_size)

    # Running totals for the report.
    all_chunks = []
    stats = {
        "total_files": total_files,
        "success": 0,
        "failed": 0,
        "total_chunks": 0,
        "failed_files": [],
        "chunks_per_file": defaultdict(int),
        "chunk_types": defaultdict(int),
        "started_at": datetime.now().isoformat()
    }

    start_time = time.time()

    # Process the files one by one; a failing file must not abort the batch.
    for i, jrxml_file in enumerate(jrxml_files, 1):
        relative_path = jrxml_file.relative_to(input_path)

        try:
            file_start = time.time()
            chunks = chunker.chunk_file(str(jrxml_file))
            file_duration = time.time() - file_start

            # Tally into locals first so that a malformed chunk marks the file
            # as failed without leaving half-applied numbers in the totals.
            chunk_count = len(chunks)
            type_counts = defaultdict(int)
            for chunk in chunks:
                type_counts[chunk["chunk_type"]] += 1
        except Exception as e:
            stats["failed"] += 1
            error_info = {"file": str(relative_path), "error": str(e)}
            stats["failed_files"].append(error_info)
            print(f"[{i}/{total_files}] ❌ {relative_path} → {e}")
            continue

        all_chunks.extend(chunks)
        stats["success"] += 1
        stats["total_chunks"] += chunk_count
        stats["chunks_per_file"][str(relative_path)] = chunk_count
        for chunk_type, count in type_counts.items():
            stats["chunk_types"][chunk_type] += count

        print(f"[{i}/{total_files}] ✅ {relative_path} → {chunk_count} chunks ({file_duration:.2f}s)")

    total_duration = time.time() - start_time
    stats["processing_time"] = round(total_duration, 2)
    stats["finished_at"] = datetime.now().isoformat()

    # Save every chunk in one file.
    all_chunks_path = output_path / "all_chunks.json"
    save_chunks_to_json(all_chunks, str(all_chunks_path))

    # Save the statistics report.
    stats_path = output_path / "processing_stats.json"
    with open(stats_path, "w", encoding="utf-8") as f:
        json.dump(stats, f, ensure_ascii=False, indent=2)

    # Save one JSON file per distinct chunk context.
    per_file_dir = output_path / "per_file"
    per_file_dir.mkdir(exist_ok=True)

    chunks_by_file = defaultdict(list)
    for chunk in all_chunks:
        # The context value is used as the grouping key / file name source.
        context = chunk.get("context", "")
        chunks_by_file[context].append(chunk)

    used_stems = set()
    for context, file_chunks in chunks_by_file.items():
        safe_name = _per_file_stem(context, used_stems)
        file_path = per_file_dir / f"{safe_name}.json"
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(file_chunks, f, ensure_ascii=False, indent=2)

    # Print the summary.
    print(f"\n{'='*60}")
    print(f"处理完成!")
    print(f"{'='*60}")
    print(f"✅ 成功: {stats['success']} 文件")
    print(f"❌ 失败: {stats['failed']} 文件")
    print(f"📦 总Chunks: {stats['total_chunks']}")
    print(f"⏱️ 总耗时: {total_duration:.2f}s")
    print(f"📂 输出目录: {output_path}")
    print(f"\n主要文件:")
    print(f" - {all_chunks_path}")
    print(f" - {stats_path}")
    print(f" - {per_file_dir}/ (按文件分类的chunks)")

    print(f"\nChunk类型分布:")
    print_chunk_summary(all_chunks)

    if stats["failed_files"]:
        print(f"\n⚠️ 失败文件详情:")
        for fail in stats["failed_files"]:
            print(f" - {fail['file']}: {fail['error']}")

    return {
        "chunks": all_chunks,
        "stats": stats,
        "output_path": str(output_path)
    }
|
|
|
|
|
|
def chunk_single_file_with_report(file_path: str, output_dir: str = None, max_chunk_size: int = None):
    """
    Chunk a single JRXML file and write a JSON dump plus a readable summary.

    Outputs written to the output directory:
      - <stem>_chunks.json    the chunks, saved via save_chunks_to_json
      - <stem>_summary.txt    one short entry per chunk (id, type, description
                              truncated to 200 characters, XML length, context)

    Args:
        file_path: Path of the JRXML file to process.
        output_dir: Directory that receives the output files (created if
            missing). Defaults to "<stem>_chunks" next to the input file.
        max_chunk_size: Maximum chunk size handed to JRXMLSemanticChunker.
            Defaults to MAX_CHUNK_SIZE from config, the same default the batch
            entry point uses (previously a hard-coded 2000).

    Returns:
        The chunks returned by the chunker, or None when the path does not
        exist or is not a regular file.
    """
    file_path = Path(file_path).resolve()

    if not file_path.exists():
        print(f"❌ 文件不存在: {file_path}")
        return None

    # A directory passes exists() but cannot be chunked as a file.
    if not file_path.is_file():
        print(f"❌ 不是文件: {file_path}")
        return None

    if output_dir is None:
        output_dir = file_path.parent / f"{file_path.stem}_chunks"

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    if max_chunk_size is None:
        max_chunk_size = MAX_CHUNK_SIZE

    print(f"\n处理文件: {file_path.name}")
    print(f"输出目录: {output_path}\n")

    chunker = JRXMLSemanticChunker(max_chunk_size=max_chunk_size)

    start_time = time.time()
    chunks = chunker.chunk_file(str(file_path))
    duration = time.time() - start_time

    # Save the machine-readable result.
    chunks_path = output_path / f"{file_path.stem}_chunks.json"
    save_chunks_to_json(chunks, str(chunks_path))

    # Write the human-readable summary.
    summary_path = output_path / f"{file_path.stem}_summary.txt"
    with open(summary_path, "w", encoding="utf-8") as f:
        f.write(f"JRXML Chunking Report: {file_path.name}\n")
        f.write(f"{'='*60}\n")
        f.write(f"Processing time: {duration:.2f}s\n")
        f.write(f"Total chunks: {len(chunks)}\n\n")

        for chunk in chunks:
            f.write(f"[Chunk {chunk['chunk_id']}] {chunk['chunk_type']}\n")
            f.write(f" Description: {chunk['human_description'][:200]}\n")
            f.write(f" XML Length: {len(chunk['raw_xml'])} chars\n")
            f.write(f" Context: {chunk.get('context', 'N/A')}\n\n")

    print(f"✅ 生成 {len(chunks)} chunks")
    print(f"📄 Chunks JSON: {chunks_path}")
    print(f"📄 可读摘要: {summary_path}")
    print(f"⏱️ 耗时: {duration:.2f}s")

    print_chunk_summary(chunks)

    return chunks
|
|
|
|
|
|
def _print_usage():
    """Print the command-line help, using this script's actual file name."""
    script_name = os.path.basename(__file__)
    print("=" * 60)
    print("JRXML Semantic Chunking v3.0 - 批量处理工具")
    print("=" * 60)
    print(f"\n默认输入目录: {JRXML_SOURCE_DIR}")
    print(f"默认输出目录: {CHUNKER_OUTPUT_DIR}")
    print("\n用法:")
    print(f" python {script_name} <目录路径>")
    print(f" python {script_name} <文件路径>")
    print(f" python {script_name} (使用默认配置)")
    print("\n参数:")
    print(" <路径> JRXML文件所在目录 或 单个JRXML文件路径")
    print(" --output <目录> 指定输出目录 (可选)")
    print(" -h, --help 显示本帮助")
    print("\n示例:")
    print(f" python {script_name}")
    print(f" python {script_name} ./jasper_reports")
    print(f" python {script_name} ./jasper_reports --output ./chunks")
    print(f" python {script_name} report.jrxml")


def main(argv=None):
    """
    Command-line entry point.

    Args:
        argv: Argument list without the program name; defaults to sys.argv[1:].

    Returns:
        Process exit code: 0 after help or a dispatched run, 1 for a path that
        is neither a directory nor a file, 2 for "--output" without a value.

    Behaviour:
        - no positional argument: batch-process the configured default
          directory, as the usage text documents
        - a directory: batch-process it
        - a file: process that single file
        - "--output <dir>" may appear anywhere, including before the path
    """
    args = list(sys.argv[1:] if argv is None else argv)

    if "-h" in args or "--help" in args:
        _print_usage()
        return 0

    output_dir = None
    if "--output" in args:
        idx = args.index("--output")
        if idx + 1 >= len(args):
            print("❌ --output 需要指定目录 (使用 --help 查看用法)")
            return 2
        output_dir = args[idx + 1]
        # Remove the option and its value so they are never mistaken for the path.
        del args[idx:idx + 2]

    if not args:
        # No path given: fall back to the directories configured in config/.env.
        batch_chunk_with_report(None, output_dir)
        return 0

    input_path = args[0]
    if os.path.isdir(input_path):
        batch_chunk_with_report(input_path, output_dir)
    elif os.path.isfile(input_path):
        chunk_single_file_with_report(input_path, output_dir)
    else:
        print(f"❌ 路径无效: {input_path}")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())