diff --git a/.env.example b/.env.example index 69859c2..c828cd4 100644 --- a/.env.example +++ b/.env.example @@ -1,3 +1,9 @@ +# ====================== BETTAFISH 相关 ====================== +# BETTAFISH 主机地址,例如:0.0.0.0 或 127.0.0.1 +HOST=0.0.0.0 +# BETTAFISH 主机地址,默认为5000 +PORT=5000 + # ====================== 数据库配置 ====================== # 数据库主机,例如localhost 或 127.0.0.1 DB_HOST=your_db_host @@ -21,6 +27,7 @@ INSIGHT_ENGINE_API_KEY= INSIGHT_ENGINE_BASE_URL= # Insight Agent LLM模型名称,如kimi-k2-0711-preview INSIGHT_ENGINE_MODEL_NAME= + # Media Agent(推荐Gemini,推荐中转厂商 https://aihubmix.com/?aff=8Ds9)API密钥 MEDIA_ENGINE_API_KEY= # Media Agent LLM接口BaseUrl @@ -28,7 +35,7 @@ MEDIA_ENGINE_BASE_URL= # Media Agent LLM模型名称,如gemini-2.5-pro MEDIA_ENGINE_MODEL_NAME= -# Media Agent API密钥(推荐Deepseek) +# MindSpider Agent API密钥(推荐Deepseek) MINDSPIDER_API_KEY= # MindSpider LLM接口BaseUrl MINDSPIDER_BASE_URL= @@ -41,18 +48,21 @@ QUERY_ENGINE_API_KEY= QUERY_ENGINE_BASE_URL= # Query Agent LLM模型,如deepseek-reasoner QUERY_ENGINE_MODEL_NAME= + # Report Agent(推荐Gemini,推荐中转厂商 https://aihubmix.com/?aff=8Ds9)API密钥 REPORT_ENGINE_API_KEY= # Report Agent LLM接口BaseUrl REPORT_ENGINE_BASE_URL= # Report Agent LLM模型,如gemini-2.5-pro REPORT_ENGINE_MODEL_NAME= + # Forum Host LLM API密钥,Qwen3最新模型,推荐 https://cloud.siliconflow.cn/ FORUM_HOST_API_KEY= # Forum Host LLM BaseUrl FORUM_HOST_BASE_URL= # Forum Host LLM模型名,如Qwen/Qwen3-235B-A22B-Instruct-2507 FORUM_HOST_MODEL_NAME= + # SQL Keyword Optimizer LLM密钥,小参数Qwen3模型 https://cloud.siliconflow.cn/ KEYWORD_OPTIMIZER_API_KEY= # Keyword Optimizer BaseUrl diff --git a/ForumEngine/monitor.py b/ForumEngine/monitor.py index 5dd84cd..7e1df92 100644 --- a/ForumEngine/monitor.py +++ b/ForumEngine/monitor.py @@ -70,6 +70,7 @@ class LogMonitor: self.capturing_json = {} # 每个app的JSON捕获状态 self.json_buffer = {} # 每个app的JSON缓冲区 self.json_start_line = {} # 每个app的JSON开始行 + self.in_error_block = {} # 每个app是否在ERROR块中 # 确保logs目录存在 self.log_dir.mkdir(exist_ok=True) @@ -93,6 +94,7 @@ class LogMonitor: self.capturing_json = {} self.json_buffer = {} self.json_start_line = {} + self.in_error_block = {} # 重置主持人相关状态 self.agent_speeches_buffer = [] @@ -118,6 +120,21 @@ class LogMonitor: except Exception as e: logger.exception(f"ForumEngine: 写入forum.log失败: {e}") + def get_log_level(self, line: str) -> Optional[str]: + """检测日志行的级别(INFO/ERROR/WARNING/DEBUG等) + + 支持loguru格式:YYYY-MM-DD HH:mm:ss.SSS | LEVEL | ... + + Returns: + 'INFO', 'ERROR', 'WARNING', 'DEBUG' 或 None(无法识别) + """ + # 检查loguru格式:YYYY-MM-DD HH:mm:ss.SSS | LEVEL | ... + # 匹配模式:| LEVEL | 或 | LEVEL | + match = re.search(r'\|\s*(INFO|ERROR|WARNING|DEBUG|TRACE|CRITICAL)\s*\|', line) + if match: + return match.group(1) + return None + def is_target_log_line(self, line: str) -> bool: """检查是否是目标日志行(SummaryNode) @@ -132,6 +149,11 @@ class LogMonitor: - 包含错误关键词的日志(JSON解析失败、JSON修复失败等) """ # 排除 ERROR 级别的日志 + log_level = self.get_log_level(line) + if log_level == 'ERROR': + return False + + # 兼容旧检查方式 if "| ERROR" in line or "| ERROR |" in line: return False @@ -381,6 +403,7 @@ class LogMonitor: # 重置JSON捕获状态 self.capturing_json[app_name] = False self.json_buffer[app_name] = [] + self.in_error_block[app_name] = False if current_size > last_position: with open(file_path, 'r', encoding='utf-8') as f: @@ -400,17 +423,47 @@ class LogMonitor: return new_lines def process_lines_for_json(self, lines: List[str], app_name: str) -> List[str]: - """处理行以捕获多行JSON内容""" + """处理行以捕获多行JSON内容 + + 实现ERROR块过滤:如果遇到ERROR级别的日志,拒绝处理直到遇到下一个INFO级别的日志 + """ captured_contents = [] # 初始化状态 if app_name not in self.capturing_json: self.capturing_json[app_name] = False self.json_buffer[app_name] = [] + if app_name not in self.in_error_block: + self.in_error_block[app_name] = False for line in lines: if not line.strip(): continue + + # 首先检查日志级别,更新ERROR块状态 + log_level = self.get_log_level(line) + if log_level == 'ERROR': + # 遇到ERROR,进入ERROR块状态 + self.in_error_block[app_name] = True + # 如果正在捕获JSON,立即停止并清空缓冲区 + if self.capturing_json[app_name]: + self.capturing_json[app_name] = False + self.json_buffer[app_name] = [] + # 跳过当前行,不处理 + continue + elif log_level == 'INFO': + # 遇到INFO,退出ERROR块状态 + self.in_error_block[app_name] = False + # 其他级别(WARNING、DEBUG等)保持当前状态 + + # 如果在ERROR块中,拒绝处理所有内容 + if self.in_error_block[app_name]: + # 如果正在捕获JSON,立即停止并清空缓冲区 + if self.capturing_json[app_name]: + self.capturing_json[app_name] = False + self.json_buffer[app_name] = [] + # 跳过当前行,不处理 + continue # 检查是否是目标节点行和JSON开始标记 is_target = self.is_target_log_line(line) @@ -538,6 +591,7 @@ class LogMonitor: self.file_positions[app_name] = self.get_file_size(log_file) self.capturing_json[app_name] = False self.json_buffer[app_name] = [] + self.in_error_block[app_name] = False # logger.info(f"ForumEngine: {app_name} 基线行数: {self.file_line_counts[app_name]}") while self.is_monitoring: @@ -601,6 +655,7 @@ class LogMonitor: # 重置JSON捕获状态 self.capturing_json[app_name] = False self.json_buffer[app_name] = [] + self.in_error_block[app_name] = False # 更新行数记录 self.file_line_counts[app_name] = current_lines diff --git a/InsightEngine/agent.py b/InsightEngine/agent.py index deb4ff3..1097681 100644 --- a/InsightEngine/agent.py +++ b/InsightEngine/agent.py @@ -779,5 +779,5 @@ def create_agent(config_file: Optional[str] = None) -> DeepSearchAgent: Returns: DeepSearchAgent实例 """ - config = Settings() + config = Settings() # 以空配置初始化,而从从环境变量初始化 return DeepSearchAgent(config) diff --git a/InsightEngine/nodes/report_structure_node.py b/InsightEngine/nodes/report_structure_node.py index 618a22e..8a84891 100644 --- a/InsightEngine/nodes/report_structure_node.py +++ b/InsightEngine/nodes/report_structure_node.py @@ -87,11 +87,11 @@ class ReportStructureNode(StateMutationNode): report_structure = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 使用更强大的提取方法 report_structure = extract_clean_response(cleaned_output) if "error" in report_structure: - logger.exception("JSON解析失败,尝试修复...") + logger.error("JSON解析失败,尝试修复...") # 尝试修复JSON fixed_json = fix_incomplete_json(cleaned_output) if fixed_json: @@ -99,11 +99,11 @@ class ReportStructureNode(StateMutationNode): report_structure = json.loads(fixed_json) logger.info("JSON修复成功") except JSONDecodeError: - logger.exception("JSON修复失败") + logger.error("JSON修复失败") # 返回默认结构 return self._generate_default_structure() else: - logger.exception("无法修复JSON,使用默认结构") + logger.error("无法修复JSON,使用默认结构") return self._generate_default_structure() # 验证结构 diff --git a/InsightEngine/nodes/search_node.py b/InsightEngine/nodes/search_node.py index fb8f300..e81efba 100644 --- a/InsightEngine/nodes/search_node.py +++ b/InsightEngine/nodes/search_node.py @@ -101,7 +101,7 @@ class FirstSearchNode(BaseNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 使用更强大的提取方法 result = extract_clean_response(cleaned_output) if "error" in result: @@ -236,7 +236,7 @@ class ReflectionNode(BaseNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 使用更强大的提取方法 result = extract_clean_response(cleaned_output) if "error" in result: diff --git a/InsightEngine/nodes/summary_node.py b/InsightEngine/nodes/summary_node.py index 3e58830..72f93e2 100644 --- a/InsightEngine/nodes/summary_node.py +++ b/InsightEngine/nodes/summary_node.py @@ -135,7 +135,7 @@ class FirstSummaryNode(StateMutationNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 尝试修复JSON fixed_json = fix_incomplete_json(cleaned_output) if fixed_json: @@ -300,7 +300,7 @@ class ReflectionSummaryNode(StateMutationNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 尝试修复JSON fixed_json = fix_incomplete_json(cleaned_output) if fixed_json: @@ -308,11 +308,11 @@ class ReflectionSummaryNode(StateMutationNode): result = json.loads(fixed_json) logger.info("JSON修复成功") except JSONDecodeError: - logger.info("JSON修复失败,直接使用清理后的文本") + logger.error("JSON修复失败,直接使用清理后的文本") # 如果不是JSON格式,直接返回清理后的文本 return cleaned_output else: - logger.info("无法修复JSON,直接使用清理后的文本") + logger.error("无法修复JSON,直接使用清理后的文本") # 如果不是JSON格式,直接返回清理后的文本 return cleaned_output diff --git a/MediaEngine/nodes/report_structure_node.py b/MediaEngine/nodes/report_structure_node.py index 8ec201c..c1e7214 100644 --- a/MediaEngine/nodes/report_structure_node.py +++ b/MediaEngine/nodes/report_structure_node.py @@ -87,7 +87,7 @@ class ReportStructureNode(StateMutationNode): report_structure = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 使用更强大的提取方法 report_structure = extract_clean_response(cleaned_output) if "error" in report_structure: diff --git a/MediaEngine/nodes/search_node.py b/MediaEngine/nodes/search_node.py index fb8f300..e81efba 100644 --- a/MediaEngine/nodes/search_node.py +++ b/MediaEngine/nodes/search_node.py @@ -101,7 +101,7 @@ class FirstSearchNode(BaseNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 使用更强大的提取方法 result = extract_clean_response(cleaned_output) if "error" in result: @@ -236,7 +236,7 @@ class ReflectionNode(BaseNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 使用更强大的提取方法 result = extract_clean_response(cleaned_output) if "error" in result: diff --git a/MediaEngine/nodes/summary_node.py b/MediaEngine/nodes/summary_node.py index 7e6a27d..748fbb9 100644 --- a/MediaEngine/nodes/summary_node.py +++ b/MediaEngine/nodes/summary_node.py @@ -138,7 +138,7 @@ class FirstSummaryNode(StateMutationNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 尝试修复JSON fixed_json = fix_incomplete_json(cleaned_output) if fixed_json: @@ -306,7 +306,7 @@ class ReflectionSummaryNode(StateMutationNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 尝试修复JSON fixed_json = fix_incomplete_json(cleaned_output) if fixed_json: @@ -314,11 +314,11 @@ class ReflectionSummaryNode(StateMutationNode): result = json.loads(fixed_json) logger.info("JSON修复成功") except JSONDecodeError: - logger.exception("JSON修复失败,直接使用清理后的文本") + logger.error("JSON修复失败,直接使用清理后的文本") # 如果不是JSON格式,直接返回清理后的文本 return cleaned_output else: - logger.exception("无法修复JSON,直接使用清理后的文本") + logger.error("无法修复JSON,直接使用清理后的文本") # 如果不是JSON格式,直接返回清理后的文本 return cleaned_output diff --git a/QueryEngine/nodes/report_structure_node.py b/QueryEngine/nodes/report_structure_node.py index 8ec201c..c1e7214 100644 --- a/QueryEngine/nodes/report_structure_node.py +++ b/QueryEngine/nodes/report_structure_node.py @@ -87,7 +87,7 @@ class ReportStructureNode(StateMutationNode): report_structure = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 使用更强大的提取方法 report_structure = extract_clean_response(cleaned_output) if "error" in report_structure: diff --git a/QueryEngine/nodes/search_node.py b/QueryEngine/nodes/search_node.py index fb8f300..e81efba 100644 --- a/QueryEngine/nodes/search_node.py +++ b/QueryEngine/nodes/search_node.py @@ -101,7 +101,7 @@ class FirstSearchNode(BaseNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 使用更强大的提取方法 result = extract_clean_response(cleaned_output) if "error" in result: @@ -236,7 +236,7 @@ class ReflectionNode(BaseNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 使用更强大的提取方法 result = extract_clean_response(cleaned_output) if "error" in result: diff --git a/QueryEngine/nodes/summary_node.py b/QueryEngine/nodes/summary_node.py index 5643667..217db54 100644 --- a/QueryEngine/nodes/summary_node.py +++ b/QueryEngine/nodes/summary_node.py @@ -138,7 +138,7 @@ class FirstSummaryNode(StateMutationNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 尝试修复JSON fixed_json = fix_incomplete_json(cleaned_output) if fixed_json: @@ -146,11 +146,11 @@ class FirstSummaryNode(StateMutationNode): result = json.loads(fixed_json) logger.info("JSON修复成功") except JSONDecodeError: - logger.exception("JSON修复失败,直接使用清理后的文本") + logger.error("JSON修复失败,直接使用清理后的文本") # 如果不是JSON格式,直接返回清理后的文本 return cleaned_output else: - logger.exception("无法修复JSON,直接使用清理后的文本") + logger.error("无法修复JSON,直接使用清理后的文本") # 如果不是JSON格式,直接返回清理后的文本 return cleaned_output @@ -306,7 +306,7 @@ class ReflectionSummaryNode(StateMutationNode): result = json.loads(cleaned_output) logger.info("JSON解析成功") except JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 尝试修复JSON fixed_json = fix_incomplete_json(cleaned_output) if fixed_json: @@ -314,11 +314,11 @@ class ReflectionSummaryNode(StateMutationNode): result = json.loads(fixed_json) logger.info("JSON修复成功") except JSONDecodeError: - logger.exception("JSON修复失败,直接使用清理后的文本") + logger.error("JSON修复失败,直接使用清理后的文本") # 如果不是JSON格式,直接返回清理后的文本 return cleaned_output else: - logger.exception("无法修复JSON,直接使用清理后的文本") + logger.error("无法修复JSON,直接使用清理后的文本") # 如果不是JSON格式,直接返回清理后的文本 return cleaned_output diff --git a/ReportEngine/agent.py b/ReportEngine/agent.py index b32e979..82c67eb 100644 --- a/ReportEngine/agent.py +++ b/ReportEngine/agent.py @@ -351,7 +351,7 @@ class ReportAgent: query_safe = query_safe.replace(' ', '_')[:30] filename = f"final_report_{query_safe}_{timestamp}.html" - filepath = os.path.join(settings.OUTPUT_DIR, filename) + filepath = os.path.join(self.config.OUTPUT_DIR, filename) # 保存HTML报告 with open(filepath, 'w', encoding='utf-8') as f: @@ -361,7 +361,7 @@ class ReportAgent: # 保存状态 state_filename = f"report_state_{query_safe}_{timestamp}.json" - state_filepath = os.path.join(settings.OUTPUT_DIR, state_filename) + state_filepath = os.path.join(self.config.OUTPUT_DIR, state_filename) self.state.save_to_file(state_filepath) logger.info(f"状态已保存到: {state_filepath}") diff --git a/ReportEngine/nodes/template_selection_node.py b/ReportEngine/nodes/template_selection_node.py index 6afbf25..d21c96e 100644 --- a/ReportEngine/nodes/template_selection_node.py +++ b/ReportEngine/nodes/template_selection_node.py @@ -145,7 +145,7 @@ class TemplateSelectionNode(BaseNode): return None except json.JSONDecodeError as e: - logger.exception(f"JSON解析失败: {str(e)}") + logger.error(f"JSON解析失败: {str(e)}") # 尝试从文本响应中提取模板信息 return self._extract_template_from_text(response, available_templates) diff --git a/app.py b/app.py index 8749dcc..9628317 100644 --- a/app.py +++ b/app.py @@ -48,6 +48,8 @@ LOG_DIR.mkdir(exist_ok=True) CONFIG_MODULE_NAME = 'config' CONFIG_FILE_PATH = Path(__file__).resolve().parent / 'config.py' CONFIG_KEYS = [ + 'HOST', + 'PORT', 'DB_DIALECT', 'DB_HOST', 'DB_PORT', @@ -1025,8 +1027,11 @@ def handle_status_request(): }) if __name__ == '__main__': - HOST = '0.0.0.0' - PORT = 5000 + # 从配置文件读取 HOST 和 PORT + from config import settings + HOST = settings.HOST + PORT = settings.PORT + logger.info("等待配置确认,系统将在前端指令后启动组件...") logger.info(f"Flask服务器已启动,访问地址: http://{HOST}:{PORT}") diff --git a/config.py b/config.py index 9e5dde0..85a732f 100644 --- a/config.py +++ b/config.py @@ -25,7 +25,10 @@ class Settings(BaseSettings): 全局配置;支持 .env 和环境变量自动加载。 变量名与原 config.py 大写一致,便于平滑过渡。 """ - + # ================== Flask 服务器配置 ==================== + HOST: str = Field("0.0.0.0", description="Flask服务器主机地址,默认0.0.0.0(允许外部访问)") + PORT: int = Field(5000, description="Flask服务器端口号,默认5000") + # ====================== 数据库配置 ====================== DB_DIALECT: str = Field("mysql", description="数据库类型,例如 'mysql' 或 'postgresql'。用于支持多种数据库后端(如 SQLAlchemy,请与连接信息共同配置)") DB_HOST: str = Field("your_db_host", description="数据库主机,例如localhost 或 127.0.0.1。我们也提供云数据库资源便捷配置,日均10w+数据,可免费申请,联系我们:670939375@qq.com NOTE:为进行数据合规性审查与服务升级,云数据库自2025年10月1日起暂停接收新的使用申请")