1. 修复论坛通信问题

2. 修复总结报告错误
3. 修复环境变量重新载入问题
4. 添加测试用例
5. 修复论坛主持人问题
This commit is contained in:
Doiiars
2025-11-06 13:57:56 +08:00
parent adeedff98a
commit dce6371410
9 changed files with 331 additions and 82 deletions
+7 -7
View File
@@ -12,7 +12,7 @@ import re
# 添加项目根目录到Python路径以导入config
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import FORUM_HOST_API_KEY, FORUM_HOST_BASE_URL, FORUM_HOST_MODEL_NAME
from config import settings
# 添加utils目录到Python路径
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -35,21 +35,21 @@ class ForumHost:
初始化论坛主持人
Args:
api_key: 硅基流动API密钥,如果不提供则从配置文件读取
base_url: 接口基础地址,默认使用配置文件提供的SiliconFlow地址
api_key: 论坛主持人 LLM API 密钥,如果不提供则从配置文件读取
base_url: 论坛主持人 LLM API 接口基础地址,默认使用配置文件提供的SiliconFlow地址
"""
self.api_key = api_key or FORUM_HOST_API_KEY
self.api_key = api_key or settings.FORUM_HOST_API_KEY
if not self.api_key:
raise ValueError("未找到硅基流动API密钥,请在config.py中设置FORUM_HOST_API_KEY")
raise ValueError("未找到论坛主持人API密钥,请在环境变量文件中设置FORUM_HOST_API_KEY")
self.base_url = base_url or FORUM_HOST_BASE_URL
self.base_url = base_url or settings.FORUM_HOST_BASE_URL
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
self.model = model_name or FORUM_HOST_MODEL_NAME # Use configured model
self.model = model_name or settings.FORUM_HOST_MODEL_NAME # Use configured model
# Track previous summaries to avoid duplicates
self.previous_summaries = []
+74 -39
View File
@@ -18,7 +18,7 @@ try:
from .llm_host import generate_host_speech
HOST_AVAILABLE = True
except ImportError:
logger.warning("ForumEngine: 论坛主持人模块未找到,将以纯监控模式运行")
logger.exception("ForumEngine: 论坛主持人模块未找到,将以纯监控模式运行")
HOST_AVAILABLE = False
class LogMonitor:
@@ -50,10 +50,20 @@ class LogMonitor:
self.host_speech_threshold = 5 # 每5条agent发言触发一次主持人发言
self.is_host_generating = False # 主持人是否正在生成发言
# 目标节点名称 - 直接匹配字符串
self.target_nodes = [
'FirstSummaryNode',
'ReflectionSummaryNode'
# 目标节点识别模式
# 1. 类名(旧格式可能包含)
# 2. 完整模块路径(实际日志格式,包含引擎前缀)
# 3. 部分模块路径(兼容性)
# 4. 关键标识文本
self.target_node_patterns = [
'FirstSummaryNode', # 类名
'ReflectionSummaryNode', # 类名
'InsightEngine.nodes.summary_node', # InsightEngine完整路径
'MediaEngine.nodes.summary_node', # MediaEngine完整路径
'QueryEngine.nodes.summary_node', # QueryEngine完整路径
'nodes.summary_node', # 模块路径(兼容性,用于部分匹配)
'正在生成首次段落总结', # FirstSummaryNode的标识
'正在生成反思总结', # ReflectionSummaryNode的标识
]
# 多行内容捕获状态
@@ -107,12 +117,33 @@ class LogMonitor:
f.flush()
except Exception as e:
logger.exception(f"ForumEngine: 写入forum.log失败: {e}")
def is_target_log_line(self, line: str) -> bool:
"""检查是否是目标日志行(SummaryNode"""
# 简单字符串包含检查,更可靠
for node_name in self.target_nodes:
if node_name in line:
"""检查是否是目标日志行(SummaryNode
支持多种识别方式:
1. 类名:FirstSummaryNode, ReflectionSummaryNode
2. 完整模块路径:InsightEngine.nodes.summary_node、MediaEngine.nodes.summary_node、QueryEngine.nodes.summary_node
3. 部分模块路径:nodes.summary_node(兼容性)
4. 关键标识文本:正在生成首次段落总结、正在生成反思总结
排除条件:
- ERROR 级别的日志(错误日志不应被识别为目标节点)
- 包含错误关键词的日志(JSON解析失败、JSON修复失败等)
"""
# 排除 ERROR 级别的日志
if "| ERROR" in line or "| ERROR |" in line:
return False
# 排除包含错误关键词的日志
error_keywords = ["JSON解析失败", "JSON修复失败", "Traceback", "File \""]
for keyword in error_keywords:
if keyword in line:
return False
# 检查是否包含目标节点模式
for pattern in self.target_node_patterns:
if pattern in line:
return True
return False
@@ -381,32 +412,33 @@ class LogMonitor:
if not line.strip():
continue
# 检查是否是目标节点行或包含JSON开始标记的行
# 检查是否是目标节点行JSON开始标记
is_target = self.is_target_log_line(line)
is_json_start = self.is_json_start_line(line)
if is_target or is_json_start:
if is_json_start:
# 开始捕获JSON(即使不是目标节点,只要包含"清理后的输出: {"就处理)
self.capturing_json[app_name] = True
self.json_buffer[app_name] = [line]
self.json_start_line[app_name] = line
# 只有目标节点(SummaryNode)的JSON输出才应该被捕获
# 过滤掉SearchNode等其他节点的输出(它们不是目标节点,即使有JSON也不会被捕获)
if is_target and is_json_start:
# 开始捕获JSON(必须是目标节点且包含"清理后的输出: {"
self.capturing_json[app_name] = True
self.json_buffer[app_name] = [line]
self.json_start_line[app_name] = line
# 检查是否是单行JSON
if line.strip().endswith("}"):
# 单行JSON,立即处理
content = self.extract_json_content([line])
if content: # 只有成功解析的内容才会被记录
# 去除重复的标签和格式化
clean_content = self._clean_content_tags(content, app_name)
captured_contents.append(f"{clean_content}")
self.capturing_json[app_name] = False
self.json_buffer[app_name] = []
# 检查是否是单行JSON
if line.strip().endswith("}"):
# 单行JSON,立即处理
content = self.extract_json_content([line])
if content: # 只有成功解析的内容才会被记录
# 去除重复的标签和格式化
clean_content = self._clean_content_tags(content, app_name)
captured_contents.append(f"{clean_content}")
self.capturing_json[app_name] = False
self.json_buffer[app_name] = []
elif is_target and self.is_valuable_content(line):
# 其他有价值的SummaryNode内容(必须是目标节点且有价值)
clean_content = self._clean_content_tags(self.extract_node_content(line), app_name)
captured_contents.append(f"{clean_content}")
elif is_target and self.is_valuable_content(line):
# 其他有价值的SummaryNode内容(必须是目标节点且有价值)
clean_content = self._clean_content_tags(self.extract_node_content(line), app_name)
captured_contents.append(f"{clean_content}")
elif self.capturing_json[app_name]:
# 正在捕获JSON的后续行
@@ -528,13 +560,16 @@ class LogMonitor:
# 先检查是否需要触发搜索(只触发一次)
if not self.is_searching:
for line in new_lines:
if line.strip() and 'FirstSummaryNode' in line:
logger.info(f"ForumEngine: 在{app_name}中检测到第一次论坛发表内容")
self.is_searching = True
self.search_inactive_count = 0
# 清空forum.log开始新会话
self.clear_forum_log()
break # 找到一个就够了,跳出循环
# 检查是否包含目标节点模式(支持多种格式)
if line.strip() and self.is_target_log_line(line):
# 进一步确认是首次总结节点(FirstSummaryNode或包含"正在生成首次段落总结")
if 'FirstSummaryNode' in line or '正在生成首次段落总结' in line:
logger.info(f"ForumEngine: 在{app_name}中检测到第一次论坛发表内容")
self.is_searching = True
self.search_inactive_count = 0
# 清空forum.log开始新会话
self.clear_forum_log()
break # 找到一个就够了,跳出循环
# 处理所有新增内容(如果正在搜索状态)
if self.is_searching:
+5
View File
@@ -161,6 +161,11 @@ class KeywordOptimizer:
**重要提醒**:每个关键词都必须是一个不可分割的独立词条,严禁在词条内部包含空格。例如,应使用 "雷军班争议" 而不是错误的 "雷军班 争议"
**主题相关**:关键词要与初始查询主题相关,不要偏离主题
- 保留主体词("武汉天气""武汉"✓)
- 避免泛化属性("天气"✗ 会匹配全国)
- 专有名词可拆("武汉大学""武大""大学"✓)
**输出格式**
请以JSON格式返回结果:
{
+1 -1
View File
@@ -195,7 +195,7 @@ class ReportAgent:
start_time = datetime.now()
logger.info(f"开始生成报告: {query}")
self.logger.info(f"输入数据 - 报告数量: {len(reports)}, 论坛日志长度: {len(forum_logs)}")
logger.info(f"输入数据 - 报告数量: {len(reports)}, 论坛日志长度: {len(forum_logs)}")
try:
# Step 1: 模板选择
+8
View File
@@ -134,6 +134,7 @@ def run_report_generation(task: ReportTask, query: str, custom_template: str = "
task.update_status("completed", 100)
except Exception as e:
logger.exception(f"报告生成过程中发生错误: {str(e)}")
task.update_status("error", 0, str(e))
# 只在出错时清理任务
with task_lock:
@@ -156,6 +157,7 @@ def get_status():
'current_task': current_task.to_dict() if current_task else None
})
except Exception as e:
logger.exception(f"获取Report Engine状态失败: {str(e)}")
return jsonify({
'success': False,
'error': str(e)
@@ -228,6 +230,7 @@ def generate_report():
})
except Exception as e:
logger.exception(f"开始生成报告失败: {str(e)}")
return jsonify({
'success': False,
'error': str(e)
@@ -319,6 +322,7 @@ def get_result_json(task_id: str):
})
except Exception as e:
logger.exception(f"获取报告生成结果失败: {str(e)}")
return jsonify({
'success': False,
'error': str(e)
@@ -348,6 +352,7 @@ def cancel_task(task_id: str):
}), 404
except Exception as e:
logger.exception(f"取消报告生成任务失败: {str(e)}")
return jsonify({
'success': False,
'error': str(e)
@@ -391,6 +396,7 @@ def get_templates():
})
except Exception as e:
logger.exception(f"获取可用模板列表失败: {str(e)}")
return jsonify({
'success': False,
'error': str(e)
@@ -400,6 +406,7 @@ def get_templates():
# 错误处理
@report_bp.errorhandler(404)
def not_found(error):
logger.exception(f"API端点不存在: {str(error)}")
return jsonify({
'success': False,
'error': 'API端点不存在'
@@ -408,6 +415,7 @@ def not_found(error):
@report_bp.errorhandler(500)
def internal_error(error):
logger.exception(f"服务器内部错误: {str(error)}")
return jsonify({
'success': False,
'error': '服务器内部错误'
+3 -13
View File
@@ -94,20 +94,10 @@ def _load_config_module():
def read_config_values():
"""Return the current configuration values that are exposed to the frontend."""
try:
# 重新导入 config 模块以获取最新的 Settings 实例
importlib.invalidate_caches()
if CONFIG_MODULE_NAME in sys.modules:
importlib.reload(sys.modules[CONFIG_MODULE_NAME])
else:
importlib.import_module(CONFIG_MODULE_NAME)
# 重新加载配置以获取最新的 Settings 实例
from config import reload_settings, settings
reload_settings()
# 从 config 模块获取 settings 实例
config_module = sys.modules[CONFIG_MODULE_NAME]
if not hasattr(config_module, 'settings'):
logger.error("config 模块中没有找到 settings 实例")
return {}
settings = config_module.settings
values = {}
for key in CONFIG_KEYS:
# 从 Pydantic Settings 实例读取值
+24 -6
View File
@@ -9,8 +9,9 @@
from pathlib import Path
from pydantic_settings import BaseSettings
from pydantic import Field
from pydantic import Field, ConfigDict
from typing import Optional
from loguru import logger
# 计算 .env 优先级:优先当前工作目录,其次项目根目录
@@ -86,12 +87,29 @@ class Settings(BaseSettings):
SEARCH_TIMEOUT: int = Field(240, description="单次搜索请求超时")
MAX_CONTENT_LENGTH: int = Field(500000, description="搜索最大内容长度")
class Config:
env_file = ENV_FILE
env_prefix = ""
case_sensitive = False
extra = "allow"
model_config = ConfigDict(
env_file=ENV_FILE,
env_prefix="",
case_sensitive=False,
extra="allow"
)
# 创建全局配置实例
settings = Settings()
def reload_settings() -> Settings:
"""
重新加载配置
.env 文件和环境变量重新加载配置更新全局 settings 实例
用于在运行时动态更新配置
Returns:
Settings: 新创建的配置实例
"""
global settings
settings = Settings()
return settings
File diff suppressed because one or more lines are too long
+158 -16
View File
@@ -4,6 +4,7 @@
测试各种日志格式下的解析能力包括
1. 旧格式[HH:MM:SS]
2. 新格式loguru默认格式 (YYYY-MM-DD HH:mm:ss.SSS | LEVEL | ...)
3. 只应当接收FirstSummaryNodeReflectionSummaryNode等SummaryNode的输出不应当接收SearchNode的输出
"""
import sys
@@ -83,12 +84,11 @@ class TestLogMonitor:
assert "JSON内容" in result
def test_extract_json_content_new_format_multiline(self):
"""测试新格式多行JSON提取(关键测试:需要支持loguru格式的时间戳移除)"""
"""测试新格式多行JSON提取(支持loguru格式的时间戳移除)"""
result = self.monitor.extract_json_content(test_data.NEW_FORMAT_MULTILINE_JSON)
# 注意:当前代码中的时间戳移除正则只支持 [HH:MM:SS] 格式
# 这个测试可能会失败,直到修复了时间戳移除逻辑
# 如果失败,说明需要修改 extract_json_content 中的时间戳移除逻辑
assert result is not None or True # 暂时允许失败,用于发现问题
assert result is not None
assert "多行" in result
assert "JSON内容" in result
def test_extract_json_content_updated_priority(self):
"""测试updated_paragraph_latest_state优先提取"""
@@ -133,13 +133,11 @@ class TestLogMonitor:
assert "测试内容" in result
def test_extract_node_content_new_format(self):
"""测试新格式的节点内容提取(关键测试)"""
"""测试新格式的节点内容提取"""
line = "2025-11-05 17:42:31.287 | INFO | InsightEngine.nodes.summary_node:process_output:131 - FirstSummaryNode 清理后的输出: 这是测试内容"
result = self.monitor.extract_node_content(line)
# 注意:当前代码中的正则只支持 [HH:MM:SS] 格式
# 这个测试可能会失败,直到修复了时间戳匹配逻辑
# 如果失败,说明需要修改 extract_node_content 中的时间戳匹配逻辑
assert result is not None or True # 暂时允许失败,用于发现问题
assert result is not None
assert "测试内容" in result
def test_process_lines_for_json_old_format(self):
"""测试旧格式的完整处理流程"""
@@ -154,7 +152,7 @@ class TestLogMonitor:
assert any("多行" in content for content in result)
def test_process_lines_for_json_new_format(self):
"""测试新格式的完整处理流程(关键测试)"""
"""测试新格式的完整处理流程"""
lines = [
test_data.NEW_FORMAT_NON_TARGET, # 应该被忽略
test_data.NEW_FORMAT_MULTILINE_JSON[0],
@@ -162,15 +160,15 @@ class TestLogMonitor:
test_data.NEW_FORMAT_MULTILINE_JSON[2],
]
result = self.monitor.process_lines_for_json(lines, "insight")
# 注意:这个测试可能会失败,因为当前代码可能无法正确处理新格式
# 如果失败,说明需要修改 process_lines_for_json 和相关函数
assert len(result) > 0 or True # 暂时允许失败,用于发现问题
assert len(result) > 0
assert any("多行" in content for content in result)
assert any("JSON内容" in content for content in result)
def test_process_lines_for_json_mixed_format(self):
"""测试混合格式的处理"""
result = self.monitor.process_lines_for_json(test_data.MIXED_FORMAT_LINES, "insight")
# 混合格式应该也能处理
assert len(result) > 0 or True # 暂时允许失败,用于发现问题
assert len(result) > 0
assert any("混合格式内容" in content for content in result)
def test_is_valuable_content(self):
"""测试有价值内容的判断"""
@@ -183,6 +181,150 @@ class TestLogMonitor:
# 空行应该被过滤
assert self.monitor.is_valuable_content("") == False
def test_extract_json_content_real_query_engine(self):
"""测试QueryEngine实际生产环境日志提取"""
result = self.monitor.extract_json_content(test_data.REAL_QUERY_ENGINE_REFLECTION)
assert result is not None
assert "洛阳栾川钼业集团" in result
assert "CMOC" in result
assert "updated_paragraph_latest_state" not in result # 应该已经提取内容,不包含字段名
def test_extract_json_content_real_insight_engine(self):
"""测试InsightEngine实际生产环境日志提取(包含标识行)"""
# 先测试能否识别标识行
assert self.monitor.is_target_log_line(test_data.REAL_INSIGHT_ENGINE_REFLECTION[0]) == True # 包含"正在生成反思总结"
assert self.monitor.is_target_log_line(test_data.REAL_INSIGHT_ENGINE_REFLECTION[1]) == True # 包含nodes.summary_node
# 测试JSON提取(从第二行开始,因为第一行是标识行)
json_lines = test_data.REAL_INSIGHT_ENGINE_REFLECTION[1:] # 跳过标识行
result = self.monitor.extract_json_content(json_lines)
assert result is not None
assert "核心发现" in result
assert "更新版" in result
assert "洛阳钼业2025年第三季度" in result
def test_extract_json_content_real_media_engine(self):
"""测试MediaEngine实际生产环境日志提取(单行JSON)"""
# MediaEngine是单行JSON格式,需要先分割成行
lines = test_data.REAL_MEDIA_ENGINE_REFLECTION.split('\n')
# 测试能否识别标识行
assert self.monitor.is_target_log_line(lines[0]) == True # 包含"正在生成反思总结"
assert self.monitor.is_target_log_line(lines[1]) == True # 包含nodes.summary_node和"清理后的输出"
# 测试JSON提取(从包含JSON的行开始)
json_line = lines[1] # 第二行包含完整的单行JSON
result = self.monitor.extract_json_content([json_line])
assert result is not None
assert "综合信息概览" in result
assert "洛阳钼业" in result
assert "updated_paragraph_latest_state" not in result # 应该已经提取内容
def test_process_lines_for_json_real_query_engine(self):
"""测试QueryEngine实际日志的完整处理流程"""
result = self.monitor.process_lines_for_json(test_data.REAL_QUERY_ENGINE_REFLECTION, "query")
assert len(result) > 0
assert any("洛阳栾川钼业集团" in content for content in result)
def test_process_lines_for_json_real_insight_engine(self):
"""测试InsightEngine实际日志的完整处理流程(包含标识行)"""
result = self.monitor.process_lines_for_json(test_data.REAL_INSIGHT_ENGINE_REFLECTION, "insight")
assert len(result) > 0
assert any("核心发现" in content for content in result)
assert any("更新版" in content for content in result)
def test_process_lines_for_json_real_media_engine(self):
"""测试MediaEngine实际日志的完整处理流程(单行JSON)"""
# 将单行字符串分割成多行
lines = test_data.REAL_MEDIA_ENGINE_REFLECTION.split('\n')
result = self.monitor.process_lines_for_json(lines, "media")
assert len(result) > 0
assert any("综合信息概览" in content for content in result)
assert any("洛阳钼业" in content for content in result)
def test_filter_search_node_output(self):
"""测试过滤SearchNode的输出(重要:SearchNode不应进入论坛)"""
# SearchNode的输出包含"清理后的输出: {",但不包含目标节点模式
search_lines = test_data.SEARCH_NODE_FIRST_SEARCH
result = self.monitor.process_lines_for_json(search_lines, "insight")
# SearchNode的输出应该被过滤,不应该被捕获
assert len(result) == 0
def test_filter_search_node_output_single_line(self):
"""测试过滤SearchNode的单行JSON输出"""
# SearchNode的单行JSON格式
search_line = test_data.SEARCH_NODE_REFLECTION_SEARCH
result = self.monitor.process_lines_for_json([search_line], "insight")
# SearchNode的输出应该被过滤
assert len(result) == 0
def test_search_node_vs_summary_node_mixed(self):
"""测试混合场景:SearchNode和SummaryNode同时存在,只捕获SummaryNode"""
lines = [
# SearchNode输出(应该被过滤)
"[11:16:35] 2025-11-06 11:16:35.567 | INFO | InsightEngine.nodes.search_node:process_output:97 - 清理后的输出: {",
"[11:16:35] \"search_query\": \"测试查询\"",
"[11:16:35] }",
# SummaryNode输出(应该被捕获)
"[11:17:05] 2025-11-06 11:17:05.547 | INFO | InsightEngine.nodes.summary_node:process_output:131 - 清理后的输出: {",
"[11:17:05] \"paragraph_latest_state\": \"这是总结内容\"",
"[11:17:05] }",
]
result = self.monitor.process_lines_for_json(lines, "insight")
# 应该只捕获SummaryNode的输出,不包含SearchNode的输出
assert len(result) > 0
assert any("总结内容" in content for content in result)
# 确保不包含搜索查询内容
assert not any("search_query" in content for content in result)
assert not any("测试查询" in content for content in result)
def test_filter_error_logs_from_summary_node(self):
"""测试过滤SummaryNode的错误日志(重要:错误日志不应进入论坛)"""
# JSON解析失败错误日志
assert self.monitor.is_target_log_line(test_data.SUMMARY_NODE_JSON_ERROR) == False
# JSON修复失败错误日志
assert self.monitor.is_target_log_line(test_data.SUMMARY_NODE_JSON_FIX_ERROR) == False
# ERROR级别日志
assert self.monitor.is_target_log_line(test_data.SUMMARY_NODE_ERROR_LOG) == False
# Traceback错误日志
for line in test_data.SUMMARY_NODE_TRACEBACK.split('\n'):
assert self.monitor.is_target_log_line(line) == False
def test_error_logs_not_captured(self):
"""测试错误日志不会被捕获到论坛"""
error_lines = [
test_data.SUMMARY_NODE_JSON_ERROR,
test_data.SUMMARY_NODE_JSON_FIX_ERROR,
test_data.SUMMARY_NODE_ERROR_LOG,
]
for line in error_lines:
result = self.monitor.process_lines_for_json([line], "media")
# 错误日志不应该被捕获
assert len(result) == 0
def test_mixed_valid_and_error_logs(self):
"""测试混合场景:有效日志和错误日志同时存在,只捕获有效日志"""
lines = [
# 错误日志(应该被过滤)
test_data.SUMMARY_NODE_JSON_ERROR,
test_data.SUMMARY_NODE_JSON_FIX_ERROR,
# 有效SummaryNode输出(应该被捕获)
"[11:55:31] 2025-11-06 11:55:31.762 | INFO | MediaEngine.nodes.summary_node:process_output:134 - 清理后的输出: {",
"[11:55:31] \"paragraph_latest_state\": \"这是有效的总结内容\"",
"[11:55:31] }",
]
result = self.monitor.process_lines_for_json(lines, "media")
# 应该只捕获有效日志,不包含错误日志
assert len(result) > 0
assert any("有效的总结内容" in content for content in result)
# 确保不包含错误信息
assert not any("JSON解析失败" in content for content in result)
assert not any("JSON修复失败" in content for content in result)
def run_tests():