"""
Log monitor - watches the three engine log files in real time and copies
SummaryNode / ReportFormattingNode output into forum.log.
"""

import os
import time
import threading
import traceback
from pathlib import Path
from datetime import datetime
import re
from typing import Dict, Optional, List, Tuple
from threading import Lock


class LogMonitor:
    """Poll-based monitor that mirrors selected engine log lines into forum.log.

    A background thread polls ``insight.log``, ``media.log`` and ``query.log``
    inside ``log_dir``.  A "search session" starts when a new line containing
    ``FirstSummaryNode`` appears; while a session is active every new line that
    mentions one of ``target_nodes`` is appended to ``forum.log``.  A session
    ends when any monitored log shrinks or after ``INACTIVE_POLL_LIMIT``
    consecutive polls without activity.
    """

    # Seconds to wait between two polls of the monitored logs.
    POLL_INTERVAL_SECONDS = 1
    # Seconds to wait after an unexpected error inside the polling loop.
    ERROR_BACKOFF_SECONDS = 2
    # Number of consecutive idle polls after which a session is closed.
    INACTIVE_POLL_LIMIT = 30
    # Node name whose appearance opens a new search session.
    TRIGGER_NODE = 'FirstSummaryNode'
    # Chunk size used when counting lines without loading the whole file.
    _READ_CHUNK_BYTES = 1 << 16

    def __init__(self, log_dir: str = "logs"):
        """Set up paths and state; create ``log_dir`` (and parents) if missing.

        Args:
            log_dir: Directory that holds the engine logs and forum.log.
        """
        self.log_dir = Path(log_dir)
        self.forum_log_file = self.log_dir / "forum.log"

        # Log files to watch, keyed by application name.
        self.monitored_logs = {
            'insight': self.log_dir / 'insight.log',
            'media': self.log_dir / 'media.log',
            'query': self.log_dir / 'query.log'
        }

        # Monitoring state.
        self.is_monitoring = False
        self.monitor_thread = None
        self.file_positions = {}        # byte offset already consumed, per app
        self.file_line_counts = {}      # complete-line count seen last poll, per app
        self.is_searching = False       # True while a search session is active
        self.search_inactive_count = 0  # consecutive idle polls in this session
        self.write_lock = Lock()        # serialises every write to forum.log
        self._state_lock = Lock()       # serialises start/stop requests
        self._stop_event = threading.Event()  # wakes the polling thread on stop

        # Node names matched by plain substring search.
        self.target_nodes = [
            'FirstSummaryNode',
            'ReflectionSummaryNode',
            'ReportFormattingNode'
        ]

        # parents=True so a nested log_dir such as "var/logs" also works.
        self.log_dir.mkdir(parents=True, exist_ok=True)

    def clear_forum_log(self):
        """Truncate forum.log and write the session start marker.

        Holds ``write_lock`` so a concurrent ``write_to_forum_log`` cannot
        interleave with the truncation.  Failures are printed, not raised.
        """
        try:
            with self.write_lock:
                # Mode 'w' already truncates; no separate unlink is needed.
                with open(self.forum_log_file, 'w', encoding='utf-8') as f:
                    start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    f.write(f"=== ForumEgine 监控开始 - {start_time} ===\n")

            print("ForumEgine: forum.log 已清空并初始化")

        except (OSError, UnicodeError) as e:
            print(f"ForumEgine: 清空forum.log失败: {e}")

    def write_to_forum_log(self, content: str):
        """Append ``content`` to forum.log prefixed with ``[HH:MM:SS]``.

        Thread-safe via ``write_lock``.  Failures are printed, not raised.
        """
        try:
            with self.write_lock:
                with open(self.forum_log_file, 'a', encoding='utf-8') as f:
                    timestamp = datetime.now().strftime('%H:%M:%S')
                    f.write(f"[{timestamp}] {content}\n")
                    f.flush()
        except (OSError, UnicodeError) as e:
            print(f"ForumEgine: 写入forum.log失败: {e}")

    def is_target_log_line(self, line: str) -> bool:
        """Return True if ``line`` contains any name listed in ``target_nodes``."""
        return any(node_name in line for node_name in self.target_nodes)

    def extract_node_content(self, line: str) -> Optional[str]:
        """Return ``line`` without its leading ``[HH:MM:SS]`` timestamp.

        Expected format: ``[HH:MM:SS] [NodeName] message``.  When no timestamp
        is found the stripped line is returned unchanged.
        """
        match = re.search(r'\[\d{2}:\d{2}:\d{2}\]\s*(.+)', line)
        if match:
            return match.group(1).strip()
        return line.strip()

    def get_file_size(self, file_path: Path) -> int:
        """Return the size of ``file_path`` in bytes, or 0 if it is unavailable."""
        try:
            return file_path.stat().st_size
        except OSError:
            return 0

    def get_file_line_count(self, file_path: Path) -> int:
        """Return the number of complete (newline-terminated) lines in the file.

        The file is scanned in binary chunks, so undecodable bytes cannot make
        the count collapse to 0 (which the monitor would misread as the log
        having shrunk).  An unterminated final line is not counted until its
        newline arrives, matching what ``read_new_lines`` is able to return.
        Returns 0 when the file is missing or unreadable.
        """
        try:
            total = 0
            with open(file_path, 'rb') as f:
                for chunk in iter(lambda: f.read(self._READ_CHUNK_BYTES), b''):
                    total += chunk.count(b'\n')
            return total
        except OSError:
            return 0

    def read_new_lines(self, file_path: Path, app_name: str) -> List[str]:
        """Return the complete lines appended since the previous call.

        Reading starts at ``file_positions[app_name]``; if the file is now
        shorter than that offset it was truncated, so reading restarts at 0.
        Only data up to the last newline is consumed - a half-written trailing
        line stays in the file for the next call.  Lines are stripped and
        blank lines dropped.  Bytes that are not valid UTF-8 are replaced
        rather than aborting the read.

        Args:
            file_path: Log file to read.
            app_name: Key used to remember this file's read offset.

        Returns:
            The new non-empty lines (empty list if none or on error).
        """
        try:
            with open(file_path, 'rb') as f:
                current_size = f.seek(0, os.SEEK_END)
                last_position = self.file_positions.get(app_name, 0)

                # A smaller file means it was cleared: start over from the top.
                if current_size < last_position:
                    last_position = 0

                data = b''
                if current_size > last_position:
                    f.seek(last_position)
                    data = f.read()
        except FileNotFoundError:
            return []
        except OSError as e:
            print(f"ForumEgine: 读取{app_name}日志失败: {e}")
            return []

        # Length of the prefix that ends in a newline (0 when there is none).
        complete_len = data.rfind(b'\n') + 1
        self.file_positions[app_name] = last_position + complete_len

        text = data[:complete_len].decode('utf-8', errors='replace')
        return [line.strip() for line in text.split('\n') if line.strip()]

    def _poll_log(self, app_name: str, log_file: Path) -> Tuple[bool, bool, bool]:
        """Check one log for changes; return ``(grew, shrank, captured)``."""
        grew = shrank = captured = False
        current_lines = self.get_file_line_count(log_file)
        previous_lines = self.file_line_counts.get(app_name, 0)

        if current_lines > previous_lines:
            grew = True
            new_lines = self.read_new_lines(log_file, app_name)

            # Open a session the first time the trigger node shows up.
            if not self.is_searching and any(
                self.TRIGGER_NODE in line for line in new_lines
            ):
                print(f"ForumEgine: 在{app_name}中检测到FirstSummaryNode,开始监控记录")
                self.is_searching = True
                self.search_inactive_count = 0
                # Start the new session with a fresh forum.log.
                self.clear_forum_log()

            # While a session is active, record every target-node line.
            if self.is_searching:
                for line in new_lines:
                    if self.is_target_log_line(line):
                        formatted_content = f"[{app_name.upper()}] {line}"
                        self.write_to_forum_log(formatted_content)
                        print(f"ForumEgine: 捕获 - {formatted_content}")
                        captured = True

        elif current_lines < previous_lines:
            shrank = True
            print(f"ForumEgine: 检测到 {app_name} 日志缩短,将重置基线")
            # Re-baseline at the end of the (new, shorter) file.
            self.file_positions[app_name] = self.get_file_size(log_file)

        self.file_line_counts[app_name] = current_lines
        return grew, shrank, captured

    def _end_search_session(self, marker: str) -> None:
        """Leave the searching state and write an end marker to forum.log."""
        self.is_searching = False
        self.search_inactive_count = 0
        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.write_to_forum_log(f"=== ForumEgine {marker} - {end_time} ===")

    def _update_session(self, any_growth: bool, any_shrink: bool,
                        captured_any: bool) -> None:
        """Decide after one poll whether the active session should end."""
        if not self.is_searching:
            return

        if any_shrink:
            # A log was cleared: close the session and wait for the next trigger.
            print("ForumEgine: 日志缩短,结束当前搜索会话,回到等待状态")
            self._end_search_session("搜索会话结束")
            print("ForumEgine: 已重置基线,等待下次FirstSummaryNode触发")
        elif not any_growth and not captured_any:
            # Nothing happened this poll: count it towards the idle timeout.
            self.search_inactive_count += 1
            if self.search_inactive_count >= self.INACTIVE_POLL_LIMIT:
                print("ForumEgine: 长时间无活动,结束搜索会话")
                self._end_search_session("搜索会话超时结束")
        else:
            self.search_inactive_count = 0

    def monitor_logs(self, stop_event: Optional[threading.Event] = None):
        """Polling loop executed by the monitor thread.

        Records the current size and line count of every monitored log as the
        baseline, then polls until ``is_monitoring`` becomes False or
        ``stop_event`` is set.

        Args:
            stop_event: Event that ends this loop when set.  Defaults to the
                monitor's current stop event.  Each thread gets its own event
                so a stopped thread can never be revived by a later start.
        """
        if stop_event is None:
            stop_event = self._stop_event

        print("ForumEgine: 开始智能监控日志文件...")

        # Baseline: only content written after this point is considered new.
        for app_name, log_file in self.monitored_logs.items():
            self.file_line_counts[app_name] = self.get_file_line_count(log_file)
            self.file_positions[app_name] = self.get_file_size(log_file)
            print(f"ForumEgine: {app_name} 基线行数: {self.file_line_counts[app_name]}")

        while self.is_monitoring and not stop_event.is_set():
            try:
                any_growth = any_shrink = captured_any = False

                for app_name, log_file in self.monitored_logs.items():
                    grew, shrank, captured = self._poll_log(app_name, log_file)
                    any_growth = any_growth or grew
                    any_shrink = any_shrink or shrank
                    captured_any = captured_any or captured

                self._update_session(any_growth, any_shrink, captured_any)

                # Event.wait instead of sleep so a stop request wakes us at once.
                stop_event.wait(self.POLL_INTERVAL_SECONDS)

            except Exception as e:
                # Keep the monitor alive on unexpected errors; report and retry.
                print(f"ForumEgine: 监控过程中出错: {e}")
                traceback.print_exc()
                stop_event.wait(self.ERROR_BACKOFF_SECONDS)

        print("ForumEgine: 停止监控日志文件")

    def start_monitoring(self):
        """Start the background monitor thread.

        Returns:
            True if a new thread was started, False if monitoring was already
            running or the thread could not be started.
        """
        with self._state_lock:
            if self.is_monitoring:
                print("ForumEgine: 监控已经在运行中")
                return False

            try:
                # Fresh event per thread: an earlier thread keeps its own
                # (already set) event and therefore cannot resume.
                stop_event = threading.Event()
                self._stop_event = stop_event
                self.is_monitoring = True
                self.monitor_thread = threading.Thread(
                    target=self.monitor_logs, args=(stop_event,), daemon=True
                )
                self.monitor_thread.start()

                print("ForumEgine: 智能监控已启动")
                return True

            except Exception as e:
                print(f"ForumEgine: 启动监控失败: {e}")
                self.is_monitoring = False
                return False

    def stop_monitoring(self):
        """Stop the monitor thread and write the end marker to forum.log.

        Does nothing except print a notice when monitoring is not running.
        Failures are printed, not raised.
        """
        with self._state_lock:
            if not self.is_monitoring:
                print("ForumEgine: 监控未运行")
                return

            try:
                self.is_monitoring = False
                self._stop_event.set()  # wake the thread out of its wait

                if self.monitor_thread and self.monitor_thread.is_alive():
                    self.monitor_thread.join(timeout=2)

                end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                self.write_to_forum_log(f"=== ForumEgine 监控结束 - {end_time} ===")

                print("ForumEgine: 监控已停止")

            except Exception as e:
                print(f"ForumEgine: 停止监控失败: {e}")

    def get_forum_log_content(self) -> List[str]:
        """Return the lines of forum.log without line endings.

        Returns an empty list when the file is missing or cannot be read.
        """
        try:
            with open(self.forum_log_file, 'r', encoding='utf-8') as f:
                return [line.rstrip('\n\r') for line in f]
        except FileNotFoundError:
            return []
        except Exception as e:
            print(f"ForumEgine: 读取forum.log失败: {e}")
            return []


# Process-wide monitor instance, created lazily by get_monitor().
_monitor_instance = None


def get_monitor() -> LogMonitor:
    """Return the process-wide LogMonitor, creating it on first use."""
    global _monitor_instance
    if _monitor_instance is None:
        _monitor_instance = LogMonitor()
    return _monitor_instance


def start_forum_monitoring():
    """Start the global monitor; return True if it was started by this call."""
    return get_monitor().start_monitoring()


def stop_forum_monitoring():
    """Stop the global monitor."""
    get_monitor().stop_monitoring()


def get_forum_log():
    """Return the current forum.log content as a list of lines."""
    return get_monitor().get_forum_log_content()
解析JSON try: @@ -243,7 +243,7 @@ class ReflectionSummaryNode(StateMutationNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: diff --git a/MediaEngine/nodes/report_structure_node.py b/MediaEngine/nodes/report_structure_node.py index 87632da..9c898d2 100644 --- a/MediaEngine/nodes/report_structure_node.py +++ b/MediaEngine/nodes/report_structure_node.py @@ -79,7 +79,7 @@ class ReportStructureNode(StateMutationNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: diff --git a/MediaEngine/nodes/search_node.py b/MediaEngine/nodes/search_node.py index 52cc17d..2adc114 100644 --- a/MediaEngine/nodes/search_node.py +++ b/MediaEngine/nodes/search_node.py @@ -93,7 +93,7 @@ class FirstSearchNode(BaseNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: @@ -228,7 +228,7 @@ class ReflectionNode(BaseNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: diff --git a/MediaEngine/nodes/summary_node.py b/MediaEngine/nodes/summary_node.py index e44fb1e..9a58213 100644 --- a/MediaEngine/nodes/summary_node.py +++ b/MediaEngine/nodes/summary_node.py @@ -97,7 +97,7 @@ class FirstSummaryNode(StateMutationNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: @@ -243,7 +243,7 @@ class ReflectionSummaryNode(StateMutationNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + 
self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: diff --git a/QueryEngine/nodes/report_structure_node.py b/QueryEngine/nodes/report_structure_node.py index 87632da..9c898d2 100644 --- a/QueryEngine/nodes/report_structure_node.py +++ b/QueryEngine/nodes/report_structure_node.py @@ -79,7 +79,7 @@ class ReportStructureNode(StateMutationNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: diff --git a/QueryEngine/nodes/search_node.py b/QueryEngine/nodes/search_node.py index 52cc17d..2adc114 100644 --- a/QueryEngine/nodes/search_node.py +++ b/QueryEngine/nodes/search_node.py @@ -93,7 +93,7 @@ class FirstSearchNode(BaseNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: @@ -228,7 +228,7 @@ class ReflectionNode(BaseNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: diff --git a/QueryEngine/nodes/summary_node.py b/QueryEngine/nodes/summary_node.py index e44fb1e..9a58213 100644 --- a/QueryEngine/nodes/summary_node.py +++ b/QueryEngine/nodes/summary_node.py @@ -97,7 +97,7 @@ class FirstSummaryNode(StateMutationNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: @@ -243,7 +243,7 @@ class ReflectionSummaryNode(StateMutationNode): cleaned_output = clean_json_tags(cleaned_output) # 记录清理后的输出用于调试 - self.log_info(f"清理后的输出: {cleaned_output[:200]}...") + self.log_info(f"清理后的输出: {cleaned_output}") # 解析JSON try: diff --git a/app.py b/app.py index b16e14e..c068553 100644 --- a/app.py +++ b/app.py @@ -30,6 +30,39 @@ os.environ['PYTHONUTF8'] = '1' LOG_DIR = 
# Initialise ForumEgine's forum.log file
def init_forum_log():
    """Create ``forum.log`` inside LOG_DIR with a header line if it is missing.

    An existing file is left untouched.  Any failure is printed and ignored.
    """
    try:
        target = LOG_DIR / "forum.log"
        if target.exists():
            return
        with open(target, 'w', encoding='utf-8') as handle:
            stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            handle.write(f"=== ForumEgine 系统初始化 - {stamp} ===\n")
        print("ForumEgine: forum.log 已初始化")
    except Exception as exc:
        print(f"ForumEgine: 初始化forum.log失败: {exc}")


# Initialise forum.log at import time
init_forum_log()


# Start the ForumEgine monitor
def start_forum_engine():
    """Start the ForumEgine background monitor and print the outcome."""
    try:
        from ForumEgine.monitor import start_forum_monitoring
        print("ForumEgine: 启动智能监控...")
        started = start_forum_monitoring()
        if not started:
            print("ForumEgine: 智能监控启动失败")
        else:
            print("ForumEgine: 智能监控已启动,将自动检测搜索活动")
    except Exception as exc:
        print(f"ForumEgine: 启动智能监控失败: {exc}")


# Start ForumEgine at import time
start_forum_engine()


@app.route('/api/forum/start')
def start_forum_monitoring_api():
    """Start the ForumEgine monitor on request and report the result as JSON."""
    try:
        from ForumEgine.monitor import start_forum_monitoring
        if start_forum_monitoring():
            payload = {'success': True, 'message': 'ForumEgine监控已启动'}
        else:
            payload = {'success': False, 'message': 'ForumEgine监控启动失败'}
        return jsonify(payload)
    except Exception as exc:
        return jsonify({'success': False, 'message': f'启动监控失败: {exc!s}'})


@app.route('/api/forum/stop')
def stop_forum_monitoring_api():
    """Stop the ForumEgine monitor on request and report the result as JSON."""
    try:
        from ForumEgine.monitor import stop_forum_monitoring
        stop_forum_monitoring()
    except Exception as exc:
        return jsonify({'success': False, 'message': f'停止监控失败: {exc!s}'})
    return jsonify({'success': True, 'message': 'ForumEgine监控已停止'})


@app.route('/api/forum/log')
def get_forum_log():
    """Return the lines of forum.log and their count as JSON."""
    try:
        # Aliased so the imported reader is not confused with this view function.
        from ForumEgine.monitor import get_forum_log as read_forum_log
        lines = read_forum_log()
        return jsonify({
            'success': True,
            'log_lines': lines,
            'total_lines': len(lines)
        })
    except Exception as exc:
        return jsonify({'success': False, 'message': f'读取forum.log失败: {exc!s}'})