SIGN IN SIGN UP
666ghj / BettaFish UNCLAIMED

微舆:人人可用的多Agent舆情分析助手,打破信息茧房,还原舆情原貌,预测未来走向,辅助决策!从0实现,不依赖任何框架。

39961 0 0 Python
"""
统一的JSON解析和修复工具。
提供鲁棒的JSON解析能力支持
1. 自动清理markdown代码块标记和思考内容
2. 本地语法修复(括号平衡、逗号补全、控制字符转义等)
3. 使用json_repair库进行高级修复
4. LLM辅助修复可选
5. 详细的错误日志和调试信息
"""
from __future__ import annotations
import json
import re
from typing import Any, Dict, List, Optional, Tuple, Callable
from loguru import logger
try:
from json_repair import repair_json as _json_repair_fn
except ImportError:
_json_repair_fn = None
class JSONParseError(ValueError):
"""JSON解析失败时抛出的异常附带原始文本方便排查。"""
def __init__(self, message: str, raw_text: Optional[str] = None):
"""
构造异常并附加原始输出,便于日志中定位。
Args:
message: 人类可读的错误描述。
raw_text: 触发异常的完整LLM输出。
"""
super().__init__(message)
self.raw_text = raw_text
class RobustJSONParser:
"""
鲁棒的JSON解析器。
集成多种修复策略确保LLM返回的内容能够被正确解析
- 清理markdown包裹、思考内容等额外信息
- 修复常见语法错误(缺少逗号、括号不平衡等)
- 转义未转义的控制字符
- 使用第三方库进行高级修复
- 可选的LLM辅助修复
"""
# 常见的LLM思考内容模式
_THINKING_PATTERNS = [
r"^\s*<thinking>.*?</thinking>\s*",
r"^\s*<thought>.*?</thought>\s*",
r"^\s*让我想想.*?(?=\{|\[|$)",
r"^\s*首先.*?(?=\{|\[|$)",
r"^\s*分析.*?(?=\{|\[|$)",
r"^\s*根据.*?(?=\{|\[|$)",
]
# 冒号等号模式LLM常见错误
_COLON_EQUALS_PATTERN = re.compile(r'(":\s*)=')
def __init__(
self,
llm_repair_fn: Optional[Callable[[str, str], Optional[str]]] = None,
enable_json_repair: bool = True,
enable_llm_repair: bool = False,
max_repair_attempts: int = 3,
):
"""
初始化JSON解析器。
Args:
llm_repair_fn: 可选的LLM修复函数接收(原始JSON, 错误信息)返回修复后的JSON
enable_json_repair: 是否启用json_repair库
enable_llm_repair: 是否启用LLM辅助修复
max_repair_attempts: 最大修复尝试次数
"""
self.llm_repair_fn = llm_repair_fn
self.enable_json_repair = enable_json_repair and _json_repair_fn is not None
self.enable_llm_repair = enable_llm_repair
self.max_repair_attempts = max_repair_attempts
def parse(
self,
raw_text: str,
context_name: str = "JSON",
expected_keys: Optional[List[str]] = None,
extract_wrapper_key: Optional[str] = None,
) -> Dict[str, Any]:
"""
解析LLM返回的JSON文本。
参数:
raw_text: LLM原始输出可能包含```包裹、思考内容等)
context_name: 上下文名称,用于错误信息
expected_keys: 期望的键列表,用于验证
extract_wrapper_key: 如果JSON被包裹在某个键中指定该键名进行提取
返回:
dict: 解析后的JSON对象
异常:
JSONParseError: 多种修复策略仍无法解析合法JSON
"""
if not raw_text or not raw_text.strip():
raise JSONParseError(f"{context_name}返回空内容")
2025-11-18 00:35:09 +08:00
# 原始文本用于后续日志
original_text = raw_text
2025-11-18 00:35:09 +08:00
# 步骤1: 构造候选集,包含不同清理策略
candidates = self._build_candidate_payloads(raw_text, context_name)
2025-11-18 00:35:09 +08:00
# 步骤2: 尝试解析所有候选
last_error: Optional[json.JSONDecodeError] = None
for i, candidate in enumerate(candidates):
try:
data = json.loads(candidate)
logger.debug(f"{context_name} JSON解析成功候选{i + 1}/{len(candidates)}")
return self._extract_and_validate(
data, expected_keys, extract_wrapper_key, context_name
)
except json.JSONDecodeError as exc:
last_error = exc
logger.debug(f"{context_name} 候选{i + 1}解析失败: {exc}")
2025-11-18 00:35:09 +08:00
cleaned = candidates[0] if candidates else original_text
# 步骤3: 使用json_repair库
if self.enable_json_repair:
repaired = self._attempt_json_repair(cleaned, context_name)
if repaired:
try:
data = json.loads(repaired)
logger.info(f"{context_name} JSON通过json_repair库修复成功")
return self._extract_and_validate(
data, expected_keys, extract_wrapper_key, context_name
)
except json.JSONDecodeError as exc:
last_error = exc
logger.debug(f"{context_name} json_repair修复后仍无法解析: {exc}")
2025-11-18 00:35:09 +08:00
# 步骤4: 使用LLM修复如果启用
if self.enable_llm_repair and self.llm_repair_fn:
llm_repaired = self._attempt_llm_repair(cleaned, str(last_error), context_name)
if llm_repaired:
try:
data = json.loads(llm_repaired)
logger.info(f"{context_name} JSON通过LLM修复成功")
return self._extract_and_validate(
data, expected_keys, extract_wrapper_key, context_name
)
except json.JSONDecodeError as exc:
last_error = exc
logger.warning(f"{context_name} LLM修复后仍无法解析: {exc}")
# 所有策略都失败了
error_msg = f"{context_name} JSON解析失败: {last_error}"
logger.error(error_msg)
2025-11-18 00:35:09 +08:00
logger.debug(f"原始文本前500字符: {original_text[:500]}")
raise JSONParseError(error_msg, raw_text=original_text) from last_error
def _build_candidate_payloads(self, raw_text: str, context_name: str) -> List[str]:
"""
针对原始文本构造多个候选JSON字符串覆盖不同的清理策略。
返回:
List[str]: 候选JSON文本列表
"""
cleaned = self._clean_response(raw_text)
candidates = [cleaned]
local_repaired = self._apply_local_repairs(cleaned)
if local_repaired != cleaned:
candidates.append(local_repaired)
# 对含有三层列表结构的内容强制拉平一次
flattened = self._flatten_nested_arrays(local_repaired)
if flattened not in candidates:
candidates.append(flattened)
return candidates
def _clean_response(self, raw: str) -> str:
"""
清理LLM响应去除markdown标记和思考内容。
参数:
raw: LLM原始输出
返回:
str: 清理后的文本
"""
cleaned = raw.strip()
# 移除思考内容(多语言支持)
for pattern in self._THINKING_PATTERNS:
cleaned = re.sub(pattern, "", cleaned, flags=re.DOTALL | re.IGNORECASE)
# 优先提取任意位置的```json```包裹内容
fenced_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", cleaned)
if fenced_match:
cleaned = fenced_match.group(1).strip()
else:
# 如果没有找到完整代码块,再尝试移除前后缀
if cleaned.startswith("```json"):
cleaned = cleaned[7:]
elif cleaned.startswith("```"):
cleaned = cleaned[3:]
if cleaned.endswith("```"):
cleaned = cleaned[:-3]
cleaned = cleaned.strip()
# 尝试提取第一个完整的JSON对象或数组
cleaned = self._extract_first_json_structure(cleaned)
return cleaned
def _extract_first_json_structure(self, text: str) -> str:
"""
从文本中提取第一个完整的JSON对象或数组。
这对于处理LLM在JSON前后添加说明文字的情况很有用。
参数:
text: 可能包含JSON的文本
返回:
str: 提取的JSON文本如果找不到则返回原文本
"""
# 查找第一个 { 或 [
start_brace = text.find("{")
start_bracket = text.find("[")
if start_brace == -1 and start_bracket == -1:
return text
# 确定起始位置
if start_brace == -1:
start = start_bracket
opener = "["
closer = "]"
elif start_bracket == -1:
start = start_brace
opener = "{"
closer = "}"
else:
start = min(start_brace, start_bracket)
opener = text[start]
closer = "}" if opener == "{" else "]"
# 查找对应的结束位置
depth = 0
in_string = False
escaped = False
for i in range(start, len(text)):
ch = text[i]
if escaped:
escaped = False
continue
if ch == "\\":
escaped = True
continue
if ch == '"':
in_string = not in_string
continue
if in_string:
continue
if ch in "{[":
depth += 1
elif ch in "}]":
depth -= 1
if depth == 0:
return text[start : i + 1]
# 如果没找到完整的结构,返回从起始位置到结尾
return text[start:] if start < len(text) else text
def _apply_local_repairs(self, text: str) -> str:
"""
应用本地修复策略。
参数:
text: 原始JSON文本
返回:
str: 修复后的文本
"""
repaired = text
mutated = False
# 修复 ":=" 错误
new_text = self._COLON_EQUALS_PATTERN.sub(r"\1", repaired)
if new_text != repaired:
logger.warning("检测到\":=\"字符,已自动移除多余的'='")
repaired = new_text
mutated = True
# 转义控制字符
repaired, escaped = self._escape_control_characters(repaired)
if escaped:
logger.warning("检测到未转义的控制字符,已自动转换为转义序列")
mutated = True
# 修复缺少的逗号
repaired, commas_fixed = self._fix_missing_commas(repaired)
if commas_fixed:
logger.warning("检测到对象/数组之间缺少逗号,已自动补齐")
mutated = True
2025-11-18 00:35:09 +08:00
# 合并多余的方括号LLM常见把二维列表层级写成三层
repaired, brackets_collapsed = self._collapse_redundant_brackets(repaired)
if brackets_collapsed:
logger.warning("检测到连续的方括号嵌套,已尝试折叠为二维结构")
mutated = True
# 平衡括号
repaired, balanced = self._balance_brackets(repaired)
if balanced:
logger.warning("检测到括号不平衡,已自动补齐/剔除异常括号")
mutated = True
# 移除尾随逗号
repaired, trailing_removed = self._remove_trailing_commas(repaired)
if trailing_removed:
logger.warning("检测到尾随逗号,已自动移除")
mutated = True
return repaired if mutated else text
def _escape_control_characters(self, text: str) -> Tuple[str, bool]:
"""
将字符串字面量中的裸换行/制表符/控制字符替换为JSON合法的转义序列。
参数:
text: 原始JSON文本
返回:
Tuple[str, bool]: (修复后的文本, 是否有修改)
"""
if not text:
return text, False
result: List[str] = []
in_string = False
escaped = False
mutated = False
control_map = {"\n": "\\n", "\r": "\\r", "\t": "\\t"}
for ch in text:
if escaped:
result.append(ch)
escaped = False
continue
if ch == "\\":
result.append(ch)
escaped = True
continue
if ch == '"':
result.append(ch)
in_string = not in_string
continue
if in_string and ch in control_map:
result.append(control_map[ch])
mutated = True
continue
if in_string and ord(ch) < 0x20:
result.append(f"\\u{ord(ch):04x}")
mutated = True
continue
result.append(ch)
return "".join(result), mutated
def _fix_missing_commas(self, text: str) -> Tuple[str, bool]:
"""
在对象/数组元素之间自动补逗号。
参数:
text: 原始JSON文本
返回:
Tuple[str, bool]: (修复后的文本, 是否有修改)
"""
if not text:
return text, False
chars: List[str] = []
mutated = False
in_string = False
escaped = False
length = len(text)
i = 0
while i < length:
ch = text[i]
chars.append(ch)
if escaped:
escaped = False
i += 1
continue
if ch == "\\":
escaped = True
i += 1
continue
if ch == '"':
# 如果我们正在退出字符串,检查后面是否需要逗号
if in_string:
# 查找下一个非空白字符
j = i + 1
while j < length and text[j] in " \t\r\n":
j += 1
# 如果下一个字符是 " { [ 或数字,可能需要逗号
if j < length:
next_ch = text[j]
if next_ch in "\"[{" or next_ch.isdigit():
# 检查是否已经在对象或数组中
# 通过检查前面是否有未闭合的 { 或 [
has_opener = False
for k in range(len(chars) - 1, -1, -1):
if chars[k] in "{[":
has_opener = True
break
elif chars[k] in "]}":
break
if has_opener:
chars.append(",")
mutated = True
in_string = not in_string
i += 1
continue
# 在 } 或 ] 后面检查是否需要逗号
if not in_string and ch in "}]":
j = i + 1
# 跳过空白
while j < length and text[j] in " \t\r\n":
j += 1
# 如果下一个非空白字符是 { [ " 或数字,添加逗号
if j < length:
next_ch = text[j]
if next_ch in "{[\"" or next_ch.isdigit():
chars.append(",")
mutated = True
i += 1
return "".join(chars), mutated
2025-11-18 00:35:09 +08:00
def _collapse_redundant_brackets(self, text: str) -> Tuple[str, bool]:
"""
针对LLM生成的三层或更多层数组如]]], [[ / [[[)进行折叠,避免表格/列表写出额外维度。
返回:
Tuple[str, bool]: (修复后的文本, 是否有修改)
"""
if not text:
return text, False
mutated = False
patterns = [
# 典型错误: "]]], [[{...}" -> "]], [{...}"
(re.compile(r"\]\s*\]\s*\]\s*,\s*\[\s*\["), "]],["),
# 极端情况: 连续三层开头 "[[[" -> "[["
(re.compile(r"\[\s*\[\s*\["), "[["),
# 极端情况: 结尾 "]]]" -> "]]"
(re.compile(r"\]\s*\]\s*\]"), "]]"),
]
repaired = text
for pattern, replacement in patterns:
new_text, count = pattern.subn(replacement, repaired)
if count > 0:
mutated = True
repaired = new_text
return repaired, mutated
def _flatten_nested_arrays(self, text: str) -> str:
"""
对明显多余的一层列表进行折叠,例如 [[[x]]] -> [[x]]。
"""
if not text:
return text
text = re.sub(r"\]\s*\]\s*\]", "]]", text)
text = re.sub(r"\[\s*\[\s*\[", "[[", text)
return text
def _balance_brackets(self, text: str) -> Tuple[str, bool]:
"""
尝试修复因LLM多写/少写括号导致的不平衡结构。
参数:
text: 原始JSON文本
返回:
Tuple[str, bool]: (修复后的文本, 是否有修改)
"""
if not text:
return text, False
result: List[str] = []
stack: List[str] = []
mutated = False
in_string = False
escaped = False
opener_map = {"{": "}", "[": "]"}
for ch in text:
if escaped:
result.append(ch)
escaped = False
continue
if ch == "\\":
result.append(ch)
escaped = True
continue
if ch == '"':
result.append(ch)
in_string = not in_string
continue
if in_string:
result.append(ch)
continue
if ch in "{[":
stack.append(ch)
result.append(ch)
continue
if ch in "}]":
if stack and (
(ch == "}" and stack[-1] == "{") or (ch == "]" and stack[-1] == "[")
):
stack.pop()
result.append(ch)
else:
# 不匹配的闭括号,忽略
mutated = True
continue
result.append(ch)
# 补齐未闭合的括号
while stack:
opener = stack.pop()
result.append(opener_map[opener])
mutated = True
return "".join(result), mutated
def _remove_trailing_commas(self, text: str) -> Tuple[str, bool]:
"""
移除JSON对象和数组中的尾随逗号。
参数:
text: 原始JSON文本
返回:
Tuple[str, bool]: (修复后的文本, 是否有修改)
"""
if not text:
return text, False
# 使用正则表达式移除尾随逗号
# 匹配 , 后面跟着空白和 } 或 ] 的情况
pattern = r",(\s*[}\]])"
new_text = re.sub(pattern, r"\1", text)
return new_text, new_text != text
def _attempt_json_repair(self, text: str, context_name: str) -> Optional[str]:
"""
使用json_repair库进行高级修复。
参数:
text: 原始JSON文本
context_name: 上下文名称
返回:
Optional[str]: 修复后的JSON文本失败返回None
"""
if not _json_repair_fn:
return None
try:
fixed = _json_repair_fn(text)
if fixed and fixed != text:
logger.info(f"{context_name} 使用json_repair库自动修复JSON")
return fixed
except Exception as exc:
logger.debug(f"{context_name} json_repair修复失败: {exc}")
return None
def _attempt_llm_repair(
self, text: str, error_msg: str, context_name: str
) -> Optional[str]:
"""
使用LLM进行JSON修复。
参数:
text: 原始JSON文本
error_msg: 解析错误信息
context_name: 上下文名称
返回:
Optional[str]: 修复后的JSON文本失败返回None
"""
if not self.llm_repair_fn:
return None
try:
logger.info(f"{context_name} 尝试使用LLM修复JSON")
repaired = self.llm_repair_fn(text, error_msg)
if repaired and repaired != text:
return repaired
except Exception as exc:
logger.warning(f"{context_name} LLM修复失败: {exc}")
return None
def _extract_and_validate(
self,
data: Any,
expected_keys: Optional[List[str]],
extract_wrapper_key: Optional[str],
context_name: str,
) -> Dict[str, Any]:
"""
提取并验证JSON数据。
参数:
data: 解析后的数据
expected_keys: 期望的键列表
extract_wrapper_key: 包裹键名
context_name: 上下文名称
返回:
Dict[str, Any]: 提取并验证后的数据
异常:
JSONParseError: 如果数据格式不符合预期
"""
# 提取包裹的数据
if extract_wrapper_key and isinstance(data, dict):
if extract_wrapper_key in data:
data = data[extract_wrapper_key]
else:
logger.warning(
f"{context_name} 未找到包裹键'{extract_wrapper_key}',使用原始数据"
)
# 验证数据类型
if not isinstance(data, dict):
2025-11-17 17:36:58 +08:00
if isinstance(data, list):
if len(data) > 0:
# 尝试找到最符合期望的元素
best_match = None
max_match_count = 0
for item in data:
if isinstance(item, dict):
if expected_keys:
# 计算匹配的键数量
match_count = sum(1 for key in expected_keys if key in item)
if match_count > max_match_count:
max_match_count = match_count
best_match = item
elif best_match is None:
best_match = item
if best_match:
logger.warning(
f"{context_name} 返回数组,自动提取最佳匹配元素(匹配{max_match_count}/{len(expected_keys or [])}个键)"
)
data = best_match
else:
raise JSONParseError(
f"{context_name} 返回的数组中没有有效的对象"
)
else:
raise JSONParseError(f"{context_name} 返回空数组")
else:
raise JSONParseError(
f"{context_name} 返回的不是JSON对象: {type(data).__name__}"
)
# 验证必需的键
if expected_keys:
missing_keys = [key for key in expected_keys if key not in data]
if missing_keys:
logger.warning(
f"{context_name} 缺少预期的键: {', '.join(missing_keys)}"
)
2025-11-17 17:36:58 +08:00
# 尝试修复常见的键名变体
data = self._try_recover_missing_keys(data, missing_keys, context_name)
return data
def _try_recover_missing_keys(
self, data: Dict[str, Any], missing_keys: List[str], context_name: str
) -> Dict[str, Any]:
"""
尝试从数据中恢复缺失的键,通过查找相似的键名。
参数:
data: 原始数据
missing_keys: 缺失的键列表
context_name: 上下文名称
返回:
Dict[str, Any]: 修复后的数据
"""
# 常见的键名映射
key_aliases = {
"template_name": ["templateName", "name", "template"],
"selection_reason": ["selectionReason", "reason", "explanation"],
"title": ["reportTitle", "documentTitle"],
"chapters": ["chapterList", "chapterPlan", "sections"],
"totalWords": ["total_words", "wordCount", "totalWordCount"],
}
for missing_key in missing_keys:
if missing_key in key_aliases:
for alias in key_aliases[missing_key]:
if alias in data:
logger.info(
f"{context_name} 找到键'{missing_key}'的别名'{alias}',自动映射"
)
data[missing_key] = data[alias]
break
return data
__all__ = ["RobustJSONParser", "JSONParseError"]