"""Unit tests for backend/error_kb.py — fingerprint de-duplication, keyword extraction, and CRUD.

Covers:
- _make_fingerprint: normalization and de-duplication
- _extract_keywords: mixed Chinese/English extraction
- ErrorKB.record / exists / search / search_as_context (with ChromaDB mocked)
- module-level convenience functions record_error / search_error_cases
"""
import os
import sys
import json
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

sys.path.insert(0, str(Path(__file__).parent.parent))

from backend.error_kb import (
    _make_fingerprint,
    _extract_keywords,
    ErrorKB,
    get_error_kb,
    record_error,
    search_error_cases,
)


# ── _make_fingerprint ───────────────────────────────────────────


class TestMakeFingerprint:
    """Fingerprints must depend on an error's structure, not its variable parts."""

    def test_same_structure_same_fingerprint(self):
        first = "Field $F{customer_name} is not declared in the report"
        second = "Field $F{order_total} is not declared in the report"
        assert _make_fingerprint(first) == _make_fingerprint(second)

    def test_different_errors_different_fingerprint(self):
        first = "Missing required attribute pageWidth"
        second = "Query returned 0 results"
        assert _make_fingerprint(first) != _make_fingerprint(second)

    def test_normalizes_variable_names(self):
        left = _make_fingerprint("Field $F{amount} not found")
        right = _make_fingerprint("Field $F{total_price} not found")
        assert left == right

    def test_normalizes_string_literals_single_quote(self):
        left = _make_fingerprint("Value 'abc123' is invalid")
        right = _make_fingerprint("Value 'xyz789' is invalid")
        assert left == right

    def test_normalizes_string_literals_double_quote(self):
        left = _make_fingerprint('Name "test_table" not found')
        right = _make_fingerprint('Name "prod_table" not found')
        assert left == right

    def test_normalizes_numbers(self):
        left = _make_fingerprint("Line 42 has 100 errors")
        right = _make_fingerprint("Line 7 has 3 errors")
        assert left == right

    def test_case_insensitive(self):
        upper = _make_fingerprint("ERROR: Missing Field")
        lower = _make_fingerprint("error: missing field")
        assert upper == lower

    def test_whitespace_insensitive(self):
        with_newlines = "missing field\n\ndeclaration"
        single_spaced = "missing field declaration"
        assert _make_fingerprint(with_newlines) == _make_fingerprint(single_spaced)

    def test_output_is_16_char_hex(self):
        fingerprint = _make_fingerprint("some error message")
        assert len(fingerprint) == 16
        # Every character must be a lowercase hexadecimal digit.
        assert set(fingerprint) <= set("0123456789abcdef")


# ── _extract_keywords ───────────────────────────────────────────


class TestExtractKeywords:
    """Keyword extraction over Chinese text, English tokens and $F{...} references."""

    def test_extracts_chinese_words(self):
        keywords = _extract_keywords("未声明的字段引用和语法错误")
        # At least one keyword of two or more characters must start with a
        # character in the range U+4E00..U+9FFF.
        assert any(
            len(token) >= 2 and "一" <= token[0] <= "鿿" for token in keywords
        )

    def test_extracts_english_tokens(self):
        keywords = _extract_keywords("missing field declaration in report")
        assert "missing" in keywords
        assert "field" in keywords
        assert "report" in keywords

    def test_extracts_jrxml_patterns(self):
        keywords = _extract_keywords("Field $F{customer_name} not declared")
        assert "$F{customer_name}" in keywords

    def test_short_tokens_ignored(self):
        keywords = _extract_keywords("a b c ab cd")
        assert "ab" not in keywords
        assert "cd" not in keywords

    def test_empty_input_returns_empty_list(self):
        assert _extract_keywords("") == []

    def test_mixed_cn_en_jrxml(self):
        keywords = _extract_keywords("字段 $F{amount} 在 report 中未声明")
        assert "$F{amount}" in keywords
        assert "report" in keywords


# ── ErrorKB class (mock ChromaDB) ───────────────────────────────


def _make_patched_kb(client_override=None, collection_override=None):
    """Create an ErrorKB instance whose ChromaDB dependencies are mocked.

    chromadb is loaded lazily (imported inside the client/collection
    properties), so assigning the ``_client`` / ``_collection`` instance
    attributes directly is enough to bypass the real ChromaDB.
    """
    kb = ErrorKB()
    kb._client = client_override if client_override else MagicMock()
    kb._collection = collection_override if collection_override else MagicMock()
    if not (client_override or collection_override):
        # Default wiring: client.get_collection returns the mock collection.
        kb._client.get_collection.return_value = kb._collection
    return kb


class TestErrorKBRecord:
    """ErrorKB.exists / ErrorKB.record against a mocked collection."""

    def test_exists_returns_true_when_found(self):
        collection = MagicMock(**{"get.return_value": {"ids": ["abc123"]}})
        kb = _make_patched_kb(collection_override=collection)
        assert kb.exists("some error") is True

    def test_exists_returns_false_when_not_found(self):
        collection = MagicMock(**{"get.return_value": {"ids": []}})
        kb = _make_patched_kb(collection_override=collection)
        assert kb.exists("some error") is False

    def test_exists_survives_exception(self):
        collection = MagicMock(**{"get.side_effect": RuntimeError("db down")})
        kb = _make_patched_kb(collection_override=collection)
        assert kb.exists("some error") is False

    def test_record_skips_duplicate(self):
        collection = MagicMock(**{"get.return_value": {"ids": ["existing_fp"]}})
        kb = _make_patched_kb(collection_override=collection)
        assert kb.record("error", "", "", "fix prompt") is False
        collection.add.assert_not_called()

    def test_record_adds_new_case(self):
        collection = MagicMock(**{"get.return_value": {"ids": []}})
        kb = _make_patched_kb(collection_override=collection)
        recorded = kb.record(
            "Field $F{x} not declared",
            "",
            "",
            "prompt content",
            model="test-model",
            retry_count=2,
        )
        assert recorded is True
        collection.add.assert_called_once()
        _, add_kwargs = collection.add.call_args
        stored_meta = add_kwargs["metadatas"][0]
        # NOTE(review): retry_count=2 is expected to be stored as
        # retry_success == 3 — presumably ErrorKB.record saves
        # retry_count + 1; confirm against the implementation.
        assert stored_meta["retry_success"] == 3


class TestErrorKBSearch:
    """ErrorKB.search / search_as_context / stats against a mocked collection."""

    @pytest.fixture
    def col(self):
        return MagicMock()

    @pytest.fixture
    def kb(self, col):
        return _make_patched_kb(collection_override=col)

    def test_search_returns_formatted_results(self, kb, col):
        stored_case = {
            "error": "test error",
            "good_jrxml_snippet": "",
            "correction_prompt": "fix it",
            "recorded_at": "2026-01-01T00:00:00",
        }
        col.get.return_value = {"ids": []}
        col.query.return_value = {
            "ids": [["fp1"]],
            "documents": [[json.dumps(stored_case)]],
            "metadatas": [[{}]],
            "distances": [[0.05]],
        }
        hits = kb.search("some error", k=3)
        assert len(hits) == 1
        top_hit = hits[0]
        assert top_hit["error"] == "test error"
        assert top_hit["distance"] == 0.05

    def test_search_returns_empty_on_exception(self, kb, col):
        col.query.side_effect = RuntimeError("fail")
        assert kb.search("error") == []

    def test_search_as_context_formats_output(self, kb, col):
        serialized_cases = [
            json.dumps(
                {
                    "error": error,
                    "good_jrxml_snippet": "",
                    "correction_prompt": prompt,
                    "recorded_at": "",
                }
            )
            for error, prompt in (("e1", "p1"), ("e2", "p2"))
        ]
        col.get.return_value = {"ids": []}
        col.query.return_value = {
            "ids": [["fp1", "fp2"]],
            "documents": [serialized_cases],
            "metadatas": [[{}, {}]],
            "distances": [[0.1, 0.2]],
        }
        context = kb.search_as_context("error", k=2)
        assert "[历史错误案例]" in context
        assert "---" in context

    def test_search_as_context_empty_for_no_results(self, kb, col):
        col.get.return_value = {"ids": []}
        col.query.return_value = {"ids": [[]], "documents": [[]], "distances": [[]]}
        assert kb.search_as_context("error") == ""

    def test_stats_returns_count(self, kb, col):
        col.count.return_value = 42
        stats = kb.stats()
        assert stats["total_cases"] == 42

    def test_stats_zero_on_exception(self, kb, col):
        col.count.side_effect = RuntimeError("down")
        stats = kb.stats()
        assert stats["total_cases"] == 0


# ── Module-level convenience functions ──────────────────────────


class TestConvenienceFunctions:
    """get_error_kb / record_error / search_error_cases wrappers."""

    def test_get_error_kb_is_singleton(self, monkeypatch):
        import backend.error_kb as mod

        # Reset the cached instance so this test starts from a clean state.
        monkeypatch.setattr(mod, "_kb", None)
        first = get_error_kb()
        second = get_error_kb()
        assert first is second

    def test_record_error_delegates(self):
        with patch.object(ErrorKB, "record", return_value=True) as record_mock:
            assert record_error("e", "", "", "p") is True
        record_mock.assert_called_once()

    def test_search_error_cases_delegates(self):
        with patch.object(
            ErrorKB, "search_as_context", return_value="ctx"
        ) as search_mock:
            assert search_error_cases("err", k=5) == "ctx"
        search_mock.assert_called_once_with("err", k=5)