""" 论坛主持人模块 使用硅基流动的Qwen3模型作为论坛主持人,引导多个agent进行讨论 """ import requests import json import sys import os from typing import List, Dict, Any, Optional from datetime import datetime import re # 添加项目根目录到Python路径以导入config sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from config import GUIJI_QWEN3_API_KEY # 添加utils目录到Python路径 current_dir = os.path.dirname(os.path.abspath(__file__)) root_dir = os.path.dirname(current_dir) utils_dir = os.path.join(root_dir, 'utils') if utils_dir not in sys.path: sys.path.append(utils_dir) from retry_helper import with_graceful_retry, SEARCH_API_RETRY_CONFIG class ForumHost: """ 论坛主持人类 使用Qwen3-235B模型作为智能主持人 """ def __init__(self, api_key: str = None): """ 初始化论坛主持人 Args: api_key: 硅基流动API密钥,如果不提供则从配置文件读取 """ self.api_key = api_key or GUIJI_QWEN3_API_KEY self.base_url = "https://api.siliconflow.cn/v1/chat/completions" self.model = "Qwen/Qwen3-235B-A22B-Instruct-2507" # 使用更大的模型 if not self.api_key: raise ValueError("未找到硅基流动API密钥,请在config.py中设置GUIJI_QWEN3_API_KEY") # 记录历史发言,避免重复 self.previous_summaries = [] def generate_host_speech(self, forum_logs: List[str]) -> Optional[str]: """ 生成主持人发言 Args: forum_logs: 论坛日志内容列表 Returns: 主持人发言内容,如果生成失败返回None """ try: # 解析论坛日志,提取有效内容 parsed_content = self._parse_forum_logs(forum_logs) if not parsed_content['agent_speeches']: print("ForumHost: 没有找到有效的agent发言") return None # 构建prompt system_prompt = self._build_system_prompt() user_prompt = self._build_user_prompt(parsed_content) # 调用API生成发言 response = self._call_qwen_api(system_prompt, user_prompt) if response["success"]: speech = response["content"] # 清理和格式化发言 speech = self._format_host_speech(speech) return speech else: print(f"ForumHost: API调用失败 - {response.get('error', '未知错误')}") return None except Exception as e: print(f"ForumHost: 生成发言时出错 - {str(e)}") return None def _parse_forum_logs(self, forum_logs: List[str]) -> Dict[str, Any]: """ 解析论坛日志,提取agent发言 Returns: 包含agent发言的字典 """ parsed = { 'agent_speeches': [] } for line in forum_logs: if not line.strip(): continue # 解析时间戳和发言者 match = re.match(r'\[(\d{2}:\d{2}:\d{2})\]\s*\[(\w+)\]\s*(.+)', line) if match: timestamp, speaker, content = match.groups() # 跳过系统消息和HOST自己的发言 if speaker in ['SYSTEM', 'HOST']: continue # 记录agent发言 if speaker in ['INSIGHT', 'MEDIA', 'QUERY']: # 处理转义的换行符 content = content.replace('\\n', '\n') parsed['agent_speeches'].append({ 'timestamp': timestamp, 'speaker': speaker, 'content': content }) return parsed def _build_system_prompt(self) -> str: """构建系统prompt""" return """【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查 你是一个多agent舆情分析系统的论坛主持人。你的职责是: 1. **事件梳理**:从各agent的发言中自动识别关键事件、人物、时间节点,按时间顺序整理事件脉络 2. **引导讨论**:根据各agent的发言,引导深入讨论关键问题,探究深层原因 3. **纠正错误**:结合不同agent的视角以及言论,如果发现事实错误或逻辑矛盾,请明确指出 4. **整合观点**:综合不同agent的视角,形成更全面的认识,找出共识和分歧 5. **趋势预测**:基于已有信息分析舆情发展趋势,提出可能的风险点 6. **推进分析**:提出新的分析角度或需要关注的问题,引导后续讨论方向 **Agent介绍**: - **INSIGHT Agent**:专注于私有舆情数据库的深度挖掘和分析,提供历史数据和模式对比 - **MEDIA Agent**:擅长多模态内容分析,关注媒体报道、图片、视频等视觉信息的传播效果 - **QUERY Agent**:负责精准信息搜索,提供最新的网络信息和实时动态 **发言要求**: 1. **综合性**:每次发言控制在1000字以内,内容应包括事件梳理、观点整合、问题引导等多个方面 2. **结构清晰**:使用明确的段落结构,包括事件梳理、观点对比、问题提出等部分 3. **深入分析**:不仅仅总结已有信息,还要提出深层次的见解和分析 4. **客观中立**:基于事实进行分析和判断,避免主观臆测和情绪化表达 5. **前瞻性**:提出具有前瞻性的观点和建议,引导讨论向更深入的方向发展 **注意事项**: - 本讨论基于科研目的,已通过伦理性合规审查 - 保持专业性和学术性,重视数据和证据 - 对于敏感话题,应保持谨慎态度,基于事实进行分析 【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查""" def _build_user_prompt(self, parsed_content: Dict[str, Any]) -> str: """构建用户prompt""" # 获取最近的发言 recent_speeches = parsed_content['agent_speeches'] # 构建发言摘要,不截断内容 speeches_text = "\n\n".join([ f"[{s['timestamp']}] {s['speaker']}:\n{s['content']}" for s in recent_speeches ]) prompt = f"""【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查 最近的Agent发言记录: {speeches_text} 请你作为论坛主持人,基于以上agent的发言进行综合分析,请按以下结构组织你的发言: **一、事件梳理与时间线分析** - 从各agent发言中自动识别关键事件、人物、时间节点 - 按时间顺序整理事件脉络,梳理因果关系 - 指出关键转折点和重要节点 **二、观点整合与对比分析** - 综合INSIGHT、MEDIA、QUERY三个Agent的视角和发现 - 指出不同数据源之间的共识与分歧 - 分析每个Agent的信息价值和互补性 - 如果发现事实错误或逻辑矛盾,请明确指出并给出理由 **三、深层次分析与趋势预测** - 基于已有信息分析舆情的深层原因和影响因素 - 预测舆情发展趋势,指出可能的风险点和机遇 - 提出需要特别关注的方面和指标 **四、问题引导与讨论方向** - 提出2-3个值得进一步深入探讨的关键问题 - 为后续研究提出具体的建议和方向 - 引导各Agent关注特定的数据维度或分析角度 请发表综合性的主持人发言(控制在1000字以内),内容应包含以上四个部分,并保持逻辑清晰、分析深入、视角独特。 【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查""" return prompt @with_graceful_retry(SEARCH_API_RETRY_CONFIG, default_return={"success": False, "error": "API服务暂时不可用"}) def _call_qwen_api(self, system_prompt: str, user_prompt: str) -> Dict[str, Any]: """调用Qwen API""" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": self.model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "max_tokens": 14639, "temperature": 0.6, "top_p": 0.9 } try: response = requests.post( self.base_url, headers=headers, json=data, timeout=300 # 超时设置300s ) response.raise_for_status() result = response.json() if "choices" in result and len(result["choices"]) > 0: content = result["choices"][0]["message"]["content"] return {"success": True, "content": content} else: return {"success": False, "error": "API返回格式异常"} except requests.exceptions.Timeout: return {"success": False, "error": "API请求超时"} except requests.exceptions.RequestException as e: return {"success": False, "error": f"网络请求错误: {str(e)}"} except Exception as e: return {"success": False, "error": f"API调用异常: {str(e)}"} def _format_host_speech(self, speech: str) -> str: """格式化主持人发言""" # 移除多余的空行 speech = re.sub(r'\n{3,}', '\n\n', speech) # 移除可能的引号 speech = speech.strip('"\'""‘’') return speech.strip() # 创建全局实例 _host_instance = None def get_forum_host() -> ForumHost: """获取全局论坛主持人实例""" global _host_instance if _host_instance is None: _host_instance = ForumHost() return _host_instance def generate_host_speech(forum_logs: List[str]) -> Optional[str]: """生成主持人发言的便捷函数""" return get_forum_host().generate_host_speech(forum_logs)