695 lines
22 KiB
Python
695 lines
22 KiB
Python
"""
|
||
统一的JSON解析和修复工具。
|
||
|
||
提供鲁棒的JSON解析能力,支持:
|
||
1. 自动清理markdown代码块标记和思考内容
|
||
2. 本地语法修复(括号平衡、逗号补全、控制字符转义等)
|
||
3. 使用json_repair库进行高级修复
|
||
4. LLM辅助修复(可选)
|
||
5. 详细的错误日志和调试信息
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import re
|
||
from typing import Any, Dict, List, Optional, Tuple, Callable
|
||
from loguru import logger
|
||
|
||
# json_repair is an optional third-party dependency. When it cannot be
# imported, _json_repair_fn is left as None so the rest of the module can
# detect its absence and skip that repair strategy.
try:
    from json_repair import repair_json as _json_repair_fn
except ImportError:
    _json_repair_fn = None
|
||
|
||
|
||
class JSONParseError(ValueError):
    """Raised when JSON parsing fails; keeps the offending text for diagnosis."""

    def __init__(self, message: str, raw_text: Optional[str] = None):
        """
        Build the exception and remember the raw output that triggered it.

        Args:
            message: Human-readable description of the failure.
            raw_text: The complete LLM output that could not be parsed
                (``None`` when it is not available).
        """
        # Record the payload first so it is available even if a subclass
        # overrides message handling.
        self.raw_text = raw_text
        super().__init__(message)
|
||
|
||
|
||
class RobustJSONParser:
    """
    Robust JSON parser for LLM output.

    Chains several repair strategies so that text returned by an LLM can be
    turned into a dict:
    - strips markdown fences, "thinking" content and surrounding chatter
    - fixes common syntax errors (missing commas, unbalanced brackets, ...)
    - escapes raw control characters inside string literals
    - falls back to the json_repair library when it is installed
    - optionally asks an LLM to repair the payload
    """

    # Regexes describing typical LLM "thinking" content. The natural-language
    # lead-ins stop at the first "{" / "[" (or end of text).
    _THINKING_PATTERNS = [
        r"<thinking>.*?</thinking>",
        r"<thought>.*?</thought>",
        r"让我想想.*?(?=\{|\[|$)",
        r"首先.*?(?=\{|\[|$)",
        r"分析.*?(?=\{|\[|$)",
        r"根据.*?(?=\{|\[|$)",
    ]

    # Subset of the patterns that is explicitly delimited by tags. Only these
    # are safe to remove anywhere in the text; the lead-in words above are
    # ordinary vocabulary and must never be stripped from inside the payload.
    _THINKING_TAG_PATTERNS = (
        r"<thinking>.*?</thinking>",
        r"<thought>.*?</thought>",
    )

    # Matches the common LLM slip `"key":= value` (a stray "=" after the colon).
    _COLON_EQUALS_PATTERN = re.compile(r'(":\s*)=')

    # Alternative spellings accepted for expected keys, tried in order.
    _KEY_ALIASES = {
        "template_name": ["templateName", "name", "template"],
        "selection_reason": ["selectionReason", "reason", "explanation"],
        "title": ["reportTitle", "documentTitle"],
        "chapters": ["chapterList", "chapterPlan", "sections"],
        "totalWords": ["total_words", "wordCount", "totalWordCount"],
    }

    # Characters skipped when looking ahead for the next JSON token.
    _WHITESPACE = " \t\r\n"

    def __init__(
        self,
        llm_repair_fn: Optional[Callable[[str, str], Optional[str]]] = None,
        enable_json_repair: bool = True,
        enable_llm_repair: bool = False,
        max_repair_attempts: int = 3,
    ):
        """
        Initialise the parser.

        Args:
            llm_repair_fn: Optional callable taking (broken JSON, error
                message) and returning repaired JSON text or None.
            enable_json_repair: Whether to use the json_repair library; it is
                forced off when the library could not be imported.
            enable_llm_repair: Whether to use LLM-assisted repair.
            max_repair_attempts: Maximum number of repair attempts. Stored on
                the instance; no method of this class reads it.
        """
        self.llm_repair_fn = llm_repair_fn
        self.enable_json_repair = enable_json_repair and _json_repair_fn is not None
        self.enable_llm_repair = enable_llm_repair
        self.max_repair_attempts = max_repair_attempts

    def parse(
        self,
        raw_text: str,
        context_name: str = "JSON",
        expected_keys: Optional[List[str]] = None,
        extract_wrapper_key: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Parse JSON text returned by an LLM.

        Strategies are tried in order: the cleaned text, the locally repaired
        text, json_repair (if enabled) and finally the LLM repair callback
        (if enabled). The first strategy whose output is loadable by
        ``json.loads`` wins and is then validated.

        Args:
            raw_text: Raw LLM output (may contain ``` fences, thinking text...).
            context_name: Label used in log and error messages.
            expected_keys: Keys the resulting object is expected to contain.
            extract_wrapper_key: If the payload is wrapped in an outer object,
                the key under which the real payload lives.

        Returns:
            The parsed JSON object.

        Raises:
            JSONParseError: If the input is empty, no strategy yields valid
                JSON, or the decoded value is not (and does not contain) an
                object. ``raw_text`` is attached to the exception.
        """
        if not raw_text or not raw_text.strip():
            raise JSONParseError(f"{context_name}返回空内容", raw_text=raw_text)

        # Step 1: strip markdown markers and thinking content.
        cleaned = self._clean_response(raw_text)

        # Steps 2-3: candidate payloads are the cleaned text and, when it
        # differs, its locally repaired version.
        candidates = [cleaned]
        local_repaired = self._apply_local_repairs(cleaned)
        if local_repaired != cleaned:
            candidates.append(local_repaired)

        def finish(data: Any) -> Dict[str, Any]:
            # Validation errors are final; make sure they carry the raw text
            # so callers can log what the LLM actually produced.
            try:
                return self._extract_and_validate(
                    data, expected_keys, extract_wrapper_key, context_name
                )
            except JSONParseError as exc:
                if exc.raw_text is None:
                    exc.raw_text = raw_text
                raise

        # Step 4: try every candidate as-is.
        last_error: Optional[json.JSONDecodeError] = None
        for i, candidate in enumerate(candidates):
            try:
                data = json.loads(candidate)
            except json.JSONDecodeError as exc:
                last_error = exc
                logger.debug(f"{context_name} 候选{i + 1}解析失败: {exc}")
                continue
            logger.debug(f"{context_name} JSON解析成功(候选{i + 1}/{len(candidates)})")
            return finish(data)

        # Step 5: json_repair library.
        if self.enable_json_repair:
            repaired = self._attempt_json_repair(cleaned, context_name)
            if repaired:
                try:
                    data = json.loads(repaired)
                except json.JSONDecodeError as exc:
                    last_error = exc
                    logger.debug(f"{context_name} json_repair修复后仍无法解析: {exc}")
                else:
                    logger.info(f"{context_name} JSON通过json_repair库修复成功")
                    return finish(data)

        # Step 6: LLM-assisted repair (only when enabled and configured).
        if self.enable_llm_repair and self.llm_repair_fn:
            llm_repaired = self._attempt_llm_repair(cleaned, str(last_error), context_name)
            if llm_repaired:
                try:
                    data = json.loads(llm_repaired)
                except json.JSONDecodeError as exc:
                    last_error = exc
                    logger.warning(f"{context_name} LLM修复后仍无法解析: {exc}")
                else:
                    logger.info(f"{context_name} JSON通过LLM修复成功")
                    return finish(data)

        # Every strategy failed.
        error_msg = f"{context_name} JSON解析失败: {last_error}"
        logger.error(error_msg)
        logger.debug(f"原始文本前500字符: {raw_text[:500]}")
        raise JSONParseError(error_msg, raw_text=raw_text) from last_error

    @staticmethod
    def _find_payload_start(text: str) -> int:
        """Return the index of the first "{" or "[" in *text*, or -1 if none."""
        positions = [pos for pos in (text.find("{"), text.find("[")) if pos != -1]
        return min(positions) if positions else -1

    def _clean_response(self, raw: str) -> str:
        """
        Strip markdown fences and thinking content from an LLM response.

        Tag-delimited thinking blocks are removed anywhere in the text.
        Natural-language lead-ins are only removed from the text *before* the
        first "{" / "[", so that words such as "分析" or "根据" occurring
        inside JSON string values are left intact.

        Args:
            raw: Raw LLM output.

        Returns:
            The cleaned text, reduced to the first JSON structure when one
            can be located.
        """
        flags = re.DOTALL | re.IGNORECASE
        cleaned = raw.strip()

        for pattern in self._THINKING_TAG_PATTERNS:
            cleaned = re.sub(pattern, "", cleaned, flags=flags)

        # Split into "chatter before the payload" and "payload onwards";
        # lead-in patterns may only touch the former.
        payload_start = self._find_payload_start(cleaned)
        if payload_start == -1:
            lead_in, payload = cleaned, ""
        else:
            lead_in, payload = cleaned[:payload_start], cleaned[payload_start:]
        for pattern in self._THINKING_PATTERNS:
            lead_in = re.sub(pattern, "", lead_in, flags=flags)

        # Strip again: removing thinking content can leave leading whitespace
        # that would hide a code fence from the checks below.
        cleaned = (lead_in + payload).strip()

        # Remove markdown code-fence markers.
        if cleaned[:7].lower() == "```json":
            cleaned = cleaned[7:]
        elif cleaned.startswith("```"):
            cleaned = cleaned[3:]
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        cleaned = cleaned.strip()

        # Keep only the first complete JSON object or array.
        return self._extract_first_json_structure(cleaned)

    def _extract_first_json_structure(self, text: str) -> str:
        """
        Extract the first complete JSON object or array from *text*.

        Useful when the LLM adds explanatory prose before or after the JSON.
        Brackets inside string literals are ignored; bracket types are not
        matched against each other, only nesting depth is tracked.

        Args:
            text: Text that may contain JSON.

        Returns:
            The extracted structure; the text from the first bracket to the
            end if the structure never closes; the input unchanged if it
            contains no "{" or "[".
        """
        start = self._find_payload_start(text)
        if start == -1:
            return text

        depth = 0
        in_string = False
        escaped = False

        for index in range(start, len(text)):
            ch = text[index]

            if escaped:
                escaped = False
                continue
            if ch == "\\":
                escaped = True
                continue
            if ch == '"':
                in_string = not in_string
                continue
            if in_string:
                continue

            if ch in "{[":
                depth += 1
            elif ch in "}]":
                depth -= 1
                if depth == 0:
                    return text[start : index + 1]

        # Structure never closed: hand back everything from its start.
        return text[start:]

    def _apply_local_repairs(self, text: str) -> str:
        """
        Run the local (dependency-free) repair pipeline.

        Order matters: brackets are balanced before trailing commas are
        removed, because closing a truncated container can expose one.

        Args:
            text: JSON text to repair.

        Returns:
            The repaired text, or *text* itself when nothing changed.
        """
        # Fix the '":=' slip.
        repaired = self._COLON_EQUALS_PATTERN.sub(r"\1", text)
        mutated = repaired != text
        if mutated:
            logger.warning("检测到\":=\"字符,已自动移除多余的'='号")

        steps = (
            (self._escape_control_characters, "检测到未转义的控制字符,已自动转换为转义序列"),
            (self._fix_missing_commas, "检测到对象/数组之间缺少逗号,已自动补齐"),
            (self._balance_brackets, "检测到括号不平衡,已自动补齐/剔除异常括号"),
            (self._remove_trailing_commas, "检测到尾随逗号,已自动移除"),
        )
        for step, message in steps:
            repaired, changed = step(repaired)
            if changed:
                logger.warning(message)
                mutated = True

        return repaired if mutated else text

    def _escape_control_characters(self, text: str) -> Tuple[str, bool]:
        """
        Replace raw newlines/tabs/control characters inside string literals
        with their JSON escape sequences.

        Args:
            text: JSON text to repair.

        Returns:
            (repaired text, whether anything was changed)
        """
        if not text:
            return text, False

        control_map = {"\n": "\\n", "\r": "\\r", "\t": "\\t"}
        result: List[str] = []
        in_string = False
        escaped = False
        mutated = False

        for ch in text:
            if escaped:
                result.append(ch)
                escaped = False
            elif ch == "\\":
                result.append(ch)
                escaped = True
            elif ch == '"':
                result.append(ch)
                in_string = not in_string
            elif in_string and ch in control_map:
                result.append(control_map[ch])
                mutated = True
            elif in_string and ord(ch) < 0x20:
                # Remaining C0 control characters have no short escape.
                result.append(f"\\u{ord(ch):04x}")
                mutated = True
            else:
                result.append(ch)

        return "".join(result), mutated

    @classmethod
    def _peek(cls, text: str, pos: int) -> str:
        """Return the first non-whitespace character at or after *pos*, or ""."""
        length = len(text)
        while pos < length and text[pos] in cls._WHITESPACE:
            pos += 1
        return text[pos] if pos < length else ""

    @staticmethod
    def _starts_value(ch: str) -> bool:
        """Tell whether *ch* can begin a string, container or number."""
        # The explicit emptiness check matters: "" is "in" every string.
        return bool(ch) and (ch in "\"[{" or ch.isdigit())

    def _fix_missing_commas(self, text: str) -> Tuple[str, bool]:
        """
        Insert commas that are missing between object/array elements.

        A comma is added after a closing quote (when inside a container) or
        after a closing bracket whenever the next non-whitespace character
        starts a new string, container or number. Container membership is
        tracked with a running depth counter that ignores brackets inside
        string literals.

        Args:
            text: JSON text to repair.

        Returns:
            (repaired text, whether anything was changed)
        """
        if not text:
            return text, False

        chars: List[str] = []
        mutated = False
        in_string = False
        escaped = False
        depth = 0  # number of currently open containers

        for index, ch in enumerate(text):
            chars.append(ch)

            if escaped:
                escaped = False
                continue
            if ch == "\\":
                escaped = True
                continue

            if ch == '"':
                # Leaving a string: a value-start right after it means the
                # separating comma was dropped.
                if (
                    in_string
                    and depth > 0
                    and self._starts_value(self._peek(text, index + 1))
                ):
                    chars.append(",")
                    mutated = True
                in_string = not in_string
                continue

            if in_string:
                continue

            if ch in "{[":
                depth += 1
            elif ch in "}]":
                # Clamp so stray closers cannot drive the depth negative.
                depth = max(depth - 1, 0)
                if self._starts_value(self._peek(text, index + 1)):
                    chars.append(",")
                    mutated = True

        return "".join(chars), mutated

    def _balance_brackets(self, text: str) -> Tuple[str, bool]:
        """
        Repair structures left unbalanced by extra or missing brackets.

        Closing brackets that do not match the innermost open container are
        dropped; containers still open at the end are closed in reverse
        order. A string literal left open at the end of the text (typical
        for truncated output) is closed first so the appended brackets do
        not end up inside it.

        Args:
            text: JSON text to repair.

        Returns:
            (repaired text, whether anything was changed)
        """
        if not text:
            return text, False

        closer_for = {"{": "}", "[": "]"}
        result: List[str] = []
        stack: List[str] = []
        mutated = False
        in_string = False
        escaped = False

        for ch in text:
            if escaped:
                result.append(ch)
                escaped = False
                continue
            if ch == "\\":
                result.append(ch)
                escaped = True
                continue
            if ch == '"':
                result.append(ch)
                in_string = not in_string
                continue
            if in_string:
                result.append(ch)
                continue

            if ch in closer_for:
                stack.append(ch)
                result.append(ch)
            elif ch in "}]":
                if stack and closer_for[stack[-1]] == ch:
                    stack.pop()
                    result.append(ch)
                else:
                    # Unmatched closer: drop it.
                    mutated = True
            else:
                result.append(ch)

        if in_string:
            if escaped:
                # A dangling backslash would escape the quote we add.
                result.pop()
            result.append('"')
            mutated = True

        # Close whatever is still open, innermost first.
        while stack:
            result.append(closer_for[stack.pop()])
            mutated = True

        return "".join(result), mutated

    def _remove_trailing_commas(self, text: str) -> Tuple[str, bool]:
        """
        Remove trailing commas from JSON objects and arrays.

        A comma is dropped when only whitespace separates it from the next
        "}" or "]". Commas inside string literals are never touched.

        Args:
            text: JSON text to repair.

        Returns:
            (repaired text, whether anything was changed)
        """
        if not text:
            return text, False

        kept: List[str] = []
        mutated = False
        in_string = False
        escaped = False
        length = len(text)

        for index, ch in enumerate(text):
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = not in_string
            elif ch == "," and not in_string:
                ahead = index + 1
                while ahead < length and text[ahead].isspace():
                    ahead += 1
                if ahead < length and text[ahead] in "}]":
                    mutated = True
                    continue  # skip the trailing comma
            kept.append(ch)

        return "".join(kept), mutated

    def _attempt_json_repair(self, text: str, context_name: str) -> Optional[str]:
        """
        Try to repair *text* with the json_repair library.

        Args:
            text: JSON text to repair.
            context_name: Label used in log messages.

        Returns:
            The repaired text when the library returned something non-empty
            that differs from *text*; otherwise None (also when the library
            is missing or raised).
        """
        if not _json_repair_fn:
            return None

        try:
            fixed = _json_repair_fn(text)
            if fixed and fixed != text:
                logger.info(f"{context_name} 使用json_repair库自动修复JSON")
                return fixed
        except Exception as exc:
            # Best effort: a failing third-party repair must not abort parsing.
            logger.debug(f"{context_name} json_repair修复失败: {exc}")

        return None

    def _attempt_llm_repair(
        self, text: str, error_msg: str, context_name: str
    ) -> Optional[str]:
        """
        Ask the configured LLM callback to repair *text*.

        Args:
            text: JSON text to repair.
            error_msg: Message of the last parse error, passed to the callback.
            context_name: Label used in log messages.

        Returns:
            The callback's result when it is non-empty and differs from
            *text*; otherwise None (also when no callback is configured or
            it raised).
        """
        if not self.llm_repair_fn:
            return None

        try:
            logger.info(f"{context_name} 尝试使用LLM修复JSON")
            repaired = self.llm_repair_fn(text, error_msg)
            if repaired and repaired != text:
                return repaired
        except Exception as exc:
            # Best effort: the callback is user-supplied and may fail freely.
            logger.warning(f"{context_name} LLM修复失败: {exc}")

        return None

    def _extract_and_validate(
        self,
        data: Any,
        expected_keys: Optional[List[str]],
        extract_wrapper_key: Optional[str],
        context_name: str,
    ) -> Dict[str, Any]:
        """
        Unwrap and validate decoded JSON data.

        When the data is a list, the dict element matching the most expected
        keys is used (the first dict wins ties, and is used even when it
        matches none so that alias recovery still gets a chance).

        Args:
            data: Decoded JSON value.
            expected_keys: Keys the object is expected to contain.
            extract_wrapper_key: Key of the wrapper to unwrap, if any.
            context_name: Label used in log and error messages.

        Returns:
            The extracted object. Missing expected keys only produce a
            warning (after alias recovery was attempted).

        Raises:
            JSONParseError: If the data is neither an object nor a list
                containing at least one object.
        """
        # Unwrap the payload when it sits under a wrapper key.
        if extract_wrapper_key and isinstance(data, dict):
            if extract_wrapper_key in data:
                data = data[extract_wrapper_key]
            else:
                logger.warning(
                    f"{context_name} 未找到包裹键'{extract_wrapper_key}',使用原始数据"
                )

        if isinstance(data, list):
            if not data:
                raise JSONParseError(f"{context_name} 返回空数组")

            best_match = None
            max_match_count = 0
            for item in data:
                if not isinstance(item, dict):
                    continue
                match_count = sum(1 for key in expected_keys or () if key in item)
                if best_match is None or match_count > max_match_count:
                    best_match, max_match_count = item, match_count

            if best_match is None:
                raise JSONParseError(f"{context_name} 返回的数组中没有有效的对象")

            logger.warning(
                f"{context_name} 返回数组,自动提取最佳匹配元素(匹配{max_match_count}/{len(expected_keys or [])}个键)"
            )
            data = best_match
        elif not isinstance(data, dict):
            raise JSONParseError(
                f"{context_name} 返回的不是JSON对象: {type(data).__name__}"
            )

        # Check for the expected keys; try known aliases for missing ones.
        if expected_keys:
            missing_keys = [key for key in expected_keys if key not in data]
            if missing_keys:
                logger.warning(
                    f"{context_name} 缺少预期的键: {', '.join(missing_keys)}"
                )
                data = self._try_recover_missing_keys(data, missing_keys, context_name)

        return data

    def _try_recover_missing_keys(
        self, data: Dict[str, Any], missing_keys: List[str], context_name: str
    ) -> Dict[str, Any]:
        """
        Fill in missing keys from known alternative spellings.

        For each missing key, the first alias present in *data* is copied
        under the expected name. *data* is modified in place; the alias
        entry itself is kept.

        Args:
            data: Decoded object.
            missing_keys: Expected keys absent from *data*.
            context_name: Label used in log messages.

        Returns:
            The same dict, with recovered keys added.
        """
        for missing_key in missing_keys:
            for alias in self._KEY_ALIASES.get(missing_key, ()):
                if alias in data:
                    logger.info(
                        f"{context_name} 找到键'{missing_key}'的别名'{alias}',自动映射"
                    )
                    data[missing_key] = data[alias]
                    break

        return data
|
||
|
||
|
||
# Names exported by a star-import of this module.
__all__ = ["RobustJSONParser", "JSONParseError"]
|