Files
panda bd5bfbac2d fix: band-level windowed refine_layout + programmatic map_fields to prevent 91.5% content loss
Root cause: LLM receiving full 34k-char JRXML would regenerate from scratch
instead of modifying coordinates in-place, shrinking output to ~3k chars.

Solution (programmatic node control, not prompt engineering):

- New agent/jrxml_windower.py: decompose JRXML into header (never sent to
  LLM) + individual bands. Split bands >4000 chars at element boundaries.
  Reassemble with element count validation (>10% change = rollback).

- Rewrite refine_layout: per-band windowed LLM processing (~2-4k chars
  each). LLM cannot "reimagine" the entire report.

- Rewrite map_fields: 100% programmatic regex $F{field_N} -> real name
  replacement. Zero LLM calls, zero content loss.

- _sanitize_field_name: non-ASCII chars escaped to _uXXXX_ format for
  valid JRXML identifiers.

- Tests: 48 new unit tests (windower 28 + map_fields 20). All passing.
  Full suite 385 tests, zero regressions.
2026-05-24 08:55:38 +08:00

228 lines
6.7 KiB
Python

"""多租户知识库管理模块。
用户 + 知识库 CRUD,持久化到 kb_data/ 目录。
每个 KB 拥有独立的 JSON 元数据文件和文件存储目录。
"""
import json
import os
import re
import uuid
import tempfile
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
from backend.logger import get_logger
# Load variables from a .env file (if any) into the process environment
# before the configuration below is read from it.
load_dotenv()
# Module-level logger obtained from the project's logging helper.
_kb_log = get_logger("kb_manager")
# Root directory for all persisted data; overridable through the
# KB_DATA_DIR environment variable, defaulting to ./kb_data.
KB_DATA_DIR = Path(os.getenv("KB_DATA_DIR", "./kb_data"))
# Registry file that stores the list of user records.
_USERS_FILE = KB_DATA_DIR / "users.json"
# Accepted IDs: hexadecimal strings of at least 12 characters
# (uuid4().hex, used for generated IDs, yields 32).
_VALID_ID_RE = re.compile(r'^[a-fA-F0-9]{12,}$')


def _validate_id(id_str: str, label: str = "id") -> None:
    """Raise ``ValueError`` unless *id_str* is a well-formed identifier.

    IDs are used directly as directory names, so anything other than a pure
    hex string of 12 or more characters is rejected.

    Args:
        id_str: Candidate identifier.
        label: Name of the identifier used in the error message.

    Raises:
        ValueError: If *id_str* is not a string or does not match the
            expected hex format.
    """
    # fullmatch() rather than match(): with match(), ``$`` also succeeds just
    # before a trailing newline, so "<hex>\n" would be accepted.
    if not isinstance(id_str, str) or not _VALID_ID_RE.fullmatch(id_str):
        raise ValueError(f"Invalid {label}: {id_str!r}")
def _now_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO-8601 string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
def _ensure_dir(path: Path) -> None:
    """Create *path* as a directory, including any missing parents.

    Does nothing when the directory already exists.
    """
    os.makedirs(path, exist_ok=True)
def _read_json(fp: Path) -> dict:
    """Parse and return the JSON document stored at *fp* (read as UTF-8)."""
    raw_text = Path(fp).read_text(encoding="utf-8")
    return json.loads(raw_text)
def _write_json_atomic(fp: Path, data: dict) -> None:
    """Serialize *data* as JSON to *fp* without exposing a half-written file.

    The document is written to a temporary file in the destination directory,
    flushed and fsynced, then moved over *fp* with ``os.replace``.

    Args:
        fp: Destination path; its parent directory is created if missing.
        data: JSON-serializable document (callers in this module pass both
            dicts and lists).

    Raises:
        TypeError, ValueError, OSError: Propagated from serialization or
            file-system operations; *fp* is left untouched in that case.
    """
    fp.parent.mkdir(parents=True, exist_ok=True)
    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False,
        dir=fp.parent, encoding="utf-8",
    )
    tmp_path = Path(tmp.name)
    committed = False
    try:
        # The with-block guarantees the handle is closed before the rename
        # (required on Windows) and on every failure path.
        with tmp:
            json.dump(data, tmp, ensure_ascii=False, indent=2)
            tmp.flush()
            os.fsync(tmp.fileno())
        os.replace(tmp_path, fp)
        committed = True
    finally:
        # finally (not ``except Exception``) so the temp file is also removed
        # on KeyboardInterrupt / SystemExit instead of being orphaned.
        if not committed:
            tmp_path.unlink(missing_ok=True)
# ── User CRUD ──────────────────────────────────────────────────────────────
def _load_users() -> list[dict]:
    """Return the user records from the registry file.

    The data root is created if missing; an absent registry file yields an
    empty list.
    """
    _ensure_dir(KB_DATA_DIR)
    if not _USERS_FILE.exists():
        return []
    return _read_json(_USERS_FILE)
def _save_users(users: list[dict]) -> None:
    """Persist the complete list of user records to the registry file."""
    _write_json_atomic(fp=_USERS_FILE, data=users)
def create_user(name: str, user_id: Optional[str] = None) -> dict:
    """Create a user, add it to the registry, and write its profile file.

    Args:
        name: Display name stored in the user record.
        user_id: Optional explicit ID. Must be a valid hex identifier (see
            ``_validate_id``); a random ``uuid4().hex`` is used when omitted.

    Returns:
        The new user record with ``user_id``, ``name`` and ``created_at``.

    Raises:
        ValueError: If *user_id* is malformed or already registered.
    """
    uid = user_id or uuid.uuid4().hex
    # The ID becomes a directory name under KB_DATA_DIR and every other entry
    # point (get_user, delete_user, _ensure_user_dir) rejects malformed IDs.
    # Without this check a user could be created that can never be fetched
    # or deleted, and an ID such as "../x" would escape the data root.
    _validate_id(uid, "user_id")
    users = _load_users()
    if any(u["user_id"] == uid for u in users):
        raise ValueError(f"User {uid} already exists")
    user = {"user_id": uid, "name": name, "created_at": _now_iso()}
    users.append(user)
    _save_users(users)
    user_dir = KB_DATA_DIR / uid
    _ensure_dir(user_dir)
    _write_json_atomic(user_dir / "profile.json", user)
    _kb_log.info("创建用户", extra={"user_id": uid, "user_name": name})
    return user
def list_users() -> list[dict]:
    """Return every registered user record."""
    all_users = _load_users()
    return all_users
def get_user(user_id: str) -> Optional[dict]:
    """Return the record whose ``user_id`` equals *user_id*, or ``None``.

    Raises:
        ValueError: If *user_id* is not a well-formed identifier.
    """
    _validate_id(user_id, "user_id")
    matching = (rec for rec in _load_users() if rec["user_id"] == user_id)
    return next(matching, None)
def delete_user(user_id: str) -> bool:
    """Remove a user from the registry and delete its data directory.

    Returns:
        ``True`` if the user existed and was removed, ``False`` otherwise.

    Raises:
        ValueError: If *user_id* is not a well-formed identifier.
    """
    _validate_id(user_id, "user_id")
    existing = _load_users()
    if not any(rec["user_id"] == user_id for rec in existing):
        return False
    remaining = [rec for rec in existing if rec["user_id"] != user_id]
    _save_users(remaining)
    # Registry first, then the on-disk tree (which holds the user's KBs).
    target = KB_DATA_DIR / user_id
    if target.exists():
        shutil.rmtree(target)
    _kb_log.info("删除用户", extra={"user_id": user_id})
    return True
# ── KB CRUD ────────────────────────────────────────────────────────────────
def _kb_dir(kb_id: str) -> Optional[Path]:
    """Locate the directory of knowledge base *kb_id* across all users.

    Scans every non-hidden directory directly under ``KB_DATA_DIR`` and
    returns the first ``<user_dir>/<kb_id>`` that is a directory.

    Returns:
        The KB directory, or ``None`` if no user owns a KB with that ID.

    Raises:
        ValueError: If *kb_id* is not a well-formed identifier.
    """
    _validate_id(kb_id, "kb_id")
    # On a fresh install the data root does not exist yet; iterdir() would
    # raise FileNotFoundError instead of reporting "not found".
    if not KB_DATA_DIR.is_dir():
        return None
    for user_dir in KB_DATA_DIR.iterdir():
        if not user_dir.is_dir() or user_dir.name.startswith("."):
            continue
        candidate = user_dir / kb_id
        if candidate.is_dir():
            return candidate
    return None
def _ensure_user_dir(user_id: str) -> Path:
    """Validate *user_id* and return its data directory, creating it if needed.

    Raises:
        ValueError: If *user_id* is not a well-formed identifier.
    """
    _validate_id(user_id, "user_id")
    user_root = KB_DATA_DIR / user_id
    _ensure_dir(user_root)
    return user_root
def create_kb(user_id: str, name: str, description: str = "",
              kb_id: Optional[str] = None) -> dict:
    """Create a knowledge base for a user and write its initial metadata.

    Creates ``<KB_DATA_DIR>/<user_id>/<kb_id>/`` with a ``raw/`` subdirectory
    and a ``meta.json`` describing an empty knowledge base.

    Args:
        user_id: Owner of the knowledge base.
        name: Display name.
        description: Optional free-text description.
        kb_id: Optional explicit ID. Must be a valid hex identifier (see
            ``_validate_id``); a random ``uuid4().hex`` is used when omitted.

    Returns:
        The metadata dict written to ``meta.json``.

    Raises:
        ValueError: If *user_id* or *kb_id* is malformed.
    """
    kid = kb_id or uuid.uuid4().hex
    # Validate both IDs before touching the file system. kb_id becomes a
    # directory name, and every lookup goes through _kb_dir(), which rejects
    # malformed IDs -- an unvalidated kb_id would create an unreachable KB
    # or, with a value like "../x", write outside the user's directory.
    _validate_id(user_id, "user_id")
    _validate_id(kid, "kb_id")
    user_dir = _ensure_user_dir(user_id)
    kb_dir = user_dir / kid
    _ensure_dir(kb_dir)
    _ensure_dir(kb_dir / "raw")
    now = _now_iso()
    meta = {
        "kb_id": kid, "user_id": user_id, "name": name,
        "description": description, "created_at": now, "updated_at": now,
        "fields": [], "templates": [], "file_count": 0,
        "chunk_count": 0, "parse_status": "empty",
    }
    # NOTE(review): if a KB with this ID already exists, its meta.json is
    # overwritten with empty metadata. create_user() rejects duplicates;
    # confirm whether create_kb() should do the same.
    _write_json_atomic(kb_dir / "meta.json", meta)
    _kb_log.info("创建知识库", extra={"kb_id": kid, "user_id": user_id, "kb_name": name})
    return meta
def list_kbs(user_id: str) -> list[dict]:
    """Return summary records for every knowledge base owned by *user_id*.

    Entries are ordered by directory modification time, newest first.
    Hidden directories and directories without a ``meta.json`` are skipped.
    Missing metadata keys fall back to defaults (directory name, empty
    string, zero, or ``"empty"``).

    Raises:
        ValueError: If *user_id* is not a well-formed identifier.
    """
    user_dir = _ensure_user_dir(user_id)
    entries = sorted(user_dir.iterdir(), key=os.path.getmtime, reverse=True)
    summaries = []
    for entry in entries:
        if not entry.is_dir() or entry.name.startswith("."):
            continue
        meta_path = entry / "meta.json"
        if not meta_path.exists():
            continue
        meta = _read_json(meta_path)
        summary = {
            "kb_id": meta.get("kb_id", entry.name),
            "name": meta.get("name", entry.name),
            "description": meta.get("description", ""),
            "created_at": meta.get("created_at", ""),
            "updated_at": meta.get("updated_at", ""),
            # Only the counts are exposed, not the field/template lists.
            "field_count": len(meta.get("fields", [])),
            "template_count": len(meta.get("templates", [])),
            "file_count": meta.get("file_count", 0),
            "chunk_count": meta.get("chunk_count", 0),
            "parse_status": meta.get("parse_status", "empty"),
        }
        summaries.append(summary)
    return summaries
def get_kb(kb_id: str) -> Optional[dict]:
    """Return the full metadata of knowledge base *kb_id*.

    Returns:
        The parsed ``meta.json``, or ``None`` if the KB directory or its
        metadata file does not exist.

    Raises:
        ValueError: If *kb_id* is not a well-formed identifier.
    """
    _validate_id(kb_id, "kb_id")
    location = _kb_dir(kb_id)
    if location is None:
        return None
    meta_path = location / "meta.json"
    if not meta_path.exists():
        return None
    return _read_json(meta_path)
def update_kb_meta(kb_id: str, updates: dict) -> Optional[dict]:
    """Merge *updates* into a knowledge base's metadata and persist it.

    Args:
        kb_id: Knowledge base to update.
        updates: Keys to set or overwrite via ``dict.update``.
            ``updated_at`` is always replaced with the current time.

    Returns:
        The updated metadata, or ``None`` if the KB directory or its
        ``meta.json`` does not exist.

    Raises:
        ValueError: If *kb_id* is not a well-formed identifier.
    """
    kb_dir = _kb_dir(kb_id)
    if kb_dir is None:
        return None
    meta_path = kb_dir / "meta.json"
    # A directory without meta.json is treated as "not found", matching
    # get_kb(), rather than surfacing a FileNotFoundError.
    if not meta_path.exists():
        return None
    meta = _read_json(meta_path)
    meta.update(updates)
    meta["updated_at"] = _now_iso()
    _write_json_atomic(meta_path, meta)
    return meta
def delete_kb(kb_id: str) -> bool:
    """Delete knowledge base *kb_id* and everything stored under it.

    Returns:
        ``True`` if the KB directory was found and removed, else ``False``.

    Raises:
        ValueError: If *kb_id* is not a well-formed identifier.
    """
    target = _kb_dir(kb_id)
    if target is not None:
        shutil.rmtree(target)
        _kb_log.info("删除知识库", extra={"kb_id": kb_id})
        return True
    return False
def get_kb_raw_dir(kb_id: str) -> Optional[Path]:
    """Return the ``raw`` subdirectory path of *kb_id*, or ``None`` if the KB is unknown.

    The path is computed only; nothing is created here.
    """
    location = _kb_dir(kb_id)
    if location is None:
        return None
    return location / "raw"
def get_kb_chunks_path(kb_id: str) -> Optional[Path]:
    """Return the path of ``chunks.json`` for *kb_id*, or ``None`` if the KB is unknown.

    The file itself may not exist; only the path is computed.
    """
    location = _kb_dir(kb_id)
    if location is None:
        return None
    return location / "chunks.json"
def get_kb_chroma_path(kb_id: str) -> Optional[Path]:
    """Return the ``chroma`` directory of *kb_id*, creating it if necessary.

    Returns:
        The directory path, or ``None`` if the KB is unknown (in which case
        nothing is created).
    """
    location = _kb_dir(kb_id)
    if location is not None:
        chroma_dir = location / "chroma"
        _ensure_dir(chroma_dir)
        return chroma_dir
    return None