Updating LLM-host for ForumEngine
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
"""
|
||||
ForumEngine - 监控和记录三个Engine的SummaryNode和ReportFormattingNode输出
|
||||
"""
|
||||
|
||||
from .monitor import LogMonitor
|
||||
|
||||
__all__ = ['LogMonitor']
|
||||
@@ -0,0 +1,337 @@
|
||||
"""
|
||||
论坛主持人模块
|
||||
使用硅基流动的Qwen3模型作为论坛主持人,引导多个agent进行讨论
|
||||
"""
|
||||
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
from typing import List, Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
import re
|
||||
|
||||
# 添加项目根目录到Python路径以导入config
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
from config import GUIJI_QWEN3_API_KEY
|
||||
|
||||
# 添加utils目录到Python路径
|
||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
root_dir = os.path.dirname(current_dir)
|
||||
utils_dir = os.path.join(root_dir, 'utils')
|
||||
if utils_dir not in sys.path:
|
||||
sys.path.append(utils_dir)
|
||||
|
||||
from retry_helper import with_graceful_retry, SEARCH_API_RETRY_CONFIG
|
||||
|
||||
|
||||
class ForumHost:
|
||||
"""
|
||||
论坛主持人类
|
||||
使用硅基流动的Qwen3-235B模型作为智能主持人
|
||||
"""
|
||||
|
||||
def __init__(self, api_key: str = None):
    """
    Configure the forum host client.

    Args:
        api_key: SiliconFlow API key. When omitted or empty, the
            ``GUIJI_QWEN3_API_KEY`` value imported from ``config`` is used.

    Raises:
        ValueError: If no key is available from either source.
    """
    # An explicit argument wins; otherwise fall back to the configured key.
    self.api_key = api_key if api_key else GUIJI_QWEN3_API_KEY
    self.base_url = "https://api.siliconflow.cn/v1/chat/completions"
    # The larger Qwen3 instruct model is used for hosting.
    self.model = "Qwen/Qwen3-235B-A22B-Instruct-2507"

    if not self.api_key:
        raise ValueError("未找到硅基流动API密钥,请在config.py中设置GUIJI_QWEN3_API_KEY")

    # Earlier host summaries, kept so repeated content can be avoided.
    # NOTE(review): nothing in the visible code reads this list yet.
    self.previous_summaries = []
|
||||
|
||||
def generate_host_speech(self, forum_logs: List[str]) -> Optional[str]:
    """
    Produce one host speech from the current forum log.

    The log is parsed, both prompts are built, the model is called and the
    reply is post-processed.

    Args:
        forum_logs: Lines of the forum log.

    Returns:
        The formatted host speech, or None when there are no agent
        speeches, the API call reports failure, or any exception occurs.
    """
    try:
        parsed_content = self._parse_forum_logs(forum_logs)

        # Without agent speeches there is nothing to moderate.
        if not parsed_content['agent_speeches']:
            print("ForumHost: 没有找到有效的agent发言")
            return None

        system_prompt = self._build_system_prompt()
        user_prompt = self._build_user_prompt(parsed_content)
        response = self._call_qwen_api(system_prompt, user_prompt)

        if not response["success"]:
            print(f"ForumHost: API调用失败 - {response.get('error', '未知错误')}")
            return None

        # Clean up the raw model output before handing it back.
        return self._format_host_speech(response["content"])

    except Exception as e:
        print(f"ForumHost: 生成发言时出错 - {str(e)}")
        return None
|
||||
|
||||
def _parse_forum_logs(self, forum_logs: List[str]) -> Dict[str, Any]:
    """
    Turn raw forum log lines into structured data.

    Only lines shaped like ``[HH:MM:SS] [SPEAKER] text`` are considered.
    Session start/end markers set the session timestamps. Lines from
    INSIGHT, MEDIA and QUERY become agent speeches; all other speakers
    (including SYSTEM and HOST) are ignored.

    Returns:
        A dict with keys ``agent_speeches`` (list of dicts), ``timeline``
        (list), ``key_topics`` (set), ``session_start`` and ``session_end``
        (timestamp strings or None).
    """
    line_pattern = re.compile(r'\[(\d{2}:\d{2}:\d{2})\]\s*\[(\w+)\]\s*(.+)')
    agent_names = ('INSIGHT', 'MEDIA', 'QUERY')

    speeches = []
    timeline = []
    key_topics = set()
    session_start = None
    session_end = None

    for raw_line in forum_logs:
        if not raw_line.strip():
            continue
        found = line_pattern.match(raw_line)
        if found is None:
            continue

        stamp, who, text = found.groups()

        if 'ForumEgine 监控开始' in text:
            # Session start marker.
            session_start = stamp
        elif 'ForumEgine 论坛结束' in text:
            # Session end marker.
            session_end = stamp
        elif who in agent_names:
            # The log stores newlines as the two characters "\n"; restore them.
            text = text.replace('\\n', '\n')
            speeches.append({
                'timestamp': stamp,
                'speaker': who,
                'content': text
            })
            # Both helpers fill the passed-in collections in place.
            self._extract_key_topics(text, key_topics)
            self._extract_timeline(text, timeline)

    return {
        'agent_speeches': speeches,
        'timeline': timeline,
        'key_topics': key_topics,
        'session_start': session_start,
        'session_end': session_end
    }
|
||||
|
||||
def _extract_key_topics(self, content: str, topics: set):
    """
    Add every known keyword found in ``content`` to ``topics``.

    Each keyword is used as a regular expression. ``topics`` is updated in
    place and nothing is returned.
    """
    known_keywords = (
        r'武汉大学', r'武大', r'图书馆事件', r'性骚扰',
        r'肖某某', r'杨某某', r'杨景媛', r'樱花', r'和服',
        r'舆情', r'处分', r'法院', r'判决', r'学术',
    )

    topics.update(
        # Doubled backslashes are removed from the pattern text before storing.
        keyword.replace(r'\\', '')
        for keyword in known_keywords
        if re.search(keyword, content)
    )
|
||||
|
||||
def _extract_timeline(self, content: str, timeline: list):
    """
    Collect dates mentioned in ``content`` together with nearby text.

    Three date formats are recognised: ``YYYY年M月D日``, ``YYYY-M-D`` and
    ``YYYY/M/D``. For every occurrence a dict
    ``{'date': <matched text>, 'context': <surrounding text>}`` is appended
    to ``timeline``; the context spans from 50 characters before the date to
    100 characters after its start, stripped of outer whitespace.

    Args:
        content: Text to scan.
        timeline: List that receives the entries (mutated in place).
    """
    date_patterns = [
        r'(\d{4}年\d{1,2}月\d{1,2}日)',
        r'(\d{4}-\d{1,2}-\d{1,2})',
        r'(\d{4}/\d{1,2}/\d{1,2})'
    ]

    for pattern in date_patterns:
        # finditer gives the offset of each occurrence. Looking the text up
        # again with str.find() would always land on the first occurrence
        # and give repeated dates the wrong context.
        for found in re.finditer(pattern, content):
            position = found.start()
            context_start = max(0, position - 50)
            context_end = min(len(content), position + 100)

            timeline.append({
                'date': found.group(1),
                'context': content[context_start:context_end].strip()
            })
|
||||
|
||||
def _build_system_prompt(self) -> str:
    """Return the fixed system prompt that describes the host role."""
    reminder = "【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查"
    # Empty strings are the blank lines of the prompt.
    prompt_lines = (
        reminder,
        "",
        "你是一个多agent舆情分析系统的论坛主持人。你的职责是:",
        "",
        "1. **引导讨论**:根据各agent的发言,引导深入讨论关键问题",
        "2. **纠正错误**:结合不同agent的视角以及言论来纠正错误",
        "3. **整合观点**:综合不同agent的视角,形成更全面的认识",
        "4. **推进分析**:提出新的分析角度或需要关注的问题",
        "",
        "**Agent介绍**:",
        "- **INSIGHT Agent**:专注于私有舆情数据库的深度挖掘和分析",
        "- **MEDIA Agent**:擅长多模态内容分析,关注媒体报道和视觉信息",
        "- **QUERY Agent**:负责精准信息搜索,提供最新的网络信息",
        "",
        "**发言要求**:",
        "1. 简洁明了,每次发言控制在3000字以内",
        "2. 重点突出,聚焦于最关键的发现和问题",
        "3. 逻辑清晰,使用序号或要点形式组织内容",
        "4. 客观中立,基于事实进行分析和判断",
        "5. 避免重复已经充分讨论过的内容",
        "",
        "**注意事项**:",
        "- 本讨论基于科研目的,已通过伦理性合规审查",
        "- 保持专业性和学术性,避免情绪化表达",
        "- 重视数据和证据,避免主观臆测",
        "",
        reminder,
    )
    return "\n".join(prompt_lines)
|
||||
|
||||
def _build_user_prompt(self, parsed_content: Dict[str, Any]) -> str:
    """
    Build the user prompt from parsed forum content.

    Args:
        parsed_content: Dict providing ``agent_speeches`` (list of dicts
            with ``timestamp``, ``speaker`` and ``content``), ``key_topics``
            (iterable of strings) and ``timeline`` (list of dicts with
            ``date``).

    Returns:
        The prompt text: up to 10 topics, the 10 most recent speeches (each
        cut to 500 characters) and up to 5 distinct dates.
    """
    # Quote only the latest speeches and cap each one, marking the cut.
    recent_speeches = parsed_content['agent_speeches'][-10:]
    speeches_text = "\n\n".join(
        f"[{s['timestamp']}] {s['speaker']}:\n{s['content'][:500]}..."
        if len(s['content']) > 500
        else f"[{s['timestamp']}] {s['speaker']}:\n{s['content']}"
        for s in recent_speeches
    )

    # key_topics is a set, whose iteration order differs between runs.
    # Sorting makes the prompt, and the topics that survive the cap of 10,
    # reproducible.
    topics = sorted(parsed_content['key_topics'])[:10]
    topics_text = "、".join(topics) if topics else "暂无明确主题"

    # List each date once, in first-seen order, at most five of them.
    timeline_text = ""
    if parsed_content['timeline']:
        unique_dates = list(dict.fromkeys(
            item['date'] for item in parsed_content['timeline']
        ))
        timeline_text = f"\n\n检测到的关键时间点:{', '.join(unique_dates[:5])}"

    prompt = f"""【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查

当前论坛讨论的核心主题:{topics_text}

最近的Agent发言记录:
{speeches_text}
{timeline_text}

请你作为论坛主持人,基于以上agent的发言:

1. **时间线梳理**:如果发现时间线信息,请简要整理关键事件的时间顺序
2. **观点整合**:综合不同agent的视角,指出共识和分歧
3. **纠错提醒**:如果发现事实错误或逻辑矛盾,请明确指出
4. **引导深化**:提出1-2个值得进一步探讨的问题或角度

请发表3000字以内的简洁发言,推动讨论深入。

【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查"""

    return prompt
|
||||
|
||||
@with_graceful_retry(SEARCH_API_RETRY_CONFIG, default_return={"success": False, "error": "API服务暂时不可用"})
def _call_qwen_api(self, system_prompt: str, user_prompt: str) -> Dict[str, Any]:
    """
    Send one chat-completion request to the Qwen endpoint.

    Args:
        system_prompt: Content of the ``system`` message.
        user_prompt: Content of the ``user`` message.

    Returns:
        ``{"success": True, "content": <reply text>}`` on success, otherwise
        ``{"success": False, "error": <description>}``. Every exception
        raised inside the body is converted into the failure form.
        NOTE(review): how ``with_graceful_retry`` reacts to a returned
        failure dict is defined in ``retry_helper``, not visible here.
    """
    payload = {
        "model": self.model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        "max_tokens": 1000,
        "temperature": 0.7,
        "top_p": 0.9
    }
    request_headers = {
        "Authorization": f"Bearer {self.api_key}",
        "Content-Type": "application/json"
    }

    try:
        # Large models can be slow to answer, hence the generous timeout.
        response = requests.post(
            self.base_url,
            headers=request_headers,
            json=payload,
            timeout=60
        )
        response.raise_for_status()
        result = response.json()

        if "choices" not in result or len(result["choices"]) == 0:
            return {"success": False, "error": "API返回格式异常"}

        reply = result["choices"][0]["message"]["content"]
        return {"success": True, "content": reply}

    except requests.exceptions.Timeout:
        return {"success": False, "error": "API请求超时"}
    except requests.exceptions.RequestException as e:
        return {"success": False, "error": f"网络请求错误: {str(e)}"}
    except Exception as e:
        return {"success": False, "error": f"API调用异常: {str(e)}"}
|
||||
|
||||
def _format_host_speech(self, speech: str) -> str:
    """
    Tidy up the model's reply before it is written to the forum.

    Steps: collapse runs of three or more newlines into one blank line;
    shorten replies longer than 500 characters to whole sentences (split on
    ``。``) totalling under 450 characters; strip surrounding quote
    characters and whitespace.

    If the first sentence alone is too long (or the text has no ``。``),
    the text is cut at 449 characters. Previously that case produced a bare
    ``。`` and the whole speech was lost.

    Args:
        speech: Raw text returned by the model.

    Returns:
        The cleaned speech.
    """
    # Drop superfluous blank lines.
    speech = re.sub(r'\n{3,}', '\n\n', speech)

    if len(speech) > 500:
        kept = []
        used = 0
        for sentence in speech.split('。'):
            if used + len(sentence) >= 450:
                break
            kept.append(sentence)
            used += len(sentence) + 1  # +1 for the "。" that follows it

        truncated = '。'.join(kept).rstrip('。')
        if not truncated:
            # No complete sentence fits; fall back to a hard cut.
            truncated = speech[:449].rstrip('。')
        speech = truncated + '。'

    # Remove quote characters the model may have wrapped the reply in.
    speech = speech.strip('"\'')

    return speech.strip()
|
||||
|
||||
|
||||
# Module-wide ForumHost instance, created on first use.
_host_instance = None


def get_forum_host() -> ForumHost:
    """Return the shared ForumHost, constructing it on the first call."""
    global _host_instance
    if _host_instance is not None:
        return _host_instance
    _host_instance = ForumHost()
    return _host_instance
|
||||
|
||||
def generate_host_speech(forum_logs: List[str]) -> Optional[str]:
    """Convenience wrapper: generate a host speech with the shared ForumHost."""
    host = get_forum_host()
    return host.generate_host_speech(forum_logs)
|
||||
@@ -0,0 +1,721 @@
|
||||
"""
|
||||
日志监控器 - 实时监控三个log文件中的SummaryNode输出
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import re
|
||||
import json
|
||||
from typing import Dict, Optional, List
|
||||
from threading import Lock
|
||||
|
||||
# 导入论坛主持人模块
|
||||
try:
|
||||
from .llm_host import generate_host_speech
|
||||
HOST_AVAILABLE = True
|
||||
except ImportError:
|
||||
print("ForumEgine: 论坛主持人模块未找到,将以纯监控模式运行")
|
||||
HOST_AVAILABLE = False
|
||||
|
||||
class LogMonitor:
|
||||
"""基于文件变化的智能日志监控器"""
|
||||
|
||||
def __init__(self, log_dir: str = "logs"):
    """
    Set up the monitor's paths and state.

    Args:
        log_dir: Directory holding ``insight.log``, ``media.log`` and
            ``query.log`` and receiving ``forum.log``. It is created if
            missing, including any missing parent directories.
    """
    self.log_dir = Path(log_dir)
    self.forum_log_file = self.log_dir / "forum.log"

    # Source logs to watch, keyed by app name.
    self.monitored_logs = {
        'insight': self.log_dir / 'insight.log',
        'media': self.log_dir / 'media.log',
        'query': self.log_dir / 'query.log'
    }

    # Monitoring state.
    self.is_monitoring = False
    self.monitor_thread = None
    self.file_positions = {}        # read offset per app
    self.file_line_counts = {}      # last seen line count per app
    self.is_searching = False       # True while a forum session is active
    self.search_inactive_count = 0  # consecutive idle polls in a session
    self.write_lock = Lock()        # serialises writes to forum.log

    # Host-speech state.
    self.agent_speech_count = 0         # agent speeches since last host speech
    self.host_speech_threshold = 5      # speeches needed to trigger the host
    self.last_host_speech_time = None   # time.time() of the last host speech
    self.min_host_interval = 30         # minimum seconds between host speeches

    # Node names matched by plain substring search.
    self.target_nodes = [
        'FirstSummaryNode',
        'ReflectionSummaryNode'
    ]

    # Multi-line JSON capture state, per app.
    self.capturing_json = {}
    self.json_buffer = {}
    self.json_start_line = {}

    # parents=True lets a nested log_dir such as "var/run/logs" be created
    # in one go instead of raising FileNotFoundError.
    self.log_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def clear_forum_log(self):
    """
    Start a fresh forum.log and reset per-session state.

    The old file is deleted, an empty one is created, and a SYSTEM start
    marker is written through ``write_to_forum_log`` so its format matches
    every other entry. JSON capture state and the host counters are then
    reset. Failures are printed, not raised.
    """
    try:
        log_path = self.forum_log_file
        if log_path.exists():
            log_path.unlink()

        started_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Create the empty file first; the marker is appended afterwards.
        with open(log_path, 'w', encoding='utf-8'):
            pass
        self.write_to_forum_log(f"=== ForumEgine 监控开始 - {started_at} ===", "SYSTEM")

        print(f"ForumEgine: forum.log 已清空并初始化")

        # Forget any half-captured JSON from the previous session.
        self.capturing_json = {}
        self.json_buffer = {}
        self.json_start_line = {}

        # Host bookkeeping starts over as well.
        self.agent_speech_count = 0
        self.last_host_speech_time = None

    except Exception as e:
        print(f"ForumEgine: 清空forum.log失败: {e}")
|
||||
|
||||
def write_to_forum_log(self, content: str, source: str = None):
    """
    Append one entry to forum.log while holding the write lock.

    Args:
        content: Text to record. Newlines and carriage returns are stored as
            the literal sequences ``\\n`` and ``\\r`` so each entry stays on
            one line.
        source: Optional tag (e.g. ``"HOST"``) written in square brackets
            after the timestamp.

    Failures are printed, not raised.
    """
    try:
        with self.write_lock, open(self.forum_log_file, 'a', encoding='utf-8') as log:
            stamp = datetime.now().strftime('%H:%M:%S')
            flattened = content.replace('\n', '\\n').replace('\r', '\\r')
            prefix = f"[{stamp}] [{source}] " if source else f"[{stamp}] "
            log.write(f"{prefix}{flattened}\n")
            log.flush()
    except Exception as e:
        print(f"ForumEgine: 写入forum.log失败: {e}")
|
||||
|
||||
def is_target_log_line(self, line: str) -> bool:
    """Return True when ``line`` mentions any name in ``self.target_nodes``.

    A plain substring test is used, with no regular expressions.
    """
    return any(node_name in line for node_name in self.target_nodes)
|
||||
|
||||
def is_valuable_content(self, line: str) -> bool:
    """
    Decide whether a log line is worth recording.

    A line containing "清理后的输出" is always kept. Otherwise it is rejected
    when it contains one of the known status/error phrases, or when fewer
    than 30 characters remain after removing ``[HH:MM:SS]`` timestamps.
    """
    if "清理后的输出" in line:
        return True

    # Short progress and error notices that carry no content.
    noise_markers = (
        "JSON解析失败",
        "JSON修复失败",
        "直接使用清理后的文本",
        "JSON解析成功",
        "成功生成",
        "已更新段落",
        "正在生成",
        "开始处理",
        "处理完成",
    )
    if any(marker in line for marker in noise_markers):
        return False

    # Length threshold; adjust if needed.
    without_timestamp = re.sub(r'\[\d{2}:\d{2}:\d{2}\]', '', line).strip()
    return len(without_timestamp) >= 30
|
||||
|
||||
def is_json_start_line(self, line: str) -> bool:
    """Return True when ``line`` contains the marker that opens a JSON dump."""
    return line.find("清理后的输出: {") != -1
|
||||
|
||||
def is_json_end_line(self, line: str) -> bool:
    """Return True when ``line`` closes a multi-line JSON dump.

    Either the stripped line is exactly ``}``, or it starts with ``[`` and
    ends with ``] }`` (a bracketed prefix followed by the closing brace).
    """
    text = line.strip()
    if text == "}":
        return True
    return text.startswith("[") and text.endswith("] }")
|
||||
|
||||
def extract_json_content(self, json_lines: List[str]) -> Optional[str]:
    """
    Extract and parse the JSON dump contained in ``json_lines``.

    The dump starts at the first line containing "清理后的输出: {". If the
    rest of that line already looks like a complete object it is parsed
    alone. Otherwise all following lines are appended, with any leading
    ``[HH:MM:SS]`` prefix removed. Text that does not parse is passed
    through ``fix_json_string`` once.

    Returns:
        The result of ``format_json_content`` for the parsed object, or None
        when no marker is found, parsing fails, or any error occurs.
    """
    marker = "清理后的输出: {"
    label_length = len("清理后的输出: ")

    def parse_or_repair(text):
        # Parse directly; on failure retry once with the repaired text.
        try:
            return self.format_json_content(json.loads(text))
        except json.JSONDecodeError:
            repaired = self.fix_json_string(text)
            if repaired:
                try:
                    return self.format_json_content(json.loads(repaired))
                except json.JSONDecodeError:
                    pass
            return None

    try:
        start_idx = next(
            (idx for idx, candidate in enumerate(json_lines) if marker in candidate),
            -1,
        )
        if start_idx == -1:
            return None

        opening_line = json_lines[start_idx]
        payload = opening_line[opening_line.find(marker) + label_length:]

        # Whole object on the opening line: balanced braces and a closing "}".
        if payload.strip().endswith("}") and payload.count("{") == payload.count("}"):
            return parse_or_repair(payload.strip())

        # Multi-line object: append the rest without timestamp prefixes.
        remainder = "".join(
            re.sub(r'^\[\d{2}:\d{2}:\d{2}\]\s*', '', following)
            for following in json_lines[start_idx + 1:]
        )
        return parse_or_repair((payload + remainder).strip())

    except Exception:
        # Any other problem is deliberately silent: no content is returned.
        return None
|
||||
|
||||
def format_json_content(self, json_obj: dict) -> str:
    """
    Pick the readable summary text out of a parsed JSON object.

    ``updated_paragraph_latest_state`` (reflection summary) is preferred
    over ``paragraph_latest_state`` (first summary). Only the first key that
    is present is consulted. If its value is empty, or neither key exists,
    the whole object is returned as indented JSON prefixed with
    "清理后的输出: ".
    """
    try:
        paragraph = None
        for key in ("updated_paragraph_latest_state", "paragraph_latest_state"):
            if key in json_obj:
                paragraph = json_obj[key]
                break

        # Returned unchanged, so embedded newlines are preserved.
        if paragraph:
            return paragraph

        return f"清理后的输出: {json.dumps(json_obj, ensure_ascii=False, indent=2)}"

    except Exception as e:
        print(f"ForumEgine: 格式化JSON时出错: {e}")
        return f"清理后的输出: {json.dumps(json_obj, ensure_ascii=False, indent=2)}"
|
||||
|
||||
def extract_node_content(self, line: str) -> Optional[str]:
    """
    Strip log decoration from a node line and return the message text.

    For lines shaped like ``[HH:MM:SS] [NodeName] message``, everything
    after the first timestamp is taken. Leading ``[...]`` tags are removed,
    then one known summary prefix, then a leading bare app name, and
    whitespace runs are collapsed. Lines without a timestamp are returned
    stripped.
    """
    stamped = re.search(r'\[\d{2}:\d{2}:\d{2}\]\s*(.+)', line)
    if not stamped:
        return line.strip()

    text = stamped.group(1).strip()

    # Peel off every leading [tag] (node names, app names, ...).
    leading_tag = re.compile(r'^\[.*?\]\s*')
    while leading_tag.match(text):
        text = leading_tag.sub('', text)

    # Drop at most one of the known summary prefixes.
    for label in ("首次总结: ", "反思总结: ", "清理后的输出: "):
        if text.startswith(label):
            text = text[len(label):]
            break

    # Drop a bare app name at the very start (not in brackets).
    for app_name in ('INSIGHT', 'MEDIA', 'QUERY'):
        text = re.sub(rf'^{app_name}\s+', '', text, flags=re.IGNORECASE)

    # Collapse whitespace runs.
    text = re.sub(r'\s+', ' ', text)

    return text.strip()
|
||||
|
||||
def get_file_size(self, file_path: Path) -> int:
    """
    Return the size of ``file_path`` in bytes, or 0 if it cannot be read.

    Only ``OSError`` (missing file, permission problems, ...) is treated as
    "size 0". The previous bare ``except`` also swallowed
    ``KeyboardInterrupt`` and ``SystemExit``.
    """
    try:
        return file_path.stat().st_size if file_path.exists() else 0
    except OSError:
        return 0
|
||||
|
||||
def get_file_line_count(self, file_path: Path) -> int:
    """
    Return the number of lines in ``file_path``, or 0 if it cannot be read.

    Undecodable bytes are replaced instead of raising. A log caught
    mid-write, or containing a stray non-UTF-8 byte, used to make the count
    fall to 0, which the monitoring loop interprets as the log having
    shrunk. Only ``OSError`` is mapped to 0; interrupts propagate.
    """
    try:
        if not file_path.exists():
            return 0
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            return sum(1 for _ in f)
    except OSError:
        return 0
|
||||
|
||||
def read_new_lines(self, file_path: Path, app_name: str) -> List[str]:
    """
    Return the non-empty, stripped lines appended since the last read.

    The read offset is kept per app in ``self.file_positions`` as a byte
    offset. If the file is now smaller than that offset it is treated as
    truncated: reading restarts at 0 and the app's JSON capture state is
    reset.

    The file is read in binary mode and decoded with replacement
    characters. The stored offsets come from ``st_size``, and seeking a
    text-mode file to such an offset is undefined per the ``io``
    documentation. A strict decode would also fail on the same bad byte on
    every call without ever advancing the offset.

    Args:
        file_path: Log file to read.
        app_name: Key used for the stored offset and capture state.

    Returns:
        New lines, possibly empty. Errors are printed and yield the lines
        gathered so far (normally an empty list).
    """
    new_lines = []

    try:
        if not file_path.exists():
            return new_lines

        current_size = self.get_file_size(file_path)
        last_position = self.file_positions.get(app_name, 0)

        # A smaller file means it was cleared: start over from the top.
        if current_size < last_position:
            last_position = 0
            self.capturing_json[app_name] = False
            self.json_buffer[app_name] = []

        if current_size > last_position:
            with open(file_path, 'rb') as f:
                f.seek(last_position)
                raw = f.read()

            # Advance by the bytes actually consumed.
            self.file_positions[app_name] = last_position + len(raw)

            text = raw.decode('utf-8', errors='replace')
            # Treat "\r\n" and a lone "\r" as line breaks, like text mode.
            text = text.replace('\r\n', '\n').replace('\r', '\n')
            new_lines = [line.strip() for line in text.split('\n') if line.strip()]

    except Exception as e:
        print(f"ForumEgine: 读取{app_name}日志失败: {e}")

    return new_lines
|
||||
|
||||
def process_lines_for_json(self, lines: List[str], app_name: str) -> List[str]:
    """
    Turn new log lines into forum entries, handling multi-line JSON dumps.

    Target-node lines either open a JSON capture (parsed immediately when
    the line already ends with ``}``) or, if judged valuable, are recorded
    directly. While a capture is open, non-target lines are buffered until
    a JSON end line arrives. Capture state lives on ``self``, so a dump may
    span several calls.

    Args:
        lines: New lines from one app's log.
        app_name: Key for this app's capture state.

    Returns:
        Cleaned content strings, in the order they were completed.
    """
    results = []

    # First time this app is seen: start with no capture in progress.
    if app_name not in self.capturing_json:
        self.capturing_json[app_name] = False
        self.json_buffer[app_name] = []

    def close_capture():
        self.capturing_json[app_name] = False
        self.json_buffer[app_name] = []

    for line in lines:
        if not line.strip():
            continue

        if self.is_target_log_line(line):
            if self.is_json_start_line(line):
                # Open a new capture with this line as its first entry.
                self.capturing_json[app_name] = True
                self.json_buffer[app_name] = [line]
                self.json_start_line[app_name] = line

                # A line that also ends the object is handled right away.
                if line.strip().endswith("}"):
                    content = self.extract_json_content([line])
                    if content:  # unparseable dumps are dropped
                        cleaned = self._clean_content_tags(content, app_name)
                        results.append(f"{cleaned}")
                    close_capture()

            elif self.is_valuable_content(line):
                # Other summary output worth keeping.
                cleaned = self._clean_content_tags(self.extract_node_content(line), app_name)
                results.append(f"{cleaned}")
            continue

        if not self.capturing_json[app_name]:
            continue

        # Continuation line of an open capture.
        self.json_buffer[app_name].append(line)
        if self.is_json_end_line(line):
            content = self.extract_json_content(self.json_buffer[app_name])
            if content:  # unparseable dumps are dropped
                cleaned = self._clean_content_tags(content, app_name)
                results.append(f"{cleaned}")
            close_capture()

    return results
|
||||
|
||||
def _trigger_host_speech(self):
    """
    Ask the host model for a speech and record it in forum.log.

    Does nothing when the host module is unavailable, when another host
    speech is still being generated, when the previous speech is less than
    ``min_host_interval`` seconds old, or when forum.log is empty. On
    success the speech is written with the ``HOST`` tag, the time is
    remembered and ``agent_speech_count`` is reset. Errors are printed.

    The monitoring loop starts this method on a new thread for every
    captured speech once the threshold is reached, while
    ``last_host_speech_time`` is only set after a call succeeds. Without
    the in-progress flag, several API calls could overlap and write
    duplicate speeches.
    """
    if not HOST_AVAILABLE:
        return

    # Claim the single generation slot. The lock is held only for the flag
    # check, never across the API call or the log write.
    with self.write_lock:
        if getattr(self, '_host_speech_in_progress', False):
            return
        self._host_speech_in_progress = True

    try:
        # Respect the minimum spacing between host speeches.
        current_time = time.time()
        if self.last_host_speech_time:
            if current_time - self.last_host_speech_time < self.min_host_interval:
                return

        forum_logs = self.get_forum_log_content()
        if not forum_logs:
            return

        print("ForumEgine: 正在生成主持人发言...")

        host_speech = generate_host_speech(forum_logs)

        if host_speech:
            self.write_to_forum_log(host_speech, "HOST")
            self.last_host_speech_time = current_time
            print(f"ForumEgine: 主持人发言已记录")

            # Start counting agent speeches afresh.
            self.agent_speech_count = 0
        else:
            print("ForumEgine: 主持人发言生成失败")

    except Exception as e:
        print(f"ForumEgine: 触发主持人发言时出错: {e}")
    finally:
        self._host_speech_in_progress = False
|
||||
|
||||
def _clean_content_tags(self, content: str, app_name: str) -> str:
    """
    Remove app tags and redundant prefixes from ``content``.

    For each of INSIGHT, MEDIA and QUERY (case-insensitive), every
    ``[NAME]`` tag is removed and then a bare ``NAME`` at the start. One
    further leading ``[...]`` tag is dropped and whitespace runs are
    collapsed. Falsy input is returned unchanged. ``app_name`` is accepted
    but not used.
    """
    if not content:
        return content

    for name in ('INSIGHT', 'MEDIA', 'QUERY'):
        bracketed = re.compile(rf'\[{name}\]\s*', re.IGNORECASE)
        bare_prefix = re.compile(rf'^{name}\s+', re.IGNORECASE)
        content = bare_prefix.sub('', bracketed.sub('', content))

    # Any other bracketed tag left at the front.
    content = re.sub(r'^\[.*?\]\s*', '', content)

    # Collapse whitespace runs.
    content = re.sub(r'\s+', ' ', content)

    return content.strip()
|
||||
|
||||
def monitor_logs(self):
    """
    Polling loop that turns agent log output into a forum session.

    The current size and line count of every monitored log are recorded as
    a baseline, so existing content is ignored. Then, about once per second
    while ``self.is_monitoring`` is true:

    * a log that grew is read; the first line mentioning
      ``FirstSummaryNode`` opens a session and clears forum.log;
    * during a session, captured content is written to forum.log under the
      upper-cased app name, and once ``agent_speech_count`` reaches
      ``host_speech_threshold`` a daemon thread runs
      ``_trigger_host_speech``;
    * a log that shrank has its offset and capture state reset, and ends
      the session;
    * 900 consecutive idle polls (about 15 minutes) also end the session.

    Exceptions inside an iteration are printed with a traceback and the
    loop continues after a 2-second pause.
    """
    print("ForumEgine: 论坛创建中...")

    # Baseline: whatever is already in the logs is not forum material.
    for app_name, log_file in self.monitored_logs.items():
        self.file_line_counts[app_name] = self.get_file_line_count(log_file)
        self.file_positions[app_name] = self.get_file_size(log_file)
        self.capturing_json[app_name] = False
        self.json_buffer[app_name] = []

    def close_session():
        # Return to the waiting state and stamp the end of the forum.
        self.is_searching = False
        self.search_inactive_count = 0
        self.agent_speech_count = 0
        self.last_host_speech_time = None
        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.write_to_forum_log(f"=== ForumEgine 论坛结束 - {end_time} ===", "SYSTEM")

    while self.is_monitoring:
        try:
            any_growth = False
            any_shrink = False
            captured_any = False

            # Each log is examined independently.
            for app_name, log_file in self.monitored_logs.items():
                current_lines = self.get_file_line_count(log_file)
                previous_lines = self.file_line_counts.get(app_name, 0)

                if current_lines > previous_lines:
                    any_growth = True
                    new_lines = self.read_new_lines(log_file, app_name)

                    # Waiting state: the first summary opens a session (once).
                    if not self.is_searching:
                        if any(entry.strip() and 'FirstSummaryNode' in entry for entry in new_lines):
                            print(f"ForumEgine: 在{app_name}中检测到第一次论坛发表内容")
                            self.is_searching = True
                            self.search_inactive_count = 0
                            # A new session starts with an empty forum.log.
                            self.clear_forum_log()

                    if self.is_searching:
                        for content in self.process_lines_for_json(new_lines, app_name):
                            # e.g. insight -> INSIGHT
                            self.write_to_forum_log(content, app_name.upper())
                            captured_any = True
                            self.agent_speech_count += 1

                            if self.agent_speech_count >= self.host_speech_threshold:
                                # Own thread so the API call cannot stall polling.
                                threading.Thread(
                                    target=self._trigger_host_speech,
                                    daemon=True
                                ).start()

                elif current_lines < previous_lines:
                    any_shrink = True
                    # Re-baseline at the end of the shortened file.
                    self.file_positions[app_name] = self.get_file_size(log_file)
                    self.capturing_json[app_name] = False
                    self.json_buffer[app_name] = []

                self.file_line_counts[app_name] = current_lines

            # Decide whether the running session should end.
            if self.is_searching:
                if any_shrink:
                    close_session()
                elif not any_growth and not captured_any:
                    self.search_inactive_count += 1
                    if self.search_inactive_count >= 900:  # ~15 minutes idle
                        print("ForumEgine: 长时间无活动,结束论坛")
                        close_session()
                else:
                    self.search_inactive_count = 0

            time.sleep(1)

        except Exception as e:
            print(f"ForumEgine: 论坛记录中出错: {e}")
            import traceback
            traceback.print_exc()
            time.sleep(2)

    print("ForumEgine: 停止论坛日志文件")
|
||||
|
||||
def start_monitoring(self):
    """
    Start the monitoring loop on a daemon thread.

    Returns:
        True when the thread was started; False when monitoring is already
        running or the thread could not be started.
    """
    if self.is_monitoring:
        print("ForumEgine: 论坛已经在运行中")
        return False

    try:
        # The flag must be set before the thread starts: the loop checks it.
        self.is_monitoring = True
        worker = threading.Thread(target=self.monitor_logs, daemon=True)
        self.monitor_thread = worker
        worker.start()
    except Exception as e:
        print(f"ForumEgine: 启动论坛失败: {e}")
        self.is_monitoring = False
        return False

    print("ForumEgine: 论坛已启动")
    return True
|
||||
|
||||
def stop_monitoring(self):
    """
    Stop the monitoring loop and write the end marker to forum.log.

    The worker thread is given up to 2 seconds to finish. Does nothing
    except print a notice when monitoring is not running. Failures are
    printed, not raised.
    """
    if not self.is_monitoring:
        print("ForumEgine: 论坛未运行")
        return

    try:
        # Clearing the flag lets the loop exit on its next check.
        self.is_monitoring = False

        worker = self.monitor_thread
        if worker and worker.is_alive():
            worker.join(timeout=2)

        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.write_to_forum_log(f"=== ForumEgine 论坛结束 - {end_time} ===", "SYSTEM")

        print("ForumEgine: 论坛已停止")

    except Exception as e:
        print(f"ForumEgine: 停止论坛失败: {e}")
|
||||
|
||||
def get_forum_log_content(self) -> List[str]:
    """
    Return the lines of forum.log without their trailing line breaks.

    Returns:
        The lines in file order; an empty list when the file does not exist
        or cannot be read (the error is printed).
    """
    try:
        if not self.forum_log_file.exists():
            return []

        with open(self.forum_log_file, 'r', encoding='utf-8') as log:
            return [entry.rstrip('\n\r') for entry in log]

    except Exception as e:
        print(f"ForumEgine: 读取forum.log失败: {e}")
        return []
|
||||
|
||||
def fix_json_string(self, json_text: str) -> Optional[str]:
    """
    Try to repair JSON whose string values contain unescaped double quotes.

    Text that already parses is returned unchanged. Otherwise the text is
    scanned once. Inside a string, a ``"`` counts as the closing quote only
    when the next non-space character is ``:``, ``,`` or ``}`` (or the text
    ends). Any other ``"`` is taken to be part of the value and is escaped.

    Args:
        json_text: Candidate JSON text.

    Returns:
        Text that parses as JSON, or None when the repair did not produce
        valid JSON. The annotation used to claim ``str`` although None is a
        normal outcome.
    """
    try:
        json.loads(json_text)
        return json_text
    except json.JSONDecodeError:
        pass

    try:
        # Collect pieces and join once; repeated "+=" is quadratic on long
        # summaries.
        pieces = []
        in_string = False
        escape_next = False
        length = len(json_text)

        for index, char in enumerate(json_text):
            if escape_next:
                # Character following a backslash: copy as-is.
                pieces.append(char)
                escape_next = False
                continue

            if char == '\\':
                pieces.append(char)
                escape_next = True
                continue

            if char != '"':
                pieces.append(char)
                continue

            if not in_string:
                # Opening quote.
                in_string = True
                pieces.append(char)
                continue

            # Quote inside a string: look past whitespace to classify it.
            lookahead = index + 1
            while lookahead < length and json_text[lookahead].isspace():
                lookahead += 1

            if lookahead >= length or json_text[lookahead] in ':,}':
                # Closing quote.
                in_string = False
                pieces.append(char)
            else:
                # Embedded quote: escape it.
                pieces.append('\\"')

        fixed_text = ''.join(pieces)

        try:
            json.loads(fixed_text)
            return fixed_text
        except json.JSONDecodeError:
            return None

    except Exception:
        return None
|
||||
|
||||
# Module-wide LogMonitor instance, created on first use.
_monitor_instance = None


def get_monitor() -> LogMonitor:
    """Return the shared LogMonitor, constructing it on the first call."""
    global _monitor_instance
    if _monitor_instance is not None:
        return _monitor_instance
    _monitor_instance = LogMonitor()
    return _monitor_instance
|
||||
|
||||
def start_forum_monitoring():
    """Start monitoring on the shared LogMonitor and return its result."""
    monitor = get_monitor()
    return monitor.start_monitoring()
|
||||
|
||||
def stop_forum_monitoring():
    """Stop monitoring on the shared LogMonitor."""
    monitor = get_monitor()
    monitor.stop_monitoring()
|
||||
|
||||
def get_forum_log():
    """Return the current forum.log lines from the shared LogMonitor."""
    monitor = get_monitor()
    return monitor.get_forum_log_content()
|
||||
Reference in New Issue
Block a user