From cec8ed3826779f266f66f38a7ceb3877b00365ca Mon Sep 17 00:00:00 2001 From: 666ghj <670939375@qq.com> Date: Tue, 16 Sep 2025 22:09:19 +0800 Subject: [PATCH] Improve forum communication mechanism between agents. --- ForumEngine/monitor.py | 7 +- InsightEngine/nodes/summary_node.py | 57 +++++++++- MediaEngine/nodes/summary_node.py | 57 +++++++++- QueryEngine/nodes/summary_node.py | 57 +++++++++- README-EN.md | 24 +++-- README.md | 24 +++-- config.py | 2 +- utils/forum_reader.py | 162 ++++++++++++++++++++++++++++ 8 files changed, 356 insertions(+), 34 deletions(-) create mode 100644 utils/forum_reader.py diff --git a/ForumEngine/monitor.py b/ForumEngine/monitor.py index 0f7bd01..8afbe6b 100644 --- a/ForumEngine/monitor.py +++ b/ForumEngine/monitor.py @@ -131,7 +131,12 @@ class LogMonitor: "已更新段落", "正在生成", "开始处理", - "处理完成" + "处理完成", + "已读取HOST发言", + "读取HOST发言失败", + "未找到HOST发言", + "调试输出", + "信息记录" ] for pattern in exclude_patterns: diff --git a/InsightEngine/nodes/summary_node.py b/InsightEngine/nodes/summary_node.py index 9a58213..df72b20 100644 --- a/InsightEngine/nodes/summary_node.py +++ b/InsightEngine/nodes/summary_node.py @@ -18,6 +18,17 @@ from ..utils.text_processing import ( format_search_results_for_prompt ) +# 导入论坛读取工具 +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +try: + from utils.forum_reader import get_latest_host_speech, format_host_speech_for_prompt + FORUM_READER_AVAILABLE = True +except ImportError: + FORUM_READER_AVAILABLE = False + print("警告: 无法导入forum_reader模块,将跳过HOST发言读取功能") + class FirstSummaryNode(StateMutationNode): """根据搜索结果生成段落首次总结的节点""" @@ -62,9 +73,28 @@ class FirstSummaryNode(StateMutationNode): # 准备输入数据 if isinstance(input_data, str): - message = input_data + data = json.loads(input_data) else: - message = json.dumps(input_data, ensure_ascii=False) + data = input_data.copy() if isinstance(input_data, dict) else input_data + + # 读取最新的HOST发言(如果可用) + if 
FORUM_READER_AVAILABLE: + try: + host_speech = get_latest_host_speech() + if host_speech: + # 将HOST发言添加到输入数据中 + data['host_speech'] = host_speech + self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符") + except Exception as e: + self.log_info(f"读取HOST发言失败: {str(e)}") + + # 转换为JSON字符串 + message = json.dumps(data, ensure_ascii=False) + + # 如果有HOST发言,添加到消息前面作为参考 + if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']: + formatted_host = format_host_speech_for_prompt(data['host_speech']) + message = formatted_host + "\n" + message self.log_info("正在生成首次段落总结") @@ -208,9 +238,28 @@ class ReflectionSummaryNode(StateMutationNode): # 准备输入数据 if isinstance(input_data, str): - message = input_data + data = json.loads(input_data) else: - message = json.dumps(input_data, ensure_ascii=False) + data = input_data.copy() if isinstance(input_data, dict) else input_data + + # 读取最新的HOST发言(如果可用) + if FORUM_READER_AVAILABLE: + try: + host_speech = get_latest_host_speech() + if host_speech: + # 将HOST发言添加到输入数据中 + data['host_speech'] = host_speech + self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符") + except Exception as e: + self.log_info(f"读取HOST发言失败: {str(e)}") + + # 转换为JSON字符串 + message = json.dumps(data, ensure_ascii=False) + + # 如果有HOST发言,添加到消息前面作为参考 + if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']: + formatted_host = format_host_speech_for_prompt(data['host_speech']) + message = formatted_host + "\n" + message self.log_info("正在生成反思总结") diff --git a/MediaEngine/nodes/summary_node.py b/MediaEngine/nodes/summary_node.py index 39ef7a1..a26d948 100644 --- a/MediaEngine/nodes/summary_node.py +++ b/MediaEngine/nodes/summary_node.py @@ -18,6 +18,17 @@ from ..utils.text_processing import ( format_search_results_for_prompt ) +# 导入论坛读取工具 +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +try: + from utils.forum_reader import get_latest_host_speech, format_host_speech_for_prompt 
+ FORUM_READER_AVAILABLE = True +except ImportError: + FORUM_READER_AVAILABLE = False + print("警告: 无法导入forum_reader模块,将跳过HOST发言读取功能") + class FirstSummaryNode(StateMutationNode): """根据搜索结果生成段落首次总结的节点""" @@ -62,9 +73,28 @@ class FirstSummaryNode(StateMutationNode): # 准备输入数据 if isinstance(input_data, str): - message = input_data + data = json.loads(input_data) else: - message = json.dumps(input_data, ensure_ascii=False) + data = input_data.copy() if isinstance(input_data, dict) else input_data + + # 读取最新的HOST发言(如果可用) + if FORUM_READER_AVAILABLE: + try: + host_speech = get_latest_host_speech() + if host_speech: + # 将HOST发言添加到输入数据中 + data['host_speech'] = host_speech + self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符") + except Exception as e: + self.log_info(f"读取HOST发言失败: {str(e)}") + + # 转换为JSON字符串 + message = json.dumps(data, ensure_ascii=False) + + # 如果有HOST发言,添加到消息前面作为参考 + if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']: + formatted_host = format_host_speech_for_prompt(data['host_speech']) + message = formatted_host + "\n" + message self.log_info("正在生成首次段落总结") @@ -212,9 +242,28 @@ class ReflectionSummaryNode(StateMutationNode): # 准备输入数据 if isinstance(input_data, str): - message = input_data + data = json.loads(input_data) else: - message = json.dumps(input_data, ensure_ascii=False) + data = input_data.copy() if isinstance(input_data, dict) else input_data + + # 读取最新的HOST发言(如果可用) + if FORUM_READER_AVAILABLE: + try: + host_speech = get_latest_host_speech() + if host_speech: + # 将HOST发言添加到输入数据中 + data['host_speech'] = host_speech + self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符") + except Exception as e: + self.log_info(f"读取HOST发言失败: {str(e)}") + + # 转换为JSON字符串 + message = json.dumps(data, ensure_ascii=False) + + # 如果有HOST发言,添加到消息前面作为参考 + if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']: + formatted_host = format_host_speech_for_prompt(data['host_speech']) + message = formatted_host + "\n" + message 
self.log_info("正在生成反思总结") diff --git a/QueryEngine/nodes/summary_node.py b/QueryEngine/nodes/summary_node.py index ba50c14..ae8f911 100644 --- a/QueryEngine/nodes/summary_node.py +++ b/QueryEngine/nodes/summary_node.py @@ -18,6 +18,17 @@ from ..utils.text_processing import ( format_search_results_for_prompt ) +# 导入论坛读取工具 +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +try: + from utils.forum_reader import get_latest_host_speech, format_host_speech_for_prompt + FORUM_READER_AVAILABLE = True +except ImportError: + FORUM_READER_AVAILABLE = False + print("警告: 无法导入forum_reader模块,将跳过HOST发言读取功能") + class FirstSummaryNode(StateMutationNode): """根据搜索结果生成段落首次总结的节点""" @@ -62,9 +73,28 @@ class FirstSummaryNode(StateMutationNode): # 准备输入数据 if isinstance(input_data, str): - message = input_data + data = json.loads(input_data) else: - message = json.dumps(input_data, ensure_ascii=False) + data = input_data.copy() if isinstance(input_data, dict) else input_data + + # 读取最新的HOST发言(如果可用) + if FORUM_READER_AVAILABLE: + try: + host_speech = get_latest_host_speech() + if host_speech: + # 将HOST发言添加到输入数据中 + data['host_speech'] = host_speech + self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符") + except Exception as e: + self.log_info(f"读取HOST发言失败: {str(e)}") + + # 转换为JSON字符串 + message = json.dumps(data, ensure_ascii=False) + + # 如果有HOST发言,添加到消息前面作为参考 + if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']: + formatted_host = format_host_speech_for_prompt(data['host_speech']) + message = formatted_host + "\n" + message self.log_info("正在生成首次段落总结") @@ -212,9 +242,28 @@ class ReflectionSummaryNode(StateMutationNode): # 准备输入数据 if isinstance(input_data, str): - message = input_data + data = json.loads(input_data) else: - message = json.dumps(input_data, ensure_ascii=False) + data = input_data.copy() if isinstance(input_data, dict) else input_data + + # 读取最新的HOST发言(如果可用) + if FORUM_READER_AVAILABLE: + 
try: + host_speech = get_latest_host_speech() + if host_speech: + # 将HOST发言添加到输入数据中 + data['host_speech'] = host_speech + self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符") + except Exception as e: + self.log_info(f"读取HOST发言失败: {str(e)}") + + # 转换为JSON字符串 + message = json.dumps(data, ensure_ascii=False) + + # 如果有HOST发言,添加到消息前面作为参考 + if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']: + formatted_host = format_host_speech_for_prompt(data['host_speech']) + message = formatted_host + "\n" + message self.log_info("正在生成反思总结") diff --git a/README-EN.md b/README-EN.md index 7e720cc..c63da49 100644 --- a/README-EN.md +++ b/README-EN.md @@ -58,16 +58,18 @@ Say goodbye to traditional data dashboards. In "WeiYu", everything starts with a ### A Complete Analysis Workflow -| Step | Phase Name | Main Operations | Participating Components | -|------|------------|-----------------|-------------------------| -| 1 | User Query | Flask main application receives the query | Flask Main Application | -| 2 | Parallel Launch | Three Agents start working simultaneously | Query Agent, Media Agent, Insight Agent | -| 3 | Preliminary Analysis | Each Agent uses dedicated tools for overview search | Each Agent + Dedicated Toolsets | -| 4 | Strategy Formulation | Develop segmented research strategies based on preliminary results | Internal Decision Modules of Each Agent | -| 5 | In-depth Research | Multi-round search and reflection mechanisms calling respective tools | Each Agent + Reflection Mechanisms | -| 6 | Forum Collaboration | ForumEngine accepts key findings from each Agent and facilitates Agent communication | ForumEngine + All Agents | -| 7 | Result Integration | Report Agent collects all analysis results and forum content | Report Agent | -| 8 | Report Generation | Dynamically select templates and styles, generate final reports through multiple rounds | Report Agent + Template Engine | +| Step | Phase Name | Main Operations | Participating Components | Cycle 
Nature | +|------|------------|-----------------|-------------------------|--------------| +| 1 | User Query | Flask main application receives the query | Flask Main Application | - | +| 2 | Parallel Launch | Three Agents start working simultaneously | Query Agent, Media Agent, Insight Agent | - | +| 3 | Preliminary Analysis | Each Agent uses dedicated tools for overview search | Each Agent + Dedicated Toolsets | - | +| 4 | Strategy Formulation | Develop segmented research strategies based on preliminary results | Internal Decision Modules of Each Agent | - | +| 5-N | **Iterative Phase** | **Forum Collaboration + In-depth Research** | **ForumEngine + All Agents** | **Multi-round cycles** | +| 5.1 | In-depth Research | Each Agent conducts specialized search guided by forum host | Each Agent + Reflection Mechanisms + Forum Guidance | Each cycle | +| 5.2 | Forum Collaboration | ForumEngine monitors Agent communications and generates host summaries | ForumEngine + LLM Host | Each cycle | +| 5.3 | Communication Integration | Each Agent adjusts research directions based on discussions | Each Agent + forum_reader tool | Each cycle | +| N+1 | Result Integration | Report Agent collects all analysis results and forum content | Report Agent | - | +| N+2 | Report Generation | Dynamically select templates and styles, generate final reports through multiple rounds | Report Agent + Template Engine | - | ### Project Code Structure Tree @@ -161,6 +163,8 @@ Weibo_PublicOpinion_AnalysisSystem/ ├── logs/ # Runtime log directory ├── final_reports/ # Final generated HTML report files ├── utils/ # Common utility functions +│ ├── forum_reader.py # Agent forum communication +│ └── retry_helper.py # Network request retry mechanism tool ├── app.py # Flask main application entry ├── config.py # Global configuration file └── requirements.txt # Python dependency list diff --git a/README.md b/README.md index 289ecda..fefe02a 100644 --- a/README.md +++ b/README.md @@ -58,16 +58,18 @@ ### 一次完整分析流程 
-| 步骤 | 阶段名称 | 主要操作 | 参与组件 | -|------|----------|----------|----------| -| 1 | 用户提问 | Flask主应用接收查询 | Flask主应用 | -| 2 | 并行启动 | 三个Agent同时开始工作 | Query Agent、Media Agent、Insight Agent | -| 3 | 初步分析 | 各Agent使用专属工具进行概览搜索 | 各Agent + 专属工具集 | -| 4 | 策略制定 | 基于初步结果制定分块研究策略 | 各Agent内部决策模块 | -| 5 | 深度研究 | 多轮搜索与反思机制调用各自工具 | 各Agent + 反思机制 | -| 6 | 论坛协作 | ForumEngine接受各Agent关键发现并促进Agent交流 | ForumEngine + 所有Agent | -| 7 | 结果整合 | Report Agent收集所有分析结果和论坛内容 | Report Agent | -| 8 | 报告生成 | 动态选择模板和样式,多轮生成最终报告 | Report Agent + 模板引擎 | +| 步骤 | 阶段名称 | 主要操作 | 参与组件 | 循环特性 | +|------|----------|----------|----------|----------| +| 1 | 用户提问 | Flask主应用接收查询 | Flask主应用 | - | +| 2 | 并行启动 | 三个Agent同时开始工作 | Query Agent、Media Agent、Insight Agent | - | +| 3 | 初步分析 | 各Agent使用专属工具进行概览搜索 | 各Agent + 专属工具集 | - | +| 4 | 策略制定 | 基于初步结果制定分块研究策略 | 各Agent内部决策模块 | - | +| 5-N | **循环阶段** | **论坛协作 + 深度研究** | **ForumEngine + 所有Agent** | **多轮循环** | +| 5.1 | 深度研究 | 各Agent基于论坛主持人引导进行专项搜索 | 各Agent + 反思机制 + 论坛引导 | 每轮循环 | +| 5.2 | 论坛协作 | ForumEngine监控Agent发言并生成主持人总结 | ForumEngine + LLM主持人 | 每轮循环 | +| 5.3 | 交流融合 | 各Agent根据讨论调整研究方向 | 各Agent + forum_reader工具 | 每轮循环 | +| N+1 | 结果整合 | Report Agent收集所有分析结果和论坛内容 | Report Agent | - | +| N+2 | 报告生成 | 动态选择模板和样式,多轮生成最终报告 | Report Agent + 模板引擎 | - | ### 项目代码结构树 @@ -161,6 +163,8 @@ Weibo_PublicOpinion_AnalysisSystem/ ├── logs/ # 运行日志目录 ├── final_reports/ # 最终生成的HTML报告文件 ├── utils/ # 通用工具函数 +│ ├── forum_reader.py # Agent间论坛通信 +│ └── retry_helper.py # 网络请求重试机制工具 ├── app.py # Flask主应用入口 ├── config.py # 全局配置文件 └── requirements.txt # Python依赖包清单 diff --git a/config.py b/config.py index 7568250..8f6a311 100644 --- a/config.py +++ b/config.py @@ -26,7 +26,7 @@ TAVILY_API_KEY = "your_tavily_api_key" KIMI_API_KEY = "your_kimi_api_key" # Gemini API Key (via OpenAI format proxy) -# 申请地址https://api.chataiapi.com/ +# 这里我用了一个中转api来接入Gemini,申请地址https://api.chataiapi.com/,你也可以使用其他 GEMINI_API_KEY = "your_gemini_api_key" # Bocha Search API Key diff --git a/utils/forum_reader.py b/utils/forum_reader.py new 
"""
Forum log reading utilities.

Helpers for reading ``forum.log`` and extracting the moderator (HOST)
speeches and the agent (INSIGHT / MEDIA / QUERY) speeches recorded in it.
Each relevant log line has the shape ``[HH:MM:SS] [SPEAKER] content`` where
line breaks inside ``content`` are stored as the two characters ``\\n``.
"""

import logging
import re
from pathlib import Path
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

# Compiled once at import time instead of being looked up per log line.
_HOST_LINE = re.compile(r'\[(\d{2}:\d{2}:\d{2})\]\s*\[HOST\]\s*(.+)')
_AGENT_LINE = re.compile(
    r'\[(\d{2}:\d{2}:\d{2})\]\s*\[(INSIGHT|MEDIA|QUERY)\]\s*(.+)'
)


def _read_forum_lines(log_dir: str) -> Optional[List[str]]:
    """Return all lines of ``<log_dir>/forum.log``, or None if it is missing."""
    forum_log_path = Path(log_dir) / "forum.log"
    if not forum_log_path.exists():
        return None
    # errors='ignore' drops undecodable bytes instead of raising.
    with open(forum_log_path, 'r', encoding='utf-8', errors='ignore') as f:
        return f.readlines()


def _unescape(content: str) -> str:
    """Turn escaped ``\\n`` sequences into real newlines and trim whitespace."""
    return content.replace('\\n', '\n').strip()


def get_latest_host_speech(log_dir: str = "logs") -> Optional[str]:
    """
    Return the most recent HOST speech found in forum.log.

    Args:
        log_dir: Directory that contains ``forum.log``.

    Returns:
        The content of the last HOST line (escaped newlines restored), or
        None when the file is missing, holds no HOST line, or cannot be read.
    """
    try:
        lines = _read_forum_lines(log_dir)
        if lines is None:
            logger.debug("forum.log文件不存在")
            return None

        # Scan backwards so the first hit is the newest HOST line.
        host_speech = None
        for line in reversed(lines):
            match = _HOST_LINE.match(line)
            if match:
                host_speech = _unescape(match.group(2))
                break

        if host_speech:
            logger.info("找到最新的HOST发言,长度: %d字符", len(host_speech))
        else:
            logger.debug("未找到HOST发言")

        return host_speech

    except Exception as e:
        # Deliberately best-effort: a broken log must not break the caller.
        logger.error("读取forum.log失败: %s", e)
        return None


def get_all_host_speeches(log_dir: str = "logs") -> List[Dict[str, str]]:
    """
    Return every HOST speech found in forum.log, oldest first.

    Args:
        log_dir: Directory that contains ``forum.log``.

    Returns:
        A list of dicts with the keys ``timestamp`` and ``content``; an empty
        list when the file is missing or cannot be read.
    """
    try:
        lines = _read_forum_lines(log_dir)
        if lines is None:
            logger.debug("forum.log文件不存在")
            return []

        host_speeches = []
        for line in lines:
            match = _HOST_LINE.match(line)
            if match:
                timestamp, content = match.groups()
                host_speeches.append({
                    'timestamp': timestamp,
                    'content': _unescape(content),
                })

        logger.info("找到%d条HOST发言", len(host_speeches))
        return host_speeches

    except Exception as e:
        # Deliberately best-effort: a broken log must not break the caller.
        logger.error("读取forum.log失败: %s", e)
        return []


def get_recent_agent_speeches(log_dir: str = "logs", limit: int = 5) -> List[Dict[str, str]]:
    """
    Return the most recent agent speeches (HOST excluded) from forum.log.

    Args:
        log_dir: Directory that contains ``forum.log``.
        limit: Maximum number of speeches to return. Zero or a negative
            value yields an empty list.

    Returns:
        Up to ``limit`` dicts with the keys ``timestamp``, ``agent`` and
        ``content``, in chronological order; an empty list when the file is
        missing or cannot be read.
    """
    # Without this guard the loop below would append one entry before its
    # length check fires, returning a speech even though none was requested.
    if limit <= 0:
        return []

    try:
        lines = _read_forum_lines(log_dir)
        if lines is None:
            return []

        agent_speeches = []
        for line in reversed(lines):  # newest first
            match = _AGENT_LINE.match(line)
            if match:
                timestamp, agent, content = match.groups()
                agent_speeches.append({
                    'timestamp': timestamp,
                    'agent': agent,
                    'content': _unescape(content),
                })
                if len(agent_speeches) >= limit:
                    break

        agent_speeches.reverse()  # restore chronological order
        return agent_speeches

    except Exception as e:
        # Deliberately best-effort: a broken log must not break the caller.
        logger.error("读取forum.log失败: %s", e)
        return []


def format_host_speech_for_prompt(host_speech: str) -> str:
    """
    Wrap a HOST speech in a Markdown section for insertion into a prompt.

    Args:
        host_speech: The HOST speech content.

    Returns:
        The formatted section, or an empty string when ``host_speech`` is
        empty.
    """
    if not host_speech:
        return ""

    return f"""
### 论坛主持人最新总结
以下是论坛主持人对各Agent讨论的最新总结和引导,请参考其中的观点和建议:

{host_speech}

---
"""