1. 修复论坛通信问题,基于日志块增加容错

2. 使用ERROR层级来避免json解析错误导致的连环问题
This commit is contained in:
Doiiars
2025-11-06 18:53:23 +08:00
parent 7d5105e563
commit e4d075cf56
13 changed files with 92 additions and 37 deletions
+56 -1
View File
@@ -70,6 +70,7 @@ class LogMonitor:
self.capturing_json = {} # 每个app的JSON捕获状态
self.json_buffer = {} # 每个app的JSON缓冲区
self.json_start_line = {} # 每个app的JSON开始行
self.in_error_block = {} # 每个app是否在ERROR块中
# 确保logs目录存在
self.log_dir.mkdir(exist_ok=True)
@@ -93,6 +94,7 @@ class LogMonitor:
self.capturing_json = {}
self.json_buffer = {}
self.json_start_line = {}
self.in_error_block = {}
# 重置主持人相关状态
self.agent_speeches_buffer = []
@@ -118,6 +120,21 @@ class LogMonitor:
except Exception as e:
logger.exception(f"ForumEngine: 写入forum.log失败: {e}")
def get_log_level(self, line: str) -> Optional[str]:
"""检测日志行的级别(INFO/ERROR/WARNING/DEBUG等)
支持loguru格式:YYYY-MM-DD HH:mm:ss.SSS | LEVEL | ...
Returns:
'INFO', 'ERROR', 'WARNING', 'DEBUG' 或 None(无法识别)
"""
# 检查loguru格式:YYYY-MM-DD HH:mm:ss.SSS | LEVEL | ...
# 匹配模式:| LEVEL | 或 | LEVEL |
match = re.search(r'\|\s*(INFO|ERROR|WARNING|DEBUG|TRACE|CRITICAL)\s*\|', line)
if match:
return match.group(1)
return None
def is_target_log_line(self, line: str) -> bool:
"""检查是否是目标日志行(SummaryNode
@@ -132,6 +149,11 @@ class LogMonitor:
- 包含错误关键词的日志(JSON解析失败、JSON修复失败等)
"""
# 排除 ERROR 级别的日志
log_level = self.get_log_level(line)
if log_level == 'ERROR':
return False
# 兼容旧检查方式
if "| ERROR" in line or "| ERROR |" in line:
return False
@@ -381,6 +403,7 @@ class LogMonitor:
# 重置JSON捕获状态
self.capturing_json[app_name] = False
self.json_buffer[app_name] = []
self.in_error_block[app_name] = False
if current_size > last_position:
with open(file_path, 'r', encoding='utf-8') as f:
@@ -400,17 +423,47 @@ class LogMonitor:
return new_lines
def process_lines_for_json(self, lines: List[str], app_name: str) -> List[str]:
"""处理行以捕获多行JSON内容"""
"""处理行以捕获多行JSON内容
实现ERROR块过滤:如果遇到ERROR级别的日志,拒绝处理直到遇到下一个INFO级别的日志
"""
captured_contents = []
# 初始化状态
if app_name not in self.capturing_json:
self.capturing_json[app_name] = False
self.json_buffer[app_name] = []
if app_name not in self.in_error_block:
self.in_error_block[app_name] = False
for line in lines:
if not line.strip():
continue
# 首先检查日志级别,更新ERROR块状态
log_level = self.get_log_level(line)
if log_level == 'ERROR':
# 遇到ERROR,进入ERROR块状态
self.in_error_block[app_name] = True
# 如果正在捕获JSON,立即停止并清空缓冲区
if self.capturing_json[app_name]:
self.capturing_json[app_name] = False
self.json_buffer[app_name] = []
# 跳过当前行,不处理
continue
elif log_level == 'INFO':
# 遇到INFO,退出ERROR块状态
self.in_error_block[app_name] = False
# 其他级别(WARNING、DEBUG等)保持当前状态
# 如果在ERROR块中,拒绝处理所有内容
if self.in_error_block[app_name]:
# 如果正在捕获JSON,立即停止并清空缓冲区
if self.capturing_json[app_name]:
self.capturing_json[app_name] = False
self.json_buffer[app_name] = []
# 跳过当前行,不处理
continue
# 检查是否是目标节点行和JSON开始标记
is_target = self.is_target_log_line(line)
@@ -538,6 +591,7 @@ class LogMonitor:
self.file_positions[app_name] = self.get_file_size(log_file)
self.capturing_json[app_name] = False
self.json_buffer[app_name] = []
self.in_error_block[app_name] = False
# logger.info(f"ForumEngine: {app_name} 基线行数: {self.file_line_counts[app_name]}")
while self.is_monitoring:
@@ -601,6 +655,7 @@ class LogMonitor:
# 重置JSON捕获状态
self.capturing_json[app_name] = False
self.json_buffer[app_name] = []
self.in_error_block[app_name] = False
# 更新行数记录
self.file_line_counts[app_name] = current_lines