bd5bfbac2d
Root cause: when given the full 34k-char JRXML, the LLM would regenerate the report from scratch
instead of modifying coordinates in place, shrinking the output to ~3k chars.
Solution (programmatic node control, not prompt engineering):
- New agent/jrxml_windower.py: decompose JRXML into header (never sent to
LLM) + individual bands. Split bands >4000 chars at element boundaries.
Reassemble with element count validation (>10% change = rollback).
- Rewrite refine_layout: per-band windowed LLM processing (~2-4k chars
each). LLM cannot "reimagine" the entire report.
- Rewrite map_fields: 100% programmatic regex $F{field_N} -> real name
replacement. Zero LLM calls, zero content loss.
- _sanitize_field_name: non-ASCII chars escaped to _uXXXX_ format for
valid JRXML identifiers.
- Tests: 48 new unit tests (windower 28 + map_fields 20). All passing.
Full suite 385 tests, zero regressions.
266 lines
9.6 KiB
Python
266 lines
9.6 KiB
Python
"""kb_manager.py 测试 — 用户 + KB CRUD, 原子写入, ID 验证。"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from backend.kb_manager import (
|
|
_validate_id, _now_iso, _ensure_dir, _read_json, _write_json_atomic,
|
|
_load_users, _save_users,
|
|
create_user, list_users, get_user, delete_user,
|
|
create_kb, list_kbs, get_kb, update_kb_meta, delete_kb,
|
|
get_kb_raw_dir, get_kb_chunks_path, get_kb_chroma_path,
|
|
KB_DATA_DIR, _USERS_FILE,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def temp_kb_data(monkeypatch):
    """Point kb_manager's storage at a throwaway directory for one test.

    Patches ``backend.kb_manager.KB_DATA_DIR`` to a fresh temporary directory
    and ``backend.kb_manager._USERS_FILE`` to ``users.json`` inside it, then
    yields that directory as a ``Path``. The directory is removed when the
    ``TemporaryDirectory`` context exits after the test.
    """
    with tempfile.TemporaryDirectory(prefix="test_kb_") as scratch:
        data_root = Path(scratch)
        monkeypatch.setattr("backend.kb_manager.KB_DATA_DIR", data_root)
        monkeypatch.setattr(
            "backend.kb_manager._USERS_FILE", data_root / "users.json"
        )
        yield data_root
|
|
|
|
|
|
@pytest.fixture
def user(temp_kb_data):
    """Create a test user inside the isolated data directory and return it."""
    created_user = create_user("测试用户")
    return created_user
|
|
|
|
|
|
@pytest.fixture
def kb(temp_kb_data, user):
    """Create a knowledge base owned by the ``user`` fixture and return it."""
    owner_id = user["user_id"]
    return create_kb(owner_id, "测试知识库", "测试描述")
|
|
|
|
|
|
# ── ID 验证 ─────────────────────────────────────────────────────
|
|
|
|
class TestIDValidation:
    """Checks which identifier strings ``_validate_id`` accepts or rejects."""

    def test_valid_hex_id_passes(self):
        """A 22-character hexadecimal string is accepted without raising."""
        good_id = "aabbccddeeff0011223344"
        _validate_id(good_id, "test_id")

    def test_short_id_raises(self):
        """A three-character id is rejected with a ValueError matching 'Invalid'."""
        too_short = "abc"
        with pytest.raises(ValueError, match="Invalid"):
            _validate_id(too_short, "test_id")

    def test_non_hex_id_raises(self):
        """An id containing non-hex characters is rejected with a ValueError."""
        not_hex = "not_valid!!!"
        with pytest.raises(ValueError, match="Invalid"):
            _validate_id(not_hex, "test_id")

    def test_empty_id_raises(self):
        """The empty string is rejected with a ValueError matching 'Invalid'."""
        with pytest.raises(ValueError, match="Invalid"):
            _validate_id("", "test_id")
|
|
|
|
|
|
# ── 原子写入 ────────────────────────────────────────────────────
|
|
|
|
class TestAtomicWrite:
    """Checks for ``_write_json_atomic``: creation, overwrite, and failure cleanup."""

    def test_write_json_atomic_creates_file(self, temp_kb_data):
        """Writing to a new path produces a file holding the given payload."""
        target = temp_kb_data / "test.json"
        _write_json_atomic(target, {"key": "value"})
        assert target.exists()
        stored = json.loads(target.read_text(encoding="utf-8"))
        assert stored == {"key": "value"}

    def test_write_json_atomic_overwrites(self, temp_kb_data):
        """A second write to the same path replaces the earlier content."""
        target = temp_kb_data / "test.json"
        for payload in ({"a": 1}, {"b": 2}):
            _write_json_atomic(target, payload)
        stored = json.loads(target.read_text(encoding="utf-8"))
        assert stored == {"b": 2}

    def test_write_json_atomic_creates_parent_dir(self, temp_kb_data):
        """Writing to a path under missing directories still creates the file."""
        target = temp_kb_data / "deep" / "nested" / "test.json"
        _write_json_atomic(target, {"ok": True})
        assert target.exists()

    def test_write_json_atomic_no_partial_file_on_error(self, temp_kb_data):
        """A failing ``json.dump`` leaves neither the target nor a ``.tmp`` file."""
        target = temp_kb_data / "fail.json"
        # Force serialization to fail so the write aborts part-way through.
        with patch("json.dump", side_effect=RuntimeError("boom")), \
                pytest.raises(RuntimeError):
            _write_json_atomic(target, {"x": 1})
        assert not target.exists()
        # No entry matching *.json* in the data dir may end with ".tmp".
        leftovers = temp_kb_data.glob("*.json*")
        assert not any(entry.name.endswith(".tmp") for entry in leftovers)
|
|
|
|
|
|
# ── 用户 CRUD ───────────────────────────────────────────────────
|
|
|
|
class TestUserCRUD:
    """Create / list / get / delete checks for the user functions."""

    def test_create_user_returns_dict(self, temp_kb_data):
        """The created record carries the name, an id of 12+ chars, and a timestamp key."""
        record = create_user("张三")
        assert record["name"] == "张三"
        assert len(record["user_id"]) >= 12
        assert "created_at" in record

    def test_create_user_persists_to_disk(self, temp_kb_data):
        """A newly created user appears in the result of ``list_users()``."""
        record = create_user("李四")
        everyone = list_users()
        assert any(entry["user_id"] == record["user_id"] for entry in everyone)

    def test_create_user_with_custom_id(self, temp_kb_data):
        """An explicitly supplied ``user_id`` is used as-is."""
        wanted_id = "abcdef1234567890abcdef"
        record = create_user("王五", user_id=wanted_id)
        assert record["user_id"] == wanted_id

    def test_create_duplicate_user_id_raises(self, temp_kb_data):
        """Reusing an existing ``user_id`` raises ValueError matching 'already exists'."""
        shared_id = "deadbeef1234567890abcd"
        create_user("用户1", user_id=shared_id)
        with pytest.raises(ValueError, match="already exists"):
            create_user("用户2", user_id=shared_id)

    def test_list_users_empty(self, temp_kb_data):
        """With no users created, ``list_users()`` returns an empty list."""
        assert list_users() == []

    def test_list_users_returns_all(self, temp_kb_data):
        """Two created users yield two entries from ``list_users()``."""
        for name in ("A", "B"):
            create_user(name)
        assert len(list_users()) == 2

    def test_get_user_found(self, user):
        """Looking up an existing user returns a record with the same name."""
        fetched = get_user(user["user_id"])
        assert fetched is not None
        assert fetched["name"] == user["name"]

    def test_get_user_not_found(self, temp_kb_data):
        """Looking up a well-formed but unknown id returns None."""
        unknown_id = "deadbeef1234567890abcd"
        assert get_user(unknown_id) is None

    def test_get_user_invalid_id_raises(self, temp_kb_data):
        """Looking up a malformed id raises ValueError."""
        with pytest.raises(ValueError):
            get_user("invalid")

    def test_delete_user_returns_true(self, user):
        """Deleting an existing user returns True."""
        outcome = delete_user(user["user_id"])
        assert outcome is True

    def test_delete_user_removes_from_list(self, user):
        """After deletion, ``get_user`` no longer finds the user."""
        uid = user["user_id"]
        delete_user(uid)
        assert get_user(uid) is None

    def test_delete_user_removes_dir(self, temp_kb_data, user):
        """The user's directory exists before deletion and is gone afterwards."""
        uid = user["user_id"]
        home = temp_kb_data / uid
        assert home.exists()
        delete_user(uid)
        assert not home.exists()

    def test_delete_user_not_found_returns_false(self, temp_kb_data):
        """Deleting a well-formed but unknown id returns False."""
        unknown_id = "deadbeef1234567890abcd"
        assert delete_user(unknown_id) is False

    def test_delete_user_invalid_id_raises(self, temp_kb_data):
        """Deleting with a malformed id raises ValueError."""
        with pytest.raises(ValueError):
            delete_user("bad_id")
|
|
|
|
|
|
# ── KB CRUD ─────────────────────────────────────────────────────
|
|
|
|
class TestKbCRUD:
    """Create / list / get / update / delete checks for knowledge bases."""

    def test_create_kb_returns_meta(self, kb):
        """A fresh KB has the given name, an id of 12+ chars, and empty status/count."""
        assert kb["name"] == "测试知识库"
        assert len(kb["kb_id"]) >= 12
        assert kb["parse_status"] == "empty"
        assert kb["file_count"] == 0

    def test_create_kb_creates_dir_structure(self, temp_kb_data, user, kb):
        """Creation lays out ``<user_id>/<kb_id>/`` with ``raw/`` and ``meta.json``."""
        kb_root = temp_kb_data / user["user_id"] / kb["kb_id"]
        assert kb_root.is_dir()
        assert (kb_root / "raw").is_dir()
        assert (kb_root / "meta.json").exists()

    def test_create_kb_with_custom_id(self, user):
        """An explicitly supplied ``kb_id`` is used as-is."""
        wanted_id = "cafebabe1234567890feed"
        created = create_kb(user["user_id"], "自定义ID库", kb_id=wanted_id)
        assert created["kb_id"] == wanted_id

    def test_list_kbs_empty(self, user):
        """A user with no KBs gets an empty list."""
        assert list_kbs(user["user_id"]) == []

    def test_list_kbs_returns_all(self, user):
        """Two created KBs yield two entries from ``list_kbs``."""
        owner_id = user["user_id"]
        for title in ("B库", "A库"):
            create_kb(owner_id, title)
        assert len(list_kbs(owner_id)) == 2

    def test_list_kbs_summary_format(self, user, kb):
        """The first listed summary contains every expected key."""
        summary = list_kbs(user["user_id"])[0]
        expected_keys = (
            "kb_id", "name", "field_count", "template_count", "parse_status",
        )
        missing = [key for key in expected_keys if key not in summary]
        assert not missing

    def test_get_kb_found(self, kb):
        """Looking up an existing KB returns a record with the same name."""
        fetched = get_kb(kb["kb_id"])
        assert fetched is not None
        assert fetched["name"] == kb["name"]

    def test_get_kb_not_found(self, temp_kb_data):
        """Looking up a well-formed but unknown id returns None."""
        unknown_id = "deadbeef1234567890abcd"
        assert get_kb(unknown_id) is None

    def test_get_kb_invalid_id_raises(self, temp_kb_data):
        """Looking up a malformed id raises ValueError."""
        with pytest.raises(ValueError):
            get_kb("bad")

    def test_update_kb_meta_changes_fields(self, kb):
        """Updated fields are reflected in the result, which also has ``updated_at``."""
        changes = {"parse_status": "ready", "file_count": 5}
        result = update_kb_meta(kb["kb_id"], changes)
        assert result is not None
        assert result["parse_status"] == "ready"
        assert result["file_count"] == 5
        assert "updated_at" in result

    def test_update_kb_meta_not_found(self, temp_kb_data):
        """Updating a well-formed but unknown id returns None."""
        unknown_id = "deadbeef1234567890abcd"
        assert update_kb_meta(unknown_id, {"x": 1}) is None

    def test_delete_kb_returns_true(self, kb):
        """Deleting an existing KB returns True."""
        outcome = delete_kb(kb["kb_id"])
        assert outcome is True

    def test_delete_kb_removes_dir(self, temp_kb_data, user, kb):
        """The KB directory exists before deletion and is gone afterwards."""
        kb_root = temp_kb_data / user["user_id"] / kb["kb_id"]
        assert kb_root.exists()
        delete_kb(kb["kb_id"])
        assert not kb_root.exists()

    def test_delete_kb_not_found_returns_false(self, temp_kb_data):
        """Deleting a well-formed but unknown id returns False."""
        unknown_id = "deadbeef1234567890abcd"
        assert delete_kb(unknown_id) is False
|
|
|
|
|
|
# ── 工具函数 ────────────────────────────────────────────────────
|
|
|
|
class TestHelpers:
    """Checks for the KB path helpers plus multi-KB / multi-user scenarios."""

    def test_get_kb_raw_dir(self, kb):
        """The raw-dir helper returns a path whose final component is ``raw``."""
        raw_dir = get_kb_raw_dir(kb["kb_id"])
        assert raw_dir is not None
        assert raw_dir.name == "raw"

    def test_get_kb_raw_dir_not_found(self, temp_kb_data):
        """The raw-dir helper returns None for an unknown KB id."""
        unknown_id = "deadbeef1234567890abcd"
        assert get_kb_raw_dir(unknown_id) is None

    def test_get_kb_chunks_path(self, kb):
        """The chunks-path helper returns a path named ``chunks.json``."""
        chunks_file = get_kb_chunks_path(kb["kb_id"])
        assert chunks_file is not None
        assert chunks_file.name == "chunks.json"

    def test_get_kb_chroma_path_creates_dir(self, kb):
        """The chroma-path helper returns an existing path named ``chroma``."""
        chroma_dir = get_kb_chroma_path(kb["kb_id"])
        assert chroma_dir is not None
        assert chroma_dir.name == "chroma"
        assert chroma_dir.exists()

    def test_user_can_own_multiple_kbs(self, user):
        """Three KBs created for one user are all listed for that user."""
        owner_id = user["user_id"]
        for title in ("KB1", "KB2", "KB3"):
            create_kb(owner_id, title)
        assert len(list_kbs(owner_id)) == 3

    def test_different_users_have_isolated_kbs(self, temp_kb_data):
        """Each of two users sees only the single KB created under their own id."""
        first = create_user("用户A")
        second = create_user("用户B")
        create_kb(first["user_id"], "A的库")
        create_kb(second["user_id"], "B的库")
        assert len(list_kbs(first["user_id"])) == 1
        assert len(list_kbs(second["user_id"])) == 1

    def test_delete_user_cascades_to_kbs(self, temp_kb_data, user):
        """Deleting a user who owns a KB removes the user's whole directory."""
        owner_id = user["user_id"]
        create_kb(owner_id, "要被删除的库")
        delete_user(owner_id)
        user_root = temp_kb_data / owner_id
        assert not user_root.exists()
|