Remove forum host asynchronous strategy and simplify the process.
This commit is contained in:
+50
-117
@@ -28,7 +28,7 @@ from retry_helper import with_graceful_retry, SEARCH_API_RETRY_CONFIG
|
||||
class ForumHost:
|
||||
"""
|
||||
论坛主持人类
|
||||
使用硅基流动的Qwen3-235B模型作为智能主持人
|
||||
使用Qwen3-235B模型作为智能主持人
|
||||
"""
|
||||
|
||||
def __init__(self, api_key: str = None):
|
||||
@@ -88,17 +88,13 @@ class ForumHost:
|
||||
|
||||
def _parse_forum_logs(self, forum_logs: List[str]) -> Dict[str, Any]:
|
||||
"""
|
||||
解析论坛日志,提取结构化信息
|
||||
解析论坛日志,提取agent发言
|
||||
|
||||
Returns:
|
||||
包含agent发言、时间线等信息的字典
|
||||
包含agent发言的字典
|
||||
"""
|
||||
parsed = {
|
||||
'agent_speeches': [],
|
||||
'timeline': [],
|
||||
'key_topics': set(),
|
||||
'session_start': None,
|
||||
'session_end': None
|
||||
'agent_speeches': []
|
||||
}
|
||||
|
||||
for line in forum_logs:
|
||||
@@ -110,16 +106,6 @@ class ForumHost:
|
||||
if match:
|
||||
timestamp, speaker, content = match.groups()
|
||||
|
||||
# 记录会话开始
|
||||
if 'ForumEngine 监控开始' in content:
|
||||
parsed['session_start'] = timestamp
|
||||
continue
|
||||
|
||||
# 记录会话结束
|
||||
if 'ForumEngine 论坛结束' in content:
|
||||
parsed['session_end'] = timestamp
|
||||
continue
|
||||
|
||||
# 跳过系统消息和HOST自己的发言
|
||||
if speaker in ['SYSTEM', 'HOST']:
|
||||
continue
|
||||
@@ -134,49 +120,9 @@ class ForumHost:
|
||||
'speaker': speaker,
|
||||
'content': content
|
||||
})
|
||||
|
||||
# 提取关键主题(简单的关键词提取)
|
||||
self._extract_key_topics(content, parsed['key_topics'])
|
||||
|
||||
# 提取时间线信息
|
||||
self._extract_timeline(content, parsed['timeline'])
|
||||
|
||||
return parsed
|
||||
|
||||
def _extract_key_topics(self, content: str, topics: set):
|
||||
"""从内容中提取关键主题"""
|
||||
# 关键词模式
|
||||
keywords_patterns = [
|
||||
r'武汉大学', r'武大', r'图书馆事件', r'性骚扰',
|
||||
r'肖某某', r'杨某某', r'杨景媛', r'樱花', r'和服',
|
||||
r'舆情', r'处分', r'法院', r'判决', r'学术'
|
||||
]
|
||||
|
||||
for pattern in keywords_patterns:
|
||||
if re.search(pattern, content):
|
||||
topics.add(pattern.replace(r'\\', ''))
|
||||
|
||||
def _extract_timeline(self, content: str, timeline: list):
|
||||
"""从内容中提取时间线信息"""
|
||||
# 匹配各种日期格式
|
||||
date_patterns = [
|
||||
r'(\d{4}年\d{1,2}月\d{1,2}日)',
|
||||
r'(\d{4}-\d{1,2}-\d{1,2})',
|
||||
r'(\d{4}/\d{1,2}/\d{1,2})'
|
||||
]
|
||||
|
||||
for pattern in date_patterns:
|
||||
matches = re.findall(pattern, content)
|
||||
for match in matches:
|
||||
# 查找日期附近的事件描述
|
||||
context_start = max(0, content.find(match) - 50)
|
||||
context_end = min(len(content), content.find(match) + 100)
|
||||
context = content[context_start:context_end]
|
||||
|
||||
timeline.append({
|
||||
'date': match,
|
||||
'context': context.strip()
|
||||
})
|
||||
|
||||
def _build_system_prompt(self) -> str:
|
||||
"""构建系统prompt"""
|
||||
@@ -184,73 +130,72 @@ class ForumHost:
|
||||
|
||||
你是一个多agent舆情分析系统的论坛主持人。你的职责是:
|
||||
|
||||
1. **引导讨论**:根据各agent的发言,引导深入讨论关键问题
|
||||
2. **纠正错误**:结合不同agent的视角以及言论来纠正错误
|
||||
3. **整合观点**:综合不同agent的视角,形成更全面的认识
|
||||
4. **推进分析**:提出新的分析角度或需要关注的问题
|
||||
1. **事件梳理**:从各agent的发言中自动识别关键事件、人物、时间节点,按时间顺序整理事件脉络
|
||||
2. **引导讨论**:根据各agent的发言,引导深入讨论关键问题,探究深层原因
|
||||
3. **纠正错误**:结合不同agent的视角以及言论,如果发现事实错误或逻辑矛盾,请明确指出
|
||||
4. **整合观点**:综合不同agent的视角,形成更全面的认识,找出共识和分歧
|
||||
5. **趋势预测**:基于已有信息分析舆情发展趋势,提出可能的风险点
|
||||
6. **推进分析**:提出新的分析角度或需要关注的问题,引导后续讨论方向
|
||||
|
||||
**Agent介绍**:
|
||||
- **INSIGHT Agent**:专注于私有舆情数据库的深度挖掘和分析
|
||||
- **MEDIA Agent**:擅长多模态内容分析,关注媒体报道和视觉信息
|
||||
- **QUERY Agent**:负责精准信息搜索,提供最新的网络信息
|
||||
- **INSIGHT Agent**:专注于私有舆情数据库的深度挖掘和分析,提供历史数据和模式对比
|
||||
- **MEDIA Agent**:擅长多模态内容分析,关注媒体报道、图片、视频等视觉信息的传播效果
|
||||
- **QUERY Agent**:负责精准信息搜索,提供最新的网络信息和实时动态
|
||||
|
||||
**发言要求**:
|
||||
1. 简洁明了,每次发言控制在3000字以内
|
||||
2. 重点突出,聚焦于最关键的发现和问题
|
||||
3. 逻辑清晰,使用序号或要点形式组织内容
|
||||
4. 客观中立,基于事实进行分析和判断
|
||||
5. 避免重复已经充分讨论过的内容
|
||||
1. **综合性**:每次发言控制在1000字以内,内容应包括事件梳理、观点整合、问题引导等多个方面
|
||||
2. **结构清晰**:使用明确的段落结构,包括事件梳理、观点对比、问题提出等部分
|
||||
3. **深入分析**:不仅仅总结已有信息,还要提出深层次的见解和分析
|
||||
4. **客观中立**:基于事实进行分析和判断,避免主观臆测和情绪化表达
|
||||
5. **前瞻性**:提出具有前瞻性的观点和建议,引导讨论向更深入的方向发展
|
||||
|
||||
**注意事项**:
|
||||
- 本讨论基于科研目的,已通过伦理性合规审查
|
||||
- 保持专业性和学术性,避免情绪化表达
|
||||
- 重视数据和证据,避免主观臆测
|
||||
- 保持专业性和学术性,重视数据和证据
|
||||
- 对于敏感话题,应保持谨慎态度,基于事实进行分析
|
||||
|
||||
【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查"""
|
||||
|
||||
def _build_user_prompt(self, parsed_content: Dict[str, Any]) -> str:
|
||||
"""构建用户prompt"""
|
||||
# 获取最近的发言(最多10条)
|
||||
recent_speeches = parsed_content['agent_speeches'][-10:]
|
||||
# 获取最近的发言
|
||||
recent_speeches = parsed_content['agent_speeches']
|
||||
|
||||
# 构建发言摘要
|
||||
# 构建发言摘要,不截断内容
|
||||
speeches_text = "\n\n".join([
|
||||
f"[{s['timestamp']}] {s['speaker']}:\n{s['content'][:500]}..."
|
||||
if len(s['content']) > 500 else f"[{s['timestamp']}] {s['speaker']}:\n{s['content']}"
|
||||
f"[{s['timestamp']}] {s['speaker']}:\n{s['content']}"
|
||||
for s in recent_speeches
|
||||
])
|
||||
|
||||
# 构建关键主题列表
|
||||
topics = list(parsed_content['key_topics'])[:10] # 限制主题数量
|
||||
topics_text = "、".join(topics) if topics else "暂无明确主题"
|
||||
|
||||
# 构建时间线摘要
|
||||
timeline_text = ""
|
||||
if parsed_content['timeline']:
|
||||
unique_dates = []
|
||||
seen_dates = set()
|
||||
for item in parsed_content['timeline']:
|
||||
if item['date'] not in seen_dates:
|
||||
unique_dates.append(item['date'])
|
||||
seen_dates.add(item['date'])
|
||||
timeline_text = f"\n\n检测到的关键时间点:{', '.join(unique_dates[:5])}"
|
||||
|
||||
prompt = f"""【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查
|
||||
|
||||
当前论坛讨论的核心主题:{topics_text}
|
||||
|
||||
最近的Agent发言记录:
|
||||
{speeches_text}
|
||||
{timeline_text}
|
||||
|
||||
请你作为论坛主持人,基于以上agent的发言:
|
||||
请你作为论坛主持人,基于以上agent的发言进行综合分析,请按以下结构组织你的发言:
|
||||
|
||||
1. **时间线梳理**:如果发现时间线信息,请简要整理关键事件的时间顺序
|
||||
2. **观点整合**:综合不同agent的视角,指出共识和分歧
|
||||
3. **纠错提醒**:如果发现事实错误或逻辑矛盾,请明确指出
|
||||
4. **引导深化**:提出1-2个值得进一步探讨的问题或角度
|
||||
**一、事件梳理与时间线分析**
|
||||
- 从各agent发言中自动识别关键事件、人物、时间节点
|
||||
- 按时间顺序整理事件脉络,梳理因果关系
|
||||
- 指出关键转折点和重要节点
|
||||
|
||||
请发表3000字以内的简洁发言,推动讨论深入。
|
||||
**二、观点整合与对比分析**
|
||||
- 综合INSIGHT、MEDIA、QUERY三个Agent的视角和发现
|
||||
- 指出不同数据源之间的共识与分歧
|
||||
- 分析每个Agent的信息价值和互补性
|
||||
- 如果发现事实错误或逻辑矛盾,请明确指出并给出理由
|
||||
|
||||
**三、深层次分析与趋势预测**
|
||||
- 基于已有信息分析舆情的深层原因和影响因素
|
||||
- 预测舆情发展趋势,指出可能的风险点和机遇
|
||||
- 提出需要特别关注的方面和指标
|
||||
|
||||
**四、问题引导与讨论方向**
|
||||
- 提出2-3个值得进一步深入探讨的关键问题
|
||||
- 为后续研究提出具体的建议和方向
|
||||
- 引导各Agent关注特定的数据维度或分析角度
|
||||
|
||||
请发表综合性的主持人发言(控制在1000字以内),内容应包含以上四个部分,并保持逻辑清晰、分析深入、视角独特。
|
||||
|
||||
【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查"""
|
||||
|
||||
@@ -270,8 +215,8 @@ class ForumHost:
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": user_prompt}
|
||||
],
|
||||
"max_tokens": 1000,
|
||||
"temperature": 0.7,
|
||||
"max_tokens": 14639,
|
||||
"temperature": 0.6,
|
||||
"top_p": 0.9
|
||||
}
|
||||
|
||||
@@ -280,7 +225,7 @@ class ForumHost:
|
||||
self.base_url,
|
||||
headers=headers,
|
||||
json=data,
|
||||
timeout=60 # 大模型需要更长的超时时间
|
||||
timeout=300 # 超时设置300s
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
@@ -304,20 +249,8 @@ class ForumHost:
|
||||
# 移除多余的空行
|
||||
speech = re.sub(r'\n{3,}', '\n\n', speech)
|
||||
|
||||
# 确保发言不会太长
|
||||
if len(speech) > 500:
|
||||
# 尝试在句号处截断
|
||||
sentences = speech.split('。')
|
||||
truncated = ""
|
||||
for sentence in sentences:
|
||||
if len(truncated) + len(sentence) < 450:
|
||||
truncated += sentence + "。"
|
||||
else:
|
||||
break
|
||||
speech = truncated.rstrip("。") + "。"
|
||||
|
||||
# 移除可能的引号
|
||||
speech = speech.strip('"\'""''')
|
||||
speech = speech.strip('"\'""‘’')
|
||||
|
||||
return speech.strip()
|
||||
|
||||
|
||||
+31
-33
@@ -45,10 +45,9 @@ class LogMonitor:
|
||||
self.write_lock = Lock() # 写入锁,防止并发写入冲突
|
||||
|
||||
# 主持人相关状态
|
||||
self.agent_speech_count = 0 # agent发言计数器
|
||||
self.agent_speeches_buffer = [] # agent发言缓冲区
|
||||
self.host_speech_threshold = 5 # 每5条agent发言触发一次主持人发言
|
||||
self.last_host_speech_time = None # 上次主持人发言时间
|
||||
self.min_host_interval = 30 # 主持人发言最小间隔(秒)
|
||||
self.is_host_generating = False # 主持人是否正在生成发言
|
||||
|
||||
# 目标节点名称 - 直接匹配字符串
|
||||
self.target_nodes = [
|
||||
@@ -85,8 +84,8 @@ class LogMonitor:
|
||||
self.json_start_line = {}
|
||||
|
||||
# 重置主持人相关状态
|
||||
self.agent_speech_count = 0
|
||||
self.last_host_speech_time = None
|
||||
self.agent_speeches_buffer = []
|
||||
self.is_host_generating = False
|
||||
|
||||
except Exception as e:
|
||||
print(f"ForumEngine: 清空forum.log失败: {e}")
|
||||
@@ -388,40 +387,41 @@ class LogMonitor:
|
||||
return captured_contents
|
||||
|
||||
def _trigger_host_speech(self):
|
||||
"""触发主持人发言"""
|
||||
if not HOST_AVAILABLE:
|
||||
"""触发主持人发言(同步执行)"""
|
||||
if not HOST_AVAILABLE or self.is_host_generating:
|
||||
return
|
||||
|
||||
try:
|
||||
# 检查时间间隔
|
||||
current_time = time.time()
|
||||
if self.last_host_speech_time:
|
||||
if current_time - self.last_host_speech_time < self.min_host_interval:
|
||||
return # 间隔太短,跳过
|
||||
# 设置生成标志
|
||||
self.is_host_generating = True
|
||||
|
||||
# 获取当前forum.log的内容
|
||||
forum_logs = self.get_forum_log_content()
|
||||
if not forum_logs:
|
||||
# 获取缓冲区的5条发言
|
||||
recent_speeches = self.agent_speeches_buffer[:5]
|
||||
if len(recent_speeches) < 5:
|
||||
self.is_host_generating = False
|
||||
return
|
||||
|
||||
print("ForumEngine: 正在生成主持人发言...")
|
||||
|
||||
# 调用主持人生成发言
|
||||
host_speech = generate_host_speech(forum_logs)
|
||||
# 调用主持人生成发言(传入最近5条)
|
||||
host_speech = generate_host_speech(recent_speeches)
|
||||
|
||||
if host_speech:
|
||||
# 写入主持人发言到forum.log
|
||||
self.write_to_forum_log(host_speech, "HOST")
|
||||
self.last_host_speech_time = current_time
|
||||
print(f"ForumEngine: 主持人发言已记录")
|
||||
|
||||
# 重置计数器
|
||||
self.agent_speech_count = 0
|
||||
# 清空已处理的5条发言
|
||||
self.agent_speeches_buffer = self.agent_speeches_buffer[5:]
|
||||
else:
|
||||
print("ForumEngine: 主持人发言生成失败")
|
||||
|
||||
# 重置生成标志
|
||||
self.is_host_generating = False
|
||||
|
||||
except Exception as e:
|
||||
print(f"ForumEngine: 触发主持人发言时出错: {e}")
|
||||
self.is_host_generating = False
|
||||
|
||||
def _clean_content_tags(self, content: str, app_name: str) -> str:
|
||||
"""清理内容中的重复标签和多余前缀"""
|
||||
@@ -498,17 +498,15 @@ class LogMonitor:
|
||||
# print(f"ForumEngine: 捕获 - {content}")
|
||||
captured_any = True
|
||||
|
||||
# 增加agent发言计数
|
||||
self.agent_speech_count += 1
|
||||
# 将发言添加到缓冲区(格式化为完整的日志行)
|
||||
timestamp = datetime.now().strftime('%H:%M:%S')
|
||||
log_line = f"[{timestamp}] [{source_tag}] {content}"
|
||||
self.agent_speeches_buffer.append(log_line)
|
||||
|
||||
# 检查是否需要触发主持人发言
|
||||
if self.agent_speech_count >= self.host_speech_threshold:
|
||||
# 在单独的线程中触发主持人发言,避免阻塞监控
|
||||
host_thread = threading.Thread(
|
||||
target=self._trigger_host_speech,
|
||||
daemon=True
|
||||
)
|
||||
host_thread.start()
|
||||
if len(self.agent_speeches_buffer) >= self.host_speech_threshold and not self.is_host_generating:
|
||||
# 同步触发主持人发言
|
||||
self._trigger_host_speech()
|
||||
|
||||
elif current_lines < previous_lines:
|
||||
any_shrink = True
|
||||
@@ -530,8 +528,8 @@ class LogMonitor:
|
||||
self.is_searching = False
|
||||
self.search_inactive_count = 0
|
||||
# 重置主持人相关状态
|
||||
self.agent_speech_count = 0
|
||||
self.last_host_speech_time = None
|
||||
self.agent_speeches_buffer = []
|
||||
self.is_host_generating = False
|
||||
# 写入结束标记
|
||||
end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
self.write_to_forum_log(f"=== ForumEngine 论坛结束 - {end_time} ===", "SYSTEM")
|
||||
@@ -544,8 +542,8 @@ class LogMonitor:
|
||||
self.is_searching = False
|
||||
self.search_inactive_count = 0
|
||||
# 重置主持人相关状态
|
||||
self.agent_speech_count = 0
|
||||
self.last_host_speech_time = None
|
||||
self.agent_speeches_buffer = []
|
||||
self.is_host_generating = False
|
||||
# 写入结束标记
|
||||
end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
self.write_to_forum_log(f"=== ForumEngine 论坛结束 - {end_time} ===", "SYSTEM")
|
||||
|
||||
Reference in New Issue
Block a user