Compare commits

..

4 Commits

Author SHA1 Message Date
panda 687b3a8f90 gitignore更新 2026-05-15 11:39:14 +08:00
panda 9b913302fd docs: 更新文档,区分首次全量与增量更新两种使用场景
- README.md: 拆分为"首次使用"和"增量更新"两个独立章节
- docs/file_guide.md: 新增三种使用场景 (首次建库/追加模板/换模型)
- 补充三个 --incremental 标志的工作逻辑对比表

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 11:26:54 +08:00
panda ea44096b12 feat: batch_chunker.py 支持 --incremental 增量分块
增量模式下自动跳过已处理文件,合并新旧 chunks 和统计报告。
至此分块、向量化、导入三个步骤均支持增量处理。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 11:20:07 +08:00
panda 0787901acc feat: 添加Markdown分块器与统一批量分块入口,支持增量向量化与导入
- 新增 md_chunker.py: Markdown语义分块引擎,支持标题/代码块/表格智能拆分
- 新增 batch_chunker.py: 统一批量分块入口,支持JRXML+Markdown混合处理
- 新增 requirements.txt: 整理项目依赖
- embed_chunks.py: 新增 --incremental 增量模式,追加新向量到已有数据
- import_to_chroma.py: 新增 --incremental 增量模式,不再每次清空数据库
- 更新 README.md 与 docs/file_guide.md 反映最新架构

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 11:10:25 +08:00
8 changed files with 1113 additions and 306 deletions
+1
View File
@@ -32,6 +32,7 @@ jrxml_chunker_output/
# IDE
.idea/
.vscode/
。claude/
*.swp
*.swo
*~
+131 -81
View File
@@ -1,79 +1,94 @@
# JRXML RAG 项目
基于 RAGRetrieval-Augmented Generation的 JasperReports JRXML 模板智能问答系统,构建JRXML自定义agent的前置工作。
基于 RAG 的 JasperReports JRXML 模板 + Markdown 文档智能检索系统,作为构建 JRXML 自定义 Agent 的前置工作。
## 项目简介
本项目将 JasperReports 的 JRXML 模板文件进行语义分块、向量化,并存入 Chroma 向量数据库,实现通过自然语言查询来检索和理解报表模板的结构、配置和逻辑。
支持 JRXML 模板和 Markdown 文档的语义分块、向量化、Chroma 持久化存储,以及自然语言查询。**三个核心步骤均支持增量处理**。
## 项目结构
```
RAG-jaspersoft/
├── collect_jrxml.py # JRXML 文件收集脚本
├── jrxml_chunker.py # JRXML 语义分块核心引擎
├── jrxml_banch_chunker.py # 批量分块入口脚本
├── down_embedding_model.py # 嵌入模型下载脚本
├── embed_chunks.py # Chunk 向量化脚本
├── import_to_chroma.py # 向量导入 Chroma 数据库
├── query_chroma.py # 语义搜索查询工具
├── jrxml_source/ # JRXML 源文件目录
├── jrxml_chunker_output/ # 分块输出目录
│ ├── all_chunks.json # 所有 chunks 合并文件
│ ├── processing_stats.json # 处理统计报告
│ └── per_file/ # 按文件分类的 chunks
├── models/ # 嵌入模型存放目录
│ └── Qwen3-Embedding-4B/ # Qwen3 嵌入模型
├── embeddings/ # 向量输出目录
│ ├── embeddings.npy # 向量矩阵
│ ├── chunks.json # 原始 chunks
│ └── embeddings.pkl # 完整数据 pickle
├── chroma_db/ # Chroma 向量数据库
└── docs/ # 项目文档
└── file_guide.md # 文件功能说明
rag_jrxml/
├── collect_jrxml.py # JRXML 文件收集
├── jrxml_chunker.py # JRXML 语义分块引擎 (v3.0)
├── md_chunker.py # Markdown 语义分块引擎
├── batch_chunker.py # 统一批量分块入口 (JRXML + MD, 支持增量)
├── down_embedding_model.py # 嵌入模型下载
├── embed_chunks.py # Chunk 向量化 (支持增量)
├── import_to_chroma.py # Chroma 向量入库 (支持增量)
├── query_chroma.py # 语义搜索查询
├── config.py # 统一配置管理 (.env)
├── .env / .env.example # 环境变量配置
├── requirements.txt # Python 依赖
├── jrxml_source/ # JRXML 源文件
├── jrxml_chunker_output/ # 分块输出
├── embeddings/ # 向量输出
├── chroma_db/ # Chroma 持久化数据库
└── docs/file_guide.md # 详细文件功能说明
```
## 快速开始
### 环境要求
## 环境要求
- Python 3.11+
- NVIDIA GPU推荐8GB+ 显存或 CPU
- CUDA 12.1+GPU 模式
- NVIDIA GPU (推荐 8GB+ 显存) 或 CPU
- CUDA 12.1+ (GPU 模式)
### 安装依赖
## 安装与配置
```bash
# 安装 PyTorch (CUDA 版本)
uv pip install torch --index-url https://download.pytorch.org/whl/cu130
# 安装其他依赖
uv pip install sentence-transformers chromadb numpy tqdm
pip install -r requirements.txt
cp .env.example .env # 编辑 .env 调整模型、路径等参数
```
### 完整流程
主要配置项:
| 变量 | 说明 | 默认值 |
| --- | --- | --- |
| `EMBEDDING_MODEL_NAME` | 嵌入模型 (Hub 名) | `Qwen/Qwen3-Embedding-0.6B` |
| `EMBEDDING_MODEL_PATH` | 本地模型路径 | `models/Qwen3-Embedding-0.6B` |
| `MAX_CHUNK_SIZE` | 单个 chunk 最大字符数 | `2000` |
| `BATCH_SIZE` | 向量化批大小 | `16` |
| `CHROMA_COLLECTION_NAME` | Chroma 集合名 | `jrxml_chunks` |
---
## 首次使用 — 全量建库
从头构建向量数据库,三个步骤顺序执行:
### 步骤 1:收集 & 分块
```bash
# 1. 收集 JRXML 文件
# 收集 JRXML 模板文件
python collect_jrxml.py
# 2. 语义分块
python jrxml_banch_chunker.py ./jrxml_source --output ./jrxml_chunker_output
# 3. 下载嵌入模型(首次运行)
python down_embedding_model.py
# 4. 向量化
python embed_chunks.py --batch_size 2
# 5. 导入 Chroma 数据库
python import_to_chroma.py
# 6. 开始查询
python query_chroma.py
# 统一分块 (JRXML + Markdown 混合目录)
python batch_chunker.py ./jrxml_source --output ./jrxml_chunker_output
```
### 快速查询
输出 `jrxml_chunker_output/all_chunks.json``processing_stats.json`
### 步骤 2:向量化
```bash
# 下载嵌入模型 (仅首次)
python down_embedding_model.py
# 全量向量化
python embed_chunks.py
```
输出 `embeddings/embeddings.npy``chunks.json` 等文件。
### 步骤 3:导入 Chroma
```bash
# 全量导入 (创建新集合)
python import_to_chroma.py
```
输出 `chroma_db/` 持久化向量数据库。
### 步骤 4:查询
```bash
# 交互模式
@@ -81,51 +96,86 @@ python query_chroma.py
# 单次查询
python query_chroma.py "如何修改报表标题"
# 按类型过滤
python query_chroma.py "SQL查询怎么写" --filter_field query
python query_chroma.py "报表参数" --threshold 0.5 --n_results 10
```
---
## 增量更新 — 追加新模板
已有数据库后,添加新模板无需重建。将新文件放入源目录后:
```bash
# 步骤 1:增量分块 (自动跳过已处理文件,合并到已有结果)
python batch_chunker.py ./jrxml_source --incremental
# 步骤 2:增量向量化 (只对新 chunks 编码,合并到已有向量)
python embed_chunks.py --incremental
# 步骤 3:增量导入 (追加到已有集合,不删除现有数据)
python import_to_chroma.py --incremental
```
三个 `--incremental` 标志各自的工作逻辑:
| 步骤 | 如何识别已处理 | 无新数据时 |
| --- | --- | --- |
| `batch_chunker` | 对比 `processing_stats.json` 中的文件路径 | 输出 "没有新文件需要处理" |
| `embed_chunks` | 按 `(context, chunk_id)` 去重 | 输出 "没有新 chunks 需要向量化" |
| `import_to_chroma` | 查询 Chroma 已有 ID | 输出 "没有新数据需要导入" |
---
## 分块类型
系统将 JRXML 模板按以下语义类型进行分块:
### JRXML
| 类型 | 说明 |
|------|------|
| `report_overview` | 报告整体概览,含数据源分析 |
| `datasource_config` | 数据源配置属性 |
| `query` | 数据查询SQL/HQL/XPath 等 |
|---|---|
| `report_overview` | 报表概览 (含数据源分析) |
| `datasource_config` | 数据源配置 |
| `query` | 数据查询 (SQL/HQL/XPath/JSON) |
| `parameters` | 参数定义 |
| `fields` | 字段定义 |
| `fields` / `field` | 字段定义 |
| `sortFields` | 排序字段 |
| `filterExpression` | 过滤表达式 |
| `variables_*` | 变量定义(按重置类型分组) |
| `variables_*` | 变量定义 (按 resetType) |
| `styles` | 样式定义 |
| `groups` | 分组定义 |
| `band_*` | 标准带(title/detail/pageHeader 等) |
| `dataset` | 数据集定义 |
| `group` | 分组定义 |
| `band_*` | 标准带区 (title/detail/pageHeader 等) |
| `chart` | 图表元素 |
| `crosstab` | 交叉表元素 |
| `subreport` | 子报表元素 |
| `component` | 组件元素列表等 |
| `dataset` | 数据集定义 |
| `component` | 组件元素 (列表等) |
### Markdown
| 类型 | 说明 |
|---|---|
| `section_h1` | 一级标题段落 |
| `section_h2` / `section_h3` | 二/三级标题段落 |
| `section_installation` | 安装/部署章节 |
| `section_configuration` | 配置章节 |
| `section_api` | API 接口章节 |
| `section_example` | 示例/用法章节 |
| `section_faq` | FAQ/常见问题章节 |
| `section_changelog` | 更新日志章节 |
| `code` | 代码块 |
## 支持的 JRXML 数据源
SQL/JDBC · HQL/Hibernate · XPath/XML · JSON · JSONQL · CSV · Data Adapter (Excel/XML/HTTP) · Bean Collection · Empty
## 技术栈
- **分块引擎**: 基于 XML 解析的语义分块器
- **嵌入模型**: Qwen3-Embedding-4B支持 FP16 半精度)
- **向量数据库**: ChromaDB(持久化模式,余弦相似度)
- **分块引擎**: XML 语义解析 (JRXML) + Markdown 结构化解析
- **嵌入模型**: Qwen3-Embedding (支持 FP16, 可替换)
- **嵌入框架**: Sentence-Transformers
- **深度学习**: PyTorch + CUDA
## 性能参考
| 硬件 | 模型 | Batch Size | 速度 |
|------|------|-----------|------|
| RTX 4060 Laptop 8GB | Qwen3-Embedding-4B (FP16) | 2 | ~1.2s/chunk |
| RTX 4060 Laptop 8GB | all-MiniLM-L6-v2 | 64 | ~0.001s/chunk |
> 离线建库是一次性开销,在线查询仅需 1-2 秒。
- **向量数据库**: ChromaDB (持久化, 余弦相似度)
- **深度学习**: PyTorch + CUDA (CPU 兼容)
## License
MIT
MIT
+296
View File
@@ -0,0 +1,296 @@
"""
batch_chunker.py
统一批量分块入口,支持 JRXML 和 Markdown 文件混合处理
"""
import os
import sys
import json
import time
from pathlib import Path
from datetime import datetime
from collections import defaultdict
from jrxml_chunker import JRXMLSemanticChunker
from md_chunker import MarkdownSemanticChunker, save_chunks_to_json
SUPPORTED_EXTENSIONS = ('.jrxml', '.JRXML', '.md', '.markdown')
def batch_chunk_with_report(input_dir: str = None, output_dir: str = None,
max_chunk_size: int = 2000, incremental: bool = False):
"""
批量分块,支持 JRXML 和 Markdown 混合处理
Args:
input_dir: 输入目录
output_dir: 输出目录
max_chunk_size: 单个 chunk 最大字符数
incremental: 增量模式,只处理新增文件,合并到已有结果
"""
if input_dir is None:
print("错误:请指定输入目录")
return None
input_path = Path(input_dir).resolve()
if not input_path.exists():
print(f"❌ 目录不存在: {input_path}")
return None
if not input_path.is_dir():
print(f"❌ 不是目录: {input_path}")
return None
if output_dir is None:
output_dir = input_path.parent / f"{input_path.stem}_chunks"
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
print(f"\n{'='*60}")
print(f"统一批量分块 v1.0" + (" [增量模式]" if incremental else ""))
print(f"{'='*60}")
print(f"输入目录: {input_path}")
print(f"输出目录: {output_path}")
print(f"{'='*60}\n")
# 增量模式:加载已有数据,跳过已处理的文件
existing_chunks = []
processed_files = set()
if incremental:
existing_chunks_path = output_path / "all_chunks.json"
existing_stats_path = output_path / "processing_stats.json"
if existing_chunks_path.exists() and existing_stats_path.exists():
with open(existing_chunks_path, 'r', encoding='utf-8') as f:
existing_chunks = json.load(f)
with open(existing_stats_path, 'r', encoding='utf-8') as f:
existing_stats = json.load(f)
processed_files = set(existing_stats.get("chunks_per_file", {}).keys())
print(f"增量模式: 已有 {len(existing_chunks)} 个 chunks, {len(processed_files)} 个已处理文件")
else:
print(f"增量模式: 未找到已有数据,切换为全量处理")
incremental = False
# 初始化分块器
jrxml_chunker = JRXMLSemanticChunker(max_chunk_size=max_chunk_size)
md_chunker = MarkdownSemanticChunker(max_chunk_size=max_chunk_size)
# 收集所有支持的文件
files_by_ext = defaultdict(list)
for ext in SUPPORTED_EXTENSIONS:
files_by_ext[ext] = list(input_path.rglob(f"*{ext}"))
# 增量模式:过滤已处理文件
total_found = sum(len(f) for f in files_by_ext.values())
if incremental and processed_files:
skipped = 0
for ext in SUPPORTED_EXTENSIONS:
new_list = []
for f in files_by_ext[ext]:
if str(f.relative_to(input_path)) in processed_files:
skipped += 1
else:
new_list.append(f)
files_by_ext[ext] = new_list
print(f"扫描到 {total_found} 个文件, 跳过 {skipped} 个已处理")
else:
print(f"扫描到 {total_found} 个文件")
total_files = sum(len(f) for f in files_by_ext.values())
for ext, files in files_by_ext.items():
if files:
print(f" {ext}: {len(files)}")
if total_files == 0:
print("✅ 没有新文件需要处理")
result_stats = existing_stats.copy() if (incremental and processed_files) else {}
return {
"chunks": existing_chunks,
"stats": result_stats,
"output_path": str(output_path)
}
# 统计变量
all_chunks = []
stats = {
"total_files": total_found,
"success": 0,
"failed": 0,
"total_chunks": 0,
"failed_files": [],
"chunks_per_file": defaultdict(int),
"chunk_types": defaultdict(int),
"files_by_type": {"jrxml": 0, "markdown": 0},
"started_at": datetime.now().isoformat()
}
start_time = time.time()
# 处理 JRXML 文件
jrxml_files = files_by_ext.get('.jrxml', []) + files_by_ext.get('.JRXML', [])
if jrxml_files:
print(f"\n📄 处理 JRXML 文件 ({len(jrxml_files)} 个)...")
for i, jrxml_file in enumerate(jrxml_files, 1):
relative_path = jrxml_file.relative_to(input_path)
try:
file_start = time.time()
chunks = jrxml_chunker.chunk_file(str(jrxml_file))
file_duration = time.time() - file_start
all_chunks.extend(chunks)
stats["success"] += 1
stats["files_by_type"]["jrxml"] += 1
stats["total_chunks"] += len(chunks)
stats["chunks_per_file"][str(relative_path)] = len(chunks)
for chunk in chunks:
stats["chunk_types"][f"jrxml_{chunk['chunk_type']}"] += 1
print(f"[{i}/{len(jrxml_files)}] ✅ JRXML: {relative_path}{len(chunks)} chunks ({file_duration:.2f}s)")
except Exception as e:
stats["failed"] += 1
error_info = {"file": str(relative_path), "type": "jrxml", "error": str(e)}
stats["failed_files"].append(error_info)
print(f"[{i}/{len(jrxml_files)}] ❌ JRXML: {relative_path}{e}")
# 处理 Markdown 文件
md_files = files_by_ext.get('.md', []) + files_by_ext.get('.markdown', [])
if md_files:
print(f"\n📝 处理 Markdown 文件 ({len(md_files)} 个)...")
for i, md_file in enumerate(md_files, 1):
relative_path = md_file.relative_to(input_path)
try:
file_start = time.time()
chunks = md_chunker.chunk_file(str(md_file))
file_duration = time.time() - file_start
all_chunks.extend(chunks)
stats["success"] += 1
stats["files_by_type"]["markdown"] += 1
stats["total_chunks"] += len(chunks)
stats["chunks_per_file"][str(relative_path)] = len(chunks)
for chunk in chunks:
stats["chunk_types"][f"md_{chunk['chunk_type']}"] += 1
print(f"[{i}/{len(md_files)}] ✅ MD: {relative_path}{len(chunks)} chunks ({file_duration:.2f}s)")
except Exception as e:
stats["failed"] += 1
error_info = {"file": str(relative_path), "type": "markdown", "error": str(e)}
stats["failed_files"].append(error_info)
print(f"[{i}/{len(md_files)}] ❌ MD: {relative_path}{e}")
total_duration = time.time() - start_time
stats["processing_time"] = round(total_duration, 2)
stats["finished_at"] = datetime.now().isoformat()
# 增量模式:合并新旧数据
if incremental and existing_chunks:
merged_chunks = existing_chunks + all_chunks
print(f"\n合并: 已有 {len(existing_chunks)} + 新增 {len(all_chunks)} = {len(merged_chunks)} 个 chunks")
all_chunks = merged_chunks
# 合并统计
merged_stats = existing_stats.copy()
merged_stats["success"] = existing_stats.get("success", 0) + stats["success"]
merged_stats["failed"] = existing_stats.get("failed", 0) + stats["failed"]
merged_stats["total_chunks"] = existing_stats.get("total_chunks", 0) + stats["total_chunks"]
merged_stats["processing_time"] = round(existing_stats.get("processing_time", 0) + total_duration, 2)
merged_stats["finished_at"] = stats["finished_at"]
for fp, count in stats["chunks_per_file"].items():
merged_stats["chunks_per_file"][fp] = count
for ct, count in stats["chunk_types"].items():
merged_stats["chunk_types"][ct] = merged_stats.get("chunk_types", {}).get(ct, 0) + count
merged_stats["files_by_type"]["jrxml"] = existing_stats.get("files_by_type", {}).get("jrxml", 0) + stats["files_by_type"]["jrxml"]
merged_stats["files_by_type"]["markdown"] = existing_stats.get("files_by_type", {}).get("markdown", 0) + stats["files_by_type"]["markdown"]
if stats["failed_files"]:
merged_stats.setdefault("failed_files", []).extend(stats["failed_files"])
stats_serializable = {k: (dict(v) if isinstance(v, defaultdict) else v) for k, v in merged_stats.items()}
else:
stats_serializable = {k: (dict(v) if isinstance(v, defaultdict) else v) for k, v in stats.items()}
# 保存所有 chunks
all_chunks_path = output_path / "all_chunks.json"
save_chunks_to_json(all_chunks, str(all_chunks_path))
# 保存统计报告
stats_path = output_path / "processing_stats.json"
with open(stats_path, "w", encoding="utf-8") as f:
json.dump(stats_serializable, f, ensure_ascii=False, indent=2)
# 打印总结
total_success = stats_serializable.get("success", stats["success"])
total_failed = stats_serializable.get("failed", stats["failed"])
total_chunks_count = stats_serializable.get("total_chunks", stats["total_chunks"])
jrxml_count = stats_serializable.get("files_by_type", {}).get("jrxml", stats["files_by_type"]["jrxml"])
md_count = stats_serializable.get("files_by_type", {}).get("markdown", stats["files_by_type"]["markdown"])
print(f"\n{'='*60}")
print(f"处理完成!")
print(f"{'='*60}")
print(f"✅ 成功: {total_success} 文件 (JRXML: {jrxml_count}, MD: {md_count})")
print(f"❌ 失败: {total_failed} 文件")
print(f"📦 总 Chunks: {total_chunks_count}")
print(f"⏱️ 总耗时: {total_duration:.2f}s")
print(f"📂 输出目录: {output_path}")
print(f"\n主要文件:")
print(f" - {all_chunks_path}")
print(f" - {stats_path}")
display_types = stats_serializable.get("chunk_types", stats.get("chunk_types", {}))
if display_types:
print(f"\nChunk 类型分布 (前 10):")
sorted_types = sorted(display_types.items(), key=lambda x: -x[1])[:10]
for ct, count in sorted_types:
print(f" {ct}: {count}")
if stats["failed_files"]:
print(f"\n⚠️ 失败文件详情:")
for fail in stats["failed_files"][:10]:
print(f" - {fail['file']} ({fail['type']}): {fail['error']}")
return {
"chunks": all_chunks,
"stats": stats_serializable,
"output_path": str(output_path)
}
if __name__ == "__main__":
if len(sys.argv) < 2:
print("=" * 60)
print("统一批量分块 v1.0")
print("支持 JRXML 和 Markdown 文件")
print("=" * 60)
print("\n用法:")
print(" python batch_chunker.py <目录路径>")
print(" python batch_chunker.py <目录路径> --output <输出目录>")
print(" python batch_chunker.py <目录路径> --incremental")
print("\n示例:")
print(" python batch_chunker.py ./jrxml_source")
print(" python batch_chunker.py ./docs")
print(" python batch_chunker.py ./ --output ./chunks")
print(" python batch_chunker.py ./jrxml_source --incremental # 增量分块")
sys.exit(0)
input_path = sys.argv[1]
output_dir = None
if "--output" in sys.argv:
idx = sys.argv.index("--output")
if idx + 1 < len(sys.argv):
output_dir = sys.argv[idx + 1]
incremental = "--incremental" in sys.argv
if os.path.isdir(input_path):
batch_chunk_with_report(input_path, output_dir, incremental=incremental)
else:
print(f"❌ 路径无效或不是目录: {input_path}")
+194 -190
View File
@@ -4,260 +4,262 @@
---
## 1. collect_jrxml.py — JRXML 文件收集脚本
## 1. collect_jrxml.py — JRXML 文件收集
**功能**: 从指定的 JasperReports 模板库目录递归收集所有 `.jrxml` 文件,复制到项目 `jrxml_source` 目录。
**输入**:
- 源目录: `C:\Users\zy187\JaspersoftWorkspace\JasperReportsSamples`(可修改)
**输出**:
- `jrxml_source/` 目录,包含所有收集到的 JRXML 文件
**功能**: 从 JasperReports 模板库目录递归收集 `.jrxml` 文件,复制到项目 `jrxml_source` 目录。
**使用方式**:
```bash
python collect_jrxml.py
```
**核心逻辑**:
- 使用 `os.walk()` 递归遍历源目录
- 筛选 `.jrxml` 后缀文件
- 自动处理文件名冲突(添加数字后缀)
- 使用 `shutil.copy2()` 保留文件元数据
---
## 2. jrxml_chunker.py — JRXML 语义分块引擎 (v3.0)
**功能**: 将单个 JRXML 文件按语义结构拆分。被 `batch_chunker.py` 调用,也可单独使用。
**输入**: 单个 `.jrxml` 文件路径(或目录)
**输出**: `JRXMLChunk` 列表,字段包括:
- `chunk_id`: 文件内序号
- `chunk_type`: 分块类型 (`query`, `band_detail`, `chart` 等)
- `human_description`: 人类可读描述
- `raw_xml`: 原始 XML 片段
- `context`: 所属报表名称
- `metadata`: 元数据 (report_name, band_name, element_kind 等)
**单独使用**:
```bash
python jrxml_chunker.py report.jrxml # 单文件
python jrxml_chunker.py ./jrxml_source/ # 目录
```
---
## 2. jrxml_chunker.py — JRXML 语义分块核心引擎
## 3. md_chunker.py — Markdown 语义分块引擎
**功能**: 将单个 JRXML 文件按语义结构拆分为多个 chunk,每个 chunk 包含人类可读描述、原始 XML 和元数据
**输入**:
- 单个 JRXML 文件路径
**输出**:
- `JRXMLChunk` 对象列表,每个包含:
- `chunk_id`: 唯一标识
- `chunk_type`: 分块类型(如 `query`, `field`, `band_title` 等)
- `human_description`: 人类可读的结构化描述
- `raw_xml`: 原始 XML 片段
- `context`: 上下文信息(所属报表名称)
- `metadata`: 元数据字典
**核心类**:
- `JRXMLChunk`: 单个 chunk 的数据结构
- `JRXMLSemanticChunker`: 主分块器,支持多种数据源类型(SQL、HQL、XPath、JSON、CSV 等)
**功能**: 将 Markdown 文件按标题层级、代码块、表格等结构化元素智能分块。被 `batch_chunker.py` 调用,也可单独使用
**分块策略**:
- XML 元素类型分类(field、parameter、variable、band、chart 等)
- 提取数据源配置和查询语句
- 保留元素间的层级关系
- 为每个 chunk 生成结构化的人类可读描述
-标题层级 (H1/H2/H3) 划分段落,H2 自动识别特殊类型
- 代码块、表格作为独立 chunk
- 过长段落按段落/句子二次拆分
**使用方式**:
**单独使用**:
```bash
# 处理单个文件
python jrxml_chunker.py report.jrxml
# 处理整个目录
python jrxml_chunker.py ./jrxml_source/
python md_chunker.py doc.md # 单文件
python md_chunker.py ./docs/ # 目录
```
---
## 3. jrxml_banch_chunker.py — 批量分块入口脚本
## 4. batch_chunker.py — 统一批量分块入口
**功能**: 批量处理目录下所有 JRXML 文件,生成统计报告和分类输出
**输入**:
- JRXML 文件目录(默认: `jrxml_source`
**功能**: 统一入口,支持 JRXML + Markdown 混合批量处理。**支持增量模式**
**输出**:
- `jrxml_chunker_output/all_chunks.json`: 所有 chunks 合并文件
- `jrxml_chunker_output/processing_stats.json`: 处理统计(成功/失败数、耗时、chunk 类型分布
- `jrxml_chunker_output/per_file/`: 按原文件分类的独立 chunk 文件
- `all_chunks.json`: 所有 chunks 合并
- `processing_stats.json`: 处理统计 (文件级 chunk 数量、类型分布)
**核心函数**:
- `batch_chunk_with_report()`: 批量处理目录
- `chunk_single_file_with_report()`: 处理单个文件
**使用方式**:
**全量模式** — 首次建库:
```bash
# 使用默认输入目录
python jrxml_banch_chunker.py
# 指定输入目录
python jrxml_banch_chunker.py ./jrxml_source
# 指定输出目录
python jrxml_banch_chunker.py ./jrxml_source --output ./my_output
python batch_chunker.py ./jrxml_source --output ./jrxml_chunker_output
```
**增量模式** (`--incremental`) — 追加新文件:
```bash
python batch_chunker.py ./jrxml_source --incremental
```
增量模式逻辑:
1. 加载已有 `processing_stats.json`,获取已处理文件列表
2. 扫描输入目录,自动跳过已处理文件
3. 只分块新增文件
4. 合并新旧 `all_chunks.json` 和统计数据后保存
---
## 4. down_embedding_model.py — 嵌入模型下载脚本
## 5. down_embedding_model.py — 嵌入模型下载
**功能**: 从 HuggingFace Hub 下载 Qwen3-Embedding-4B 嵌入模型到本地。
**功能**: 从 HuggingFace Hub 下载嵌入模型到本地。支持国内镜像 (`hf-mirror.com`)、断点续传
**输入**:
- HuggingFace 模型仓库: `Qwen/Qwen3-Embedding-4B`
**输出**:
- `models/Qwen3-Embedding-4B/` 目录,包含完整的模型文件
**特性**:
- 使用国内镜像加速下载(`hf-mirror.com`
- 支持断点续传
- 自动安装依赖
**使用方式**:
```bash
python down_embedding_model.py
```
---
## 5. embed_chunks.py — Chunk 向量化脚本
## 6. embed_chunks.py — Chunk 向量化
**功能**: 使用嵌入模型将分块后的文本转换为向量表示,支持 GPU 加速和 FP16 半精度。
**输入**:
- `jrxml_chunker_output/all_chunks.json`(默认)
**功能**: 将 chunks 转换为向量支持 GPU/CPU、FP16 半精度**支持增量模式**
**输出**:
- `embeddings/embeddings.npy`: 向量矩阵float32
- `embeddings/chunk_id_map.json`: chunk ID 映射
- `embeddings/chunk_type_map.json`: chunk 类型映射
- `embeddings/chunks.json`: 原始 chunks 副本
- `embeddings/embeddings.pkl`: 完整数据 pickle
- `embeddings/embeddings.npy`: 向量矩阵 (float32)
- `embeddings/chunks.json`: 原始 chunks
- `embeddings/chunk_id_map.json` / `chunk_type_map.json`
- `embeddings/embeddings.pkl`: 完整 pickle
**核心函数**:
- `build_text_for_embedding()`: 将 chunk 转换为适合向量化的文本(拼接类型、描述、XML、元数据)
- `main()`: 主流程(加载→编码→保存→质量检查)
**特性**:
- 自动检测 CUDA/CPU
- 默认启用 FP16 半精度(节省约 50% 显存)
- 支持 HuggingFace Hub 在线模型
- 向量归一化 + NaN 检测
**使用方式**:
**全量模式** — 首次向量化:
```bash
# 使用默认设置
python embed_chunks.py
# 指定模型和 batch size
python embed_chunks.py --model_path "sentence-transformers/all-MiniLM-L6-v2" --batch_size 64
# 使用本地 Qwen3 模型
python embed_chunks.py --batch_size 2
# 禁用 FP16
python embed_chunks.py --no_fp16 --batch_size 1
python embed_chunks.py --batch_size 2 # 调整批大小
python embed_chunks.py --model_path "all-MiniLM-L6-v2" # 换模型
python embed_chunks.py --no_fp16 # 禁用半精度
```
---
## 6. import_to_chroma.py — 向量导入 Chroma 数据库
**功能**: 将已生成的向量和 chunks 导入 Chroma 持久化向量数据库。
**输入**:
- `embeddings/embeddings.npy`: 向量矩阵
- `embeddings/chunks.json`: chunks 数据
**输出**:
- `chroma_db/`: Chroma 持久化数据库目录
- 集合名称: `jrxml_chunks`(默认)
**核心逻辑**:
- 加载向量和 chunks
- 初始化 Chroma PersistentClient
- 创建集合(余弦相似度)
- 分批导入(每批 1000 条)
- 提取元数据(chunk_type、report_name、band_name 等)
- 快速验证查询
**使用方式**:
**增量模式** (`--incremental` / `-i`) — 只编码新 chunks:
```bash
# 使用默认设置
python import_to_chroma.py
# 指定路径
python import_to_chroma.py --embeddings_dir ./embeddings --chroma_path ./chroma_db
python embed_chunks.py --incremental
```
增量模式逻辑:
1. 加载已有 `embeddings.npy` + `chunks.json`
2.`(context, chunk_id)` 去重
3. 只向量化新 chunks
4. 合并新旧数据后保存
---
## 7. query_chroma.py — 语义搜索查询工具
## 7. import_to_chroma.py — Chroma 向量入库
**功能**: 通过自然语言查询 Chroma 数据库,检索相关的 JRXML chunk
**功能**: 将向量数据导入 Chroma 持久化数据库。**支持增量模式**
**输入**:
- 用户自然语言查询
- 可选的元数据过滤条件
**全量模式** — 首次导入 (删除旧集合重建):
```bash
python import_to_chroma.py
```
**输出**:
- 相似度排序的检索结果(含 chunk 类型、报表名称、区域、内容摘要)
**增量模式** (`--incremental` / `-i`) — 追加新记录:
```bash
python import_to_chroma.py --incremental
```
**核心类**:
- `JRXMLSearcher`: 搜索器,封装模型加载、向量编码和 Chroma 查询
增量模式逻辑:
1. `get_or_create_collection` (不删除已有数据)
2. 查询 Chroma 已有 ID
3. 跳过已导入的记录,只追加新数据
**核心方法**:
- `search()`: 基础语义搜索
- `search_with_threshold()`: 带相似度阈值的搜索
- `format_result()`: 格式化输出结果
---
## 8. query_chroma.py — 语义搜索查询
**功能**: 通过自然语言查询 Chroma 数据库。
**两种模式**:
1. **命令行单次查询**: `python query_chroma.py "查询内容"`
2. **交互模式**: `python query_chroma.py`(支持连续查询和内联命令)
- **单次查询**: `python query_chroma.py "查询内容"`
- **交互模式**: `python query_chroma.py`(支持连续查询和内联命令)
**交互模式命令**:
```
filter:<类型> 按 chunk_type 过滤如 filter:query
t:<阈值> 设置相似度阈值 0~1如 t:0.5
k:<数量> 设置返回结果数如 k:10
filter:<类型> 按 chunk_type 过滤 (如 filter:query)
t:<阈值> 相似度阈值 0~1 (如 t:0.5)
k:<数量> 返回结果数 (如 k:10)
```
**使用方式**:
**使用示例**:
```bash
# 交互模式
python query_chroma.py
# 单次查询
python query_chroma.py "如何修改报表标题"
# 按类型过滤
python query_chroma.py # 交互模式
python query_chroma.py "如何修改报表标题" # 单次查询
python query_chroma.py "SQL怎么写" --filter_field query
# 设置阈值和返回数量
python query_chroma.py "报表参数" --threshold 0.5 --n_results 10
python query_chroma.py "参数" --threshold 0.5 --n_results 10
```
---
## 数据流全景
## 9. config.py — 统一配置管理
**功能**: 从 `.env` 加载所有配置,所有脚本通过此模块获取配置项。
```bash
python config.py # 打印当前配置
```
---
## 10. jrxml_banch_chunker.py — 旧版入口 (已废弃)
**功能**: JRXML 单类型批量分块。已被 `batch_chunker.py` 取代,保留以兼容旧流程。
---
## 使用场景
### 场景 A:首次构建数据库
```bash
# 1. 准备源文件
python collect_jrxml.py
# 将 Markdown 文档放入 jrxml_source/ 或指定目录
# 2. 全量分块
python batch_chunker.py ./jrxml_source
# 3. 下载模型 + 全量向量化
python down_embedding_model.py
python embed_chunks.py
# 4. 全量导入
python import_to_chroma.py
# 5. 开始查询
python query_chroma.py
```
### 场景 B:追加新模板/文档
```bash
# 将新 .jrxml / .md 文件放入源目录后:
# 1. 增量分块 — 自动跳过已处理文件
python batch_chunker.py ./jrxml_source --incremental
# 2. 增量向量化 — 只编码新 chunks
python embed_chunks.py --incremental
# 3. 增量导入 — 追加到已有数据库
python import_to_chroma.py --incremental
```
### 场景 C:更换嵌入模型
```bash
# 1. 编辑 .env 修改 EMBEDDING_MODEL_NAME / EMBEDDING_MODEL_PATH
# 2. 下载新模型
python down_embedding_model.py
# 3. 重新向量化 (全量)
python embed_chunks.py
# 4. 重建数据库
python import_to_chroma.py
```
---
## 数据流
```
┌─────────────────┐
│ JasperReports │ C:\Users\...\JasperReportsSamples
模板库
└────────────────┘
│ collect_jrxml.py
┌─────────────────┐
│ jrxml_source/ 收集的 JRXML 文件
└────────────────┘
jrxml_banch_chunker.py (调用 jrxml_chunker.py)
┌─────────────────────
│ JRXML 模板 (.jrxml) │
Markdown 文档 (.md)
└──────────┬──────────┘
│ collect_jrxml.py / 手动放置
┌─────────────────────
│ jrxml_source/
└──────────┬──────────┘
│ batch_chunker.py [--incremental]
┌──────────────────────┐
│ jrxml_chunker_output/│ all_chunks.json + per_file/
└─────────────────────┘
│ embed_chunks.py (使用 Qwen3-Embedding-4B)
│ jrxml_chunker_output/│ all_chunks.json
└─────────────────────┘
│ embed_chunks.py [--incremental]
┌─────────────────┐
│ embeddings/ │ embeddings.npy + chunks.json
└────────┬────────┘
│ import_to_chroma.py
│ import_to_chroma.py [--incremental]
┌─────────────────┐
│ chroma_db/ │ Chroma 向量数据库
@@ -265,18 +267,20 @@ python query_chroma.py "报表参数" --threshold 0.5 --n_results 10
│ query_chroma.py
┌─────────────────┐
用户查询 │ 自然语言 → 相关 JRXML chunks
自然语言查询 │ 返回相关 chunks
└─────────────────┘
```
## 依赖关系
```
query_chroma.py ──────► chromadb, sentence_transformers, torch
import_to_chroma.py ──► chromadb, numpy
embed_chunks.py ──────► sentence_transformers, torch, numpy
down_embedding_model.py ► huggingface_hub
jrxml_banch_chunker.py ─► jrxml_chunker.py
jrxml_chunker.py ─────► xml.etree.ElementTree (标准库)
collect_jrxml.py ─────► 标准库 (os, shutil)
```
query_chroma.py ──────────► chromadb, sentence_transformers, torch
import_to_chroma.py ──────► chromadb, numpy
embed_chunks.py ──────────► sentence_transformers, torch, numpy
down_embedding_model.py ──► huggingface_hub
batch_chunker.py ─────────► jrxml_chunker.py, md_chunker.py
md_chunker.py ────────────► 标准库 (re, json, pathlib)
jrxml_chunker.py ─────────► xml.etree.ElementTree (标准库)
config.py ────────────────► 标准库 (os, pathlib)
collect_jrxml.py ─────────► 标准库 (os, shutil)
```
+64 -18
View File
@@ -20,7 +20,8 @@ from config import (
def build_text_for_embedding(chunk: dict) -> str:
"""
将单个 chunk 转换为适合向量化的文本
拼接:类型、描述、上下文、关键元数据、部分 XML
拼接:类型、描述、上下文、关键元数据、部分内容
支持 JRXML chunks (raw_xml) 和 Markdown chunks (raw_content)
"""
parts = [
f"[ChunkType: {chunk.get('chunk_type', 'unknown')}]",
@@ -30,9 +31,10 @@ def build_text_for_embedding(chunk: dict) -> str:
if context:
parts.append(f"Context: {context}")
raw_xml = chunk.get('raw_xml', '')
if raw_xml:
parts.append(f"XML: {raw_xml[:500]}")
# 支持两种格式:raw_xml (JRXML) 和 raw_content (Markdown)
raw_content = chunk.get('raw_xml', '') or chunk.get('raw_content', '')
if raw_content:
parts.append(f"Content: {raw_content[:500]}")
meta = chunk.get('metadata', {})
if meta:
@@ -48,12 +50,16 @@ def build_text_for_embedding(chunk: dict) -> str:
parts.append(f"Element: {meta['element_kind']}")
if 'query_language' in meta:
parts.append(f"QueryLang: {meta['query_language']}")
if 'language' in meta:
parts.append(f"CodeLang: {meta['language']}")
if 'heading' in meta:
parts.append(f"Section: {meta['heading']}")
return "\n".join(parts)
def main(chunks_json_path: str = None, output_dir: str = None,
model_path: str = None, batch_size: int = None, normalize: bool = True,
use_fp16: bool = None):
use_fp16: bool = None, incremental: bool = False):
"""
主流程:
1. 加载 chunk JSON
@@ -86,7 +92,7 @@ def main(chunks_json_path: str = None, output_dir: str = None,
if not chunks_json_path.exists():
print(f"❌ Chunks 文件不存在: {chunks_json_path}")
print(f" 请先运行 jrxml_banch_chunker.py 生成 chunks")
print(f" 请先运行 batch_chunker.py 生成 chunks")
return None
print(f"\n{'='*60}")
@@ -127,6 +133,33 @@ def main(chunks_json_path: str = None, output_dir: str = None,
print(f" GPU: {torch.cuda.get_device_name(0)}")
print(f" GPU memory: {torch.cuda.memory_allocated(0)/1024**3:.2f} GB / {torch.cuda.get_device_properties(0).total_memory/1024**3:.2f} GB")
# 增量模式:加载已有向量,只处理新 chunks
existing_chunks = []
existing_embeddings = None
if incremental:
existing_chunks_path = output_dir / "chunks.json"
existing_emb_path = output_dir / "embeddings.npy"
if existing_chunks_path.exists() and existing_emb_path.exists():
with open(existing_chunks_path, 'r', encoding='utf-8') as f:
existing_chunks = json.load(f)
existing_embeddings = np.load(existing_emb_path)
existing_keys = {(c.get('context', ''), c.get('chunk_id', -1)) for c in existing_chunks}
new_chunks = [c for c in chunks if (c.get('context', ''), c.get('chunk_id', -1)) not in existing_keys]
skipped = len(chunks) - len(new_chunks)
print(f"\n🔄 增量模式: 已有 {len(existing_chunks)} 个 chunks, 跳过 {skipped} 个重复, 新增 {len(new_chunks)}")
chunks = new_chunks
else:
print(f"\n🔄 增量模式: 未找到已有向量数据,切换为全量处理")
incremental = False
if not chunks:
print("✅ 没有新 chunks 需要向量化")
return {
"chunks": len(existing_chunks),
"embedding_dim": existing_embeddings.shape[1] if existing_embeddings is not None else 0,
"output_dir": str(output_dir)
}
print(f"\n🛠️ 构建文本表示...")
texts = []
chunk_ids = []
@@ -147,42 +180,52 @@ def main(chunks_json_path: str = None, output_dir: str = None,
)
print(f" Embeddings shape: {embeddings.shape}")
# 合并已有向量
if existing_embeddings is not None and len(existing_chunks) > 0:
all_embeddings = np.concatenate([existing_embeddings, embeddings], axis=0)
all_chunks = existing_chunks + chunks
else:
all_embeddings = embeddings
all_chunks = chunks
output_dir.mkdir(parents=True, exist_ok=True)
np.save(output_dir / "embeddings.npy", embeddings.astype('float32'))
np.save(output_dir / "embeddings.npy", all_embeddings.astype('float32'))
all_chunk_ids = [c.get('chunk_id', -1) for c in all_chunks]
all_chunk_types = [c.get('chunk_type', 'unknown') for c in all_chunks]
with open(output_dir / "chunk_id_map.json", 'w', encoding='utf-8') as f:
json.dump(chunk_ids, f, ensure_ascii=False, indent=2)
json.dump(all_chunk_ids, f, ensure_ascii=False, indent=2)
with open(output_dir / "chunk_type_map.json", 'w', encoding='utf-8') as f:
json.dump(chunk_types, f, ensure_ascii=False, indent=2)
json.dump(all_chunk_types, f, ensure_ascii=False, indent=2)
with open(output_dir / "chunks.json", 'w', encoding='utf-8') as f:
json.dump(chunks, f, ensure_ascii=False, indent=2)
json.dump(all_chunks, f, ensure_ascii=False, indent=2)
with open(output_dir / "embeddings.pkl", 'wb') as f:
pickle.dump({
'chunks': chunks,
'embeddings': embeddings,
'chunks': all_chunks,
'embeddings': all_embeddings,
'texts': texts,
'normalized': normalize
}, f)
nan_count = np.isnan(embeddings).sum()
nan_count = np.isnan(all_embeddings).sum()
print(f"\n📊 质量检查:")
print(f" NaN values: {nan_count}")
norms = np.linalg.norm(embeddings, axis=1)
norms = np.linalg.norm(all_embeddings, axis=1)
print(f" Norms: min={norms.min():.4f}, max={norms.max():.4f}, mean={norms.mean():.4f}")
print(f"\n✅ 向量数据已保存到: {output_dir}/")
print(f" 文件: embeddings.npy, chunk_id_map.json, chunk_type_map.json, chunks.json, embeddings.pkl")
type_counts = {}
for ct in chunk_types:
for ct in all_chunk_types:
type_counts[ct] = type_counts.get(ct, 0) + 1
print(f"\n📈 Chunk 类型分布:")
for ct, count in sorted(type_counts.items(), key=lambda x: -x[1]):
print(f" {ct}: {count}")
return {
"chunks": len(chunks),
"embedding_dim": embeddings.shape[1],
"chunks": len(all_chunks),
"embedding_dim": all_embeddings.shape[1],
"output_dir": str(output_dir)
}
@@ -205,6 +248,8 @@ if __name__ == "__main__":
help="不做向量归一化")
parser.add_argument("--no_fp16", action="store_true",
help="禁用 FP16 半精度(默认启用,可节省约 50%% 显存)")
parser.add_argument("--incremental", "-i", action="store_true",
help="增量模式:只向量化新增 chunks,追加到已有向量数据")
args = parser.parse_args()
@@ -214,5 +259,6 @@ if __name__ == "__main__":
model_path=args.model_path,
batch_size=args.batch_size,
normalize=not args.no_normalize,
use_fp16=not args.no_fp16
use_fp16=not args.no_fp16,
incremental=args.incremental
)
+59 -17
View File
@@ -1,6 +1,7 @@
"""
import_to_chroma.py
已生成的 chunk 向量导入 Chroma 数据库
将 chunk 向量导入 Chroma 数据库
支持 JRXML chunks 和 Markdown chunks 混合导入
"""
import os
@@ -16,7 +17,8 @@ from config import EMBEDDINGS_DIR, CHROMA_DB_PATH, CHROMA_COLLECTION_NAME
def main(embeddings_dir: str = None,
chroma_path: str = None,
collection_name: str = None):
collection_name: str = None,
incremental: bool = False):
"""
从 embeddings 目录读取向量和 chunks,导入 Chroma 持久化数据库
@@ -69,33 +71,55 @@ def main(embeddings_dir: str = None,
chroma_path.mkdir(parents=True, exist_ok=True)
client = chromadb.PersistentClient(path=str(chroma_path))
try:
client.delete_collection(collection_name)
print(f" 已删除旧集合 '{collection_name}'")
except Exception:
pass
collection = client.create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
if incremental:
try:
collection = client.get_collection(collection_name)
existing_ids = set(collection.get()['ids'])
print(f" 增量模式: 集合 '{collection_name}' 已有 {len(existing_ids)} 条记录")
except Exception:
collection = client.create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
existing_ids = set()
print(f" 增量模式: 创建新集合 '{collection_name}'")
else:
try:
client.delete_collection(collection_name)
print(f" 已删除旧集合 '{collection_name}'")
except Exception:
pass
collection = client.create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
existing_ids = set()
print(f"\n🛠️ 准备导入数据...")
ids = []
documents = []
metadatas = []
embeddings_list = []
skipped = 0
seen_ids = {}
for i, chunk in enumerate(tqdm(chunks, desc="准备数据")):
raw_id = str(chunk.get("chunk_id", i))
context = chunk.get("context", "")
if raw_id in seen_ids:
seen_ids[raw_id] += 1
chunk_id = f"{raw_id}_{seen_ids[raw_id]}"
unique_chunk_id = f"{raw_id}_{seen_ids[raw_id]}"
else:
seen_ids[raw_id] = 0
chunk_id = raw_id
ids.append(chunk_id)
unique_chunk_id = raw_id
# 增量模式:跳过已导入的
if incremental and unique_chunk_id in existing_ids:
skipped += 1
continue
ids.append(unique_chunk_id)
doc_text = chunk.get("human_description", "")
documents.append(doc_text)
@@ -105,7 +129,6 @@ def main(embeddings_dir: str = None,
if chunk_type:
meta["chunk_type"] = chunk_type
context = chunk.get("context", "")
if context:
meta["context"] = context
@@ -118,10 +141,26 @@ def main(embeddings_dir: str = None,
meta["element_kind"] = chunk_meta["element_kind"]
if "query_language" in chunk_meta:
meta["query_language"] = chunk_meta["query_language"]
# Markdown-specific metadata
if "heading" in chunk_meta:
meta["heading"] = chunk_meta["heading"]
if "heading_level" in chunk_meta:
meta["heading_level"] = chunk_meta["heading_level"]
if "language" in chunk_meta:
meta["code_language"] = chunk_meta["language"]
metadatas.append(meta)
embeddings_list.append(embeddings[i].tolist())
if incremental and skipped > 0:
print(f" 增量模式: 跳过 {skipped} 条已存在记录")
if not ids:
print(f"\n✅ 没有新数据需要导入,集合已是最新")
print(f" 数据库路径: {chroma_path}")
print(f" 集合数量: {collection.count()}")
return collection
print(f"\n📥 分批导入到 Chroma (每批 1000 条)...")
import_batch_size = 1000
start_time = time.time()
@@ -173,11 +212,14 @@ if __name__ == "__main__":
help=f"Chroma 数据库路径 (默认: {CHROMA_DB_PATH})")
parser.add_argument("--collection_name", "-n", default=CHROMA_COLLECTION_NAME,
help=f"集合名称 (默认: {CHROMA_COLLECTION_NAME})")
parser.add_argument("--incremental", "-i", action="store_true",
help="增量模式:只导入新增记录,不删除已有数据")
args = parser.parse_args()
main(
embeddings_dir=args.embeddings_dir,
chroma_path=args.chroma_path,
collection_name=args.collection_name
collection_name=args.collection_name,
incremental=args.incremental
)
+358
View File
@@ -0,0 +1,358 @@
"""
md_chunker.py
Markdown 语义分块器
支持标题层级、代码块、表格等元素的智能分块
"""
import json
import os
import re
from typing import List, Dict, Tuple
from pathlib import Path
from dataclasses import dataclass, field, asdict
@dataclass
class MDChunk:
"""Single Markdown chunk data structure"""
chunk_id: int
chunk_type: str
human_description: str
raw_content: str
context: str
metadata: Dict = field(default_factory=dict)
class MarkdownSemanticChunker:
"""
Markdown 语义分块器 v1.0
分块策略:
1. 按标题层级(H1/H2/H3...)划分大段落
2. 代码块作为独立 chunk
3. 表格作为独立 chunk
4. 过长段落内部按句子/段落二次拆分
"""
# Heading patterns
HEADING_PATTERN = re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE)
# Code block pattern (fenced)
CODE_BLOCK_PATTERN = re.compile(r'```(\w*)\n([\s\S]*?)```', re.MULTILINE)
# Inline code pattern
INLINE_CODE_PATTERN = re.compile(r'`([^`]+)`')
# Table pattern
TABLE_PATTERN = re.compile(r'\|.+\|\n\|[-| :]+\|\n((?:\|.+\|\n)*)', re.MULTILINE)
# List pattern
LIST_PATTERN = re.compile(r'^(\s*[-*+]\s+.+)+', re.MULTILINE)
def __init__(self, max_chunk_size: int = 2000):
self.max_chunk_size = max_chunk_size
def chunk_file(self, file_path: str) -> List[Dict]:
"""处理单个 Markdown 文件"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
file_name = Path(file_path).stem
chunks = []
chunk_id = 0
# 尝试提取文档标题(第一个 H1
title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
doc_title = title_match.group(1).strip() if title_match else file_name
# 按结构化元素分割
segments = self._split_by_structure(content)
for segment in segments:
seg_type = segment['type']
seg_content = segment['content']
if not seg_content.strip():
continue
# 构建描述
description = self._build_description(seg_type, seg_content, doc_title)
# 如果超过最大长度,尝试二次拆分
if len(seg_content) > self.max_chunk_size:
sub_chunks = self._split_large_chunk(
seg_content, seg_type, doc_title, chunk_id
)
chunks.extend([asdict(c) for c in sub_chunks])
chunk_id += len(sub_chunks)
else:
chunks.append(asdict(MDChunk(
chunk_id=chunk_id,
chunk_type=seg_type,
human_description=description,
raw_content=seg_content.strip(),
context=f"{doc_title}",
metadata=segment.get('metadata', {})
)))
chunk_id += 1
return chunks
def _split_by_structure(self, content: str) -> List[Dict]:
"""
按 Markdown 结构分割内容
返回: [{'type': 'h1/h2/code/table/paragraph', 'content': '...', 'metadata': {...}}]
"""
segments = []
# 首先提取所有代码块(保留位置标记,稍后处理)
code_blocks = []
code_pattern = re.compile(r'(```\w*\n[\s\S]*?```)', re.MULTILINE)
last_end = 0
for match in code_pattern.finditer(content):
# 处理代码块前的普通文本
before = content[last_end:match.start()]
if before.strip():
segments.extend(self._process_text_section(before))
# 添加代码块
code_blocks.append(match.group(1))
lang_match = re.match(r'```(\w*)', match.group(1))
lang = lang_match.group(1) if lang_match else ''
segments.append({
'type': 'code',
'content': match.group(1),
'metadata': {'language': lang}
})
last_end = match.end()
# 处理剩余文本
remaining = content[last_end:]
if remaining.strip():
segments.extend(self._process_text_section(remaining))
return segments
def _process_text_section(self, text: str) -> List[Dict]:
"""处理普通文本区域,提取标题和段落"""
segments = []
# 按标题分割
lines = text.split('\n')
current_section = []
current_heading_level = 0
current_heading = ''
for line in lines:
heading_match = re.match(r'^(#{1,6})\s+(.+)', line)
if heading_match:
# 保存之前的段落
if current_section:
section_text = '\n'.join(current_section).strip()
if section_text:
segments.append({
'type': self._get_section_type(current_heading_level, current_heading),
'content': section_text,
'metadata': {
'heading': current_heading,
'heading_level': current_heading_level
}
})
current_section = []
# 开始新标题区域
current_heading_level = len(heading_match.group(1))
current_heading = heading_match.group(2).strip()
else:
current_section.append(line)
# 保存最后一段
if current_section:
section_text = '\n'.join(current_section).strip()
if section_text:
segments.append({
'type': self._get_section_type(current_heading_level, current_heading),
'content': section_text,
'metadata': {
'heading': current_heading,
'heading_level': current_heading_level
}
})
return segments
def _get_section_type(self, level: int, heading: str) -> str:
"""根据标题级别和内容确定段落类型"""
heading_lower = heading.lower()
if level == 1:
return 'section_h1'
elif level == 2:
# 检测特殊章节类型
if any(kw in heading_lower for kw in ['install', '安装', 'setup', '部署']):
return 'section_installation'
elif any(kw in heading_lower for kw in ['config', '配置', 'setting']):
return 'section_configuration'
elif any(kw in heading_lower for kw in ['api', '接口']):
return 'section_api'
elif any(kw in heading_lower for kw in ['example', '示例', 'usage', '使用']):
return 'section_example'
elif any(kw in heading_lower for kw in ['faq', 'question', '问题', '常见']):
return 'section_faq'
elif any(kw in heading_lower for kw in ['changelog', '更新', 'release']):
return 'section_changelog'
return 'section_h2'
elif level == 3:
return 'section_h3'
else:
return 'section_other'
def _build_description(self, chunk_type: str, content: str, doc_title: str) -> str:
"""为 chunk 生成人类可读描述"""
lines = content.split('\n')[:5]
preview = ' '.join(line.strip() for line in lines if line.strip())[:150]
if chunk_type == 'code':
lang = ''
lang_match = re.match(r'```(\w*)', content)
if lang_match:
lang = lang_match.group(1) or 'text'
return f"Code block (language: {lang}) in {doc_title}. Preview: {preview}"
elif chunk_type.startswith('section_'):
heading = content.split('\n')[0] if '\n' in content else content[:50]
heading_clean = re.sub(r'^#+\s+', '', heading)
type_hint = chunk_type.replace('section_', '')
return f"[{type_hint.upper()}] {heading_clean}. Content: {preview}"
else:
return f"Document section in {doc_title}. Content: {preview}"
def _split_large_chunk(self, content: str, chunk_type: str,
doc_title: str, start_id: int) -> List[MDChunk]:
"""对过长的 chunk 进行二次拆分"""
chunks = []
# 按段落分割(双换行符)
paragraphs = re.split(r'\n\n+', content)
current_chunk = []
current_size = 0
for para in paragraphs:
para_size = len(para)
if current_size + para_size > self.max_chunk_size and current_chunk:
# 当前块已满,生成 chunk
chunk_text = '\n\n'.join(current_chunk)
chunks.append(MDChunk(
chunk_id=start_id + len(chunks),
chunk_type=f"{chunk_type}_part",
human_description=f"Part of {doc_title} ({chunk_type}): {chunk_text[:100]}...",
raw_content=chunk_text,
context=f"{doc_title} (continued)",
metadata={'part': len(chunks) + 1, 'original_type': chunk_type}
))
current_chunk = []
current_size = 0
current_chunk.append(para)
current_size += para_size + 2
# 处理剩余内容
if current_chunk:
chunk_text = '\n\n'.join(current_chunk)
chunks.append(MDChunk(
chunk_id=start_id + len(chunks),
chunk_type=f"{chunk_type}_part",
human_description=f"Part of {doc_title} ({chunk_type}): {chunk_text[:100]}...",
raw_content=chunk_text,
context=f"{doc_title} (continued)",
metadata={'part': len(chunks) + 1, 'original_type': chunk_type}
))
return chunks if chunks else [MDChunk(
chunk_id=start_id,
chunk_type=chunk_type,
human_description=f"{doc_title}: {content[:100]}...",
raw_content=content[:self.max_chunk_size],
context=doc_title,
metadata={'truncated': True}
)]
def chunk_directory(self, dir_path: str, extensions: tuple = ('.md', '.markdown')) -> List[Dict]:
"""批量处理目录下所有 Markdown 文件"""
all_chunks = []
file_count = 0
for root, _, files in os.walk(dir_path):
for file in files:
if file.lower().endswith(extensions):
file_path = os.path.join(root, file)
try:
chunks = self.chunk_file(file_path)
all_chunks.extend(chunks)
file_count += 1
print(f"OK {file_path}: {len(chunks)} chunks")
except Exception as e:
print(f"FAIL {file_path}: {e}")
print(f"\nTotal: {file_count} files, {len(all_chunks)} chunks")
return all_chunks
def save_chunks_to_json(chunks: List[Dict], output_path: str):
"""保存 chunks 到 JSON 文件"""
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(chunks, f, ensure_ascii=False, indent=2)
print(f"Saved {len(chunks)} chunks to {output_path}")
def print_chunk_summary(chunks: List[Dict]):
"""打印 chunk 类型统计"""
type_counts = {}
for chunk in chunks:
chunk_type = chunk["chunk_type"]
type_counts[chunk_type] = type_counts.get(chunk_type, 0) + 1
print("\nChunk Type Summary:")
for chunk_type, count in sorted(type_counts.items(), key=lambda x: -x[1]):
print(f" {chunk_type}: {count}")
if __name__ == "__main__":
import sys
chunker = MarkdownSemanticChunker(max_chunk_size=2000)
if len(sys.argv) > 1:
path = sys.argv[1]
if os.path.isdir(path):
all_chunks = chunker.chunk_directory(path)
output_path = os.path.join(os.path.dirname(path.rstrip("/\\")) if os.path.dirname(path) else ".",
os.path.basename(path.rstrip("/\\")) + "_md_chunks.json")
save_chunks_to_json(all_chunks, output_path)
print_chunk_summary(all_chunks)
else:
chunks = chunker.chunk_file(path)
output_path = path.replace(".md", "_chunks.json").replace(".markdown", "_chunks.json")
save_chunks_to_json(chunks, output_path)
print(f"\n{'='*60}")
print("Chunking Results Preview")
print(f"{'='*60}")
for chunk in chunks[:10]:
print(f"\n[Chunk {chunk['chunk_id']}] Type: {chunk['chunk_type']}")
print(f"Description: {chunk['human_description'][:120]}...")
print(f"Content length: {len(chunk['raw_content'])} chars")
if len(chunks) > 10:
print(f"\n... and {len(chunks) - 10} more chunks")
print_chunk_summary(chunks)
else:
print("=" * 60)
print("Markdown Semantic Chunking v1.0")
print("=" * 60)
print("\nUsage: python md_chunker.py <md_file_or_directory>")
+10
View File
@@ -0,0 +1,10 @@
# Core dependencies
torch>=2.0.0
sentence-transformers>=2.2.0
chromadb>=0.4.0
numpy>=1.24.0
tqdm>=4.65.0
huggingface_hub>=0.19.0
# Optional - for LangChain document conversion
langchain>=0.1.0