From 826e5e7e06d9304c8989139e8c1060c795fb8286 Mon Sep 17 00:00:00 2001 From: dragons96 <521274311@qq.com> Date: Fri, 7 Nov 2025 13:31:50 +0800 Subject: [PATCH 1/5] fix: Mysql query error. --- InsightEngine/tools/search.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/InsightEngine/tools/search.py b/InsightEngine/tools/search.py index eb15b24..f7f84dc 100644 --- a/InsightEngine/tools/search.py +++ b/InsightEngine/tools/search.py @@ -31,6 +31,7 @@ from typing import List, Dict, Any, Optional, Literal from dataclasses import dataclass, field from ..utils.db import fetch_all from datetime import datetime, timedelta, date +from InsightEngine.utils.config import settings # --- 1. 数据结构定义 --- @@ -183,6 +184,12 @@ class MediaCrawlerDB: formatted_results = [QueryResult(platform=r['p'], content_type=r['t'], title_or_content=r['title'], author_nickname=r.get('author'), url=r['url'], publish_time=self._to_datetime(r['ts']), engagement=self._extract_engagement(r), hotness_score=r.get('hotness_score', 0.0), source_keyword=r.get('source_keyword'), source_table=r['tbl']) for r in raw_results] return DBResponse("search_hot_content", params_for_log, results=formatted_results, results_count=len(formatted_results)) + def _wrap_query_field_with_dialect(self, field: str) -> str: + """根据数据库方言包装SQL查询""" + if settings.DB_DIALECT == 'postgresql': + return f'"{field}"' + return f'`{field}`' + def search_topic_globally(self, topic: str, limit_per_table: int = 100) -> DBResponse: """ 【工具】全局话题搜索: 在数据库中(内容、评论、标签、来源关键字)全面搜索指定话题。 @@ -205,11 +212,11 @@ class MediaCrawlerDB: where_clauses = [] for idx, field in enumerate(config['fields']): pname = f"term_{idx}" - where_clauses.append(f'"{field}" LIKE :{pname}') + where_clauses.append(f'{self._wrap_query_field_with_dialect(field)} LIKE :{pname}') param_dict[pname] = search_term param_dict['limit'] = limit_per_table where_clause = " OR ".join(where_clauses) - query = f'SELECT * FROM "{table}" WHERE {where_clause} ORDER BY id DESC LIMIT :limit' + query = f'SELECT * FROM {self._wrap_query_field_with_dialect(table)} WHERE {where_clause} ORDER BY id DESC LIMIT :limit' raw_results = self._execute_query(query, param_dict) for row in raw_results: content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', '')) @@ -260,11 +267,11 @@ class MediaCrawlerDB: where_clauses = [] for idx, field in enumerate(config['fields']): pname = f"term_{idx}" - where_clauses.append(f'"{field}" LIKE :{pname}') + where_clauses.append(f'{self._wrap_query_field_with_dialect(field)} LIKE :{pname}') param_dict[pname] = search_term param_dict['limit'] = limit_per_table where_clause = ' OR '.join(where_clauses) - query = f'SELECT * FROM "{table}" WHERE {where_clause} ORDER BY id DESC LIMIT :limit' + query = f'SELECT * FROM {self._wrap_query_field_with_dialect(table)} WHERE {where_clause} ORDER BY id DESC LIMIT :limit' raw_results = self._execute_query(query, param_dict) for row in raw_results: content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', '')) From 8b72ac0fb1bf970ca06ebdeb6f5805ac485202ca Mon Sep 17 00:00:00 2001 From: 666ghj <670939375@qq.com> Date: Fri, 7 Nov 2025 14:24:19 +0800 Subject: [PATCH 2/5] Update README. --- README.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index a32d46b..4e2b915 100644 --- a/README.md +++ b/README.md @@ -66,9 +66,12 @@ LLM模型API赞助: 所罗门博客LionCC.ai;编程拼车codecodex.ai;编程算力VibeCodingAPI.ai:666ghj%2FBettaFish | Trendshift -1. 所罗门博客LionCC.ai已更新《BettaFish 微舆系统 - LionCC API 部署配置完全指南》正在二开优化一键部署和云服务器调用方案。 -2. VibeCodingapi.ai狮子算力平台已经适配《BettaFish 微舆系统》所有LLM模型含claude code和openai codex和gemini cli编程开发三巨头算力。额度价格,只要一比一(100元等于100美刀额度) -3. Codecodex.ai狮子编程拼车系统,已实现无IP门槛绕过claude code和openai codex封锁,按官方部署教程后切换BASE_URL调用地址和Token key调用密钥即可使用最强编程模型。 + +1.所罗门博客LionCC.ai已更新《BettaFish 微舆系统 - LionCC API 部署配置完全指南》正在二开优化一键部署和云服务器调用方案。 + +2.VibeCodingapi.ai狮子算力平台已经适配《BettaFish 微舆系统》所有LLM模型含claude code和openai codex和gemini cli编程开发三巨头算力。额度价格,只要一比一(100元等于100美刀额度) + +3.Codecodex.ai狮子编程拼车系统,已实现无IP门槛绕过claude code和openai codex封锁,按官方部署教程后切换BASE_URL调用地址和Token key调用密钥即可使用最强编程模型。 所罗门LionCC赞助BettaFish 微舆福利:打开codecodex.ai狮子编程频道扫码加入微信社群,注册VibeCodingapi.ai狮子算力,统一送20刀API额度(仅限前一千名) From 8f8bf70757611439d5950cb5d601d2399184fb29 Mon Sep 17 00:00:00 2001 From: 666ghj <670939375@qq.com> Date: Fri, 7 Nov 2025 14:25:13 +0800 Subject: [PATCH 3/5] Update README. --- README-EN.md | 1 + README.md | 8 +++----- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/README-EN.md b/README-EN.md index 06ea8dd..ba6fb20 100644 --- a/README-EN.md +++ b/README-EN.md @@ -65,6 +65,7 @@ LLM Model API Sponsor:
Solomon Blog LionCC.ai; Programming Carpool codecodex.ai; Programming Computing Power VibeCodingAPI.ai: 666ghj%2FBettaFish | Trendshift + 1. Solomon Blog LionCC.ai has updated the "BettaFish WeiYu System - LionCC API Deployment Configuration Complete Guide" and is optimizing one-click deployment and cloud server invocation solutions. 2. VibeCodingapi.ai Lion Computing Platform has adapted to all LLM models of "BettaFish WeiYu System", including Claude Code, OpenAI Codex, and Gemini CLI programming development computing power. The quota price is 1:1 (100 yuan equals 100 USD quota). 3. Codecodex.ai Lion Programming Carpool System has achieved IP-free access to bypass Claude Code and OpenAI Codex restrictions. After following the official deployment tutorial, simply switch the BASE_URL invocation address and Token key invocation key to use the most powerful programming models. diff --git a/README.md b/README.md index 4e2b915..8a2ee3e 100644 --- a/README.md +++ b/README.md @@ -67,11 +67,9 @@ LLM模型API赞助: 所罗门博客LionCC.ai;编程拼车codecodex.ai;编程算力VibeCodingAPI.ai:666ghj%2FBettaFish | Trendshift -1.所罗门博客LionCC.ai已更新《BettaFish 微舆系统 - LionCC API 部署配置完全指南》正在二开优化一键部署和云服务器调用方案。 - -2.VibeCodingapi.ai狮子算力平台已经适配《BettaFish 微舆系统》所有LLM模型含claude code和openai codex和gemini cli编程开发三巨头算力。额度价格,只要一比一(100元等于100美刀额度) - -3.Codecodex.ai狮子编程拼车系统,已实现无IP门槛绕过claude code和openai codex封锁,按官方部署教程后切换BASE_URL调用地址和Token key调用密钥即可使用最强编程模型。 +1. 所罗门博客LionCC.ai已更新《BettaFish 微舆系统 - LionCC API 部署配置完全指南》正在二开优化一键部署和云服务器调用方案。 +2. VibeCodingapi.ai狮子算力平台已经适配《BettaFish 微舆系统》所有LLM模型含claude code和openai codex和gemini cli编程开发三巨头算力。额度价格,只要一比一(100元等于100美刀额度) +3. Codecodex.ai狮子编程拼车系统,已实现无IP门槛绕过claude code和openai codex封锁,按官方部署教程后切换BASE_URL调用地址和Token key调用密钥即可使用最强编程模型。 所罗门LionCC赞助BettaFish 微舆福利:打开codecodex.ai狮子编程频道扫码加入微信社群,注册VibeCodingapi.ai狮子算力,统一送20刀API额度(仅限前一千名)
From bdc5ffeea62c9947270bc42caaffb332b86ce469 Mon Sep 17 00:00:00 2001 From: 666ghj <670939375@qq.com> Date: Fri, 7 Nov 2025 14:42:34 +0800 Subject: [PATCH 4/5] Update README. --- .env.example | 2 ++ config.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/.env.example b/.env.example index c828cd4..d0962b5 100644 --- a/.env.example +++ b/.env.example @@ -21,6 +21,8 @@ DB_CHARSET=utf8mb4 DB_DIALECT=postgresql # ======================= LLM 相关 ======================= +# 我们的LLM模型API赞助商有:https://share.302.ai/P66Qe3、https://aihubmix.com/?aff=8Ds9,提供了非常全面的模型api + # Insight Agent(推荐Kimi,https://platform.moonshot.cn/)API密钥,用于主LLM INSIGHT_ENGINE_API_KEY= # Insight Agent LLM接口BaseUrl,可自定义厂商API diff --git a/config.py b/config.py index 85a732f..6745a19 100644 --- a/config.py +++ b/config.py @@ -39,6 +39,8 @@ class Settings(BaseSettings): DB_CHARSET: str = Field("utf8mb4", description="数据库字符集,推荐utf8mb4,兼容emoji") # ======================= LLM 相关 ======================= + # 我们的LLM模型API赞助商有:https://share.302.ai/P66Qe3、https://aihubmix.com/?aff=8Ds9,提供了非常全面的模型api + # Insight Agent(推荐Kimi,申请地址:https://platform.moonshot.cn/) INSIGHT_ENGINE_API_KEY: Optional[str] = Field(None, description="Insight Agent(推荐Kimi,https://platform.moonshot.cn/)API密钥,用于主LLM。您可以更改每个部分LLM使用的API,🚩只要兼容OpenAI请求格式都可以,定义好KEY、BASE_URL与MODEL_NAME即可正常使用。重要提醒:我们强烈推荐您先使用推荐的配置申请API,先跑通再进行您的更改!") INSIGHT_ENGINE_BASE_URL: Optional[str] = Field("https://api.moonshot.cn/v1", description="Insight Agent LLM接口BaseUrl,可自定义厂商API") From 474c7657fdbc6476309fd80a53189b340e83c0f3 Mon Sep 17 00:00:00 2001 From: Doiiars Date: Fri, 7 Nov 2025 15:18:20 +0800 Subject: [PATCH 5/5] =?UTF-8?q?1.=20LLM=E6=8E=A5=E5=8F=A3=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E5=AD=97=E8=8A=82=E7=BA=A7=E6=B5=81=E5=BC=8F=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=EF=BC=8C=E9=98=B2=E6=AD=A2=E8=B6=85=E6=97=B6=E9=94=99?= =?UTF-8?q?=E8=AF=AF=EF=BC=8C=E4=B9=9F=E9=81=BF=E5=85=8Dutf-8=E9=95=BF?= =?UTF-8?q?=E5=AD=97=E8=8A=82=E5=AD=97=E7=AC=A6=E6=8B=BC=E6=8E=A5=E9=94=99?= =?UTF-8?q?=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- InsightEngine/llms/base.py | 73 ++++++++++++++++++- InsightEngine/nodes/formatting_node.py | 4 +- InsightEngine/nodes/report_structure_node.py | 4 +- InsightEngine/nodes/search_node.py | 8 +- InsightEngine/nodes/summary_node.py | 8 +- MediaEngine/llms/base.py | 73 ++++++++++++++++++- MediaEngine/nodes/formatting_node.py | 4 +- MediaEngine/nodes/report_structure_node.py | 2 +- MediaEngine/nodes/search_node.py | 4 +- MediaEngine/nodes/summary_node.py | 8 +- QueryEngine/llms/base.py | 73 ++++++++++++++++++- QueryEngine/nodes/formatting_node.py | 4 +- QueryEngine/nodes/report_structure_node.py | 2 +- QueryEngine/nodes/search_node.py | 4 +- QueryEngine/nodes/summary_node.py | 8 +- ReportEngine/llms/base.py | 67 ++++++++++++++++- ReportEngine/nodes/html_generation_node.py | 2 +- ReportEngine/nodes/template_selection_node.py | 2 +- utils/forum_reader.py | 5 +- 19 files changed, 315 insertions(+), 40 deletions(-) diff --git a/InsightEngine/llms/base.py b/InsightEngine/llms/base.py index d41bf58..48a5157 100644 --- a/InsightEngine/llms/base.py +++ b/InsightEngine/llms/base.py @@ -5,7 +5,8 @@ Unified OpenAI-compatible LLM client for the Insight Engine, with retry support. import os import sys from datetime import datetime -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Iterator, Generator +from loguru import logger from openai import OpenAI @@ -82,6 +83,76 @@ class LLMClient: return self.validate_response(response.choices[0].message.content) return "" + @with_retry(LLM_RETRY_CONFIG) + def stream_invoke(self, system_prompt: str, user_prompt: str, **kwargs) -> Generator[str, None, None]: + """ + 流式调用LLM,逐步返回响应内容 + + Args: + system_prompt: 系统提示词 + user_prompt: 用户提示词 + **kwargs: 额外参数(temperature, top_p等) + + Yields: + 响应文本块(str) + """ + current_time = datetime.now().strftime("%Y年%m月%d日%H时%M分") + time_prefix = f"今天的实际时间是{current_time}" + if user_prompt: + user_prompt = f"{time_prefix}\n{user_prompt}" + else: + user_prompt = time_prefix + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + + allowed_keys = {"temperature", "top_p", "presence_penalty", "frequency_penalty"} + extra_params = {key: value for key, value in kwargs.items() if key in allowed_keys and value is not None} + # 强制使用流式 + extra_params["stream"] = True + + timeout = kwargs.pop("timeout", self.timeout) + + try: + stream = self.client.chat.completions.create( + model=self.model_name, + messages=messages, + timeout=timeout, + **extra_params, + ) + + for chunk in stream: + if chunk.choices and len(chunk.choices) > 0: + delta = chunk.choices[0].delta + if delta and delta.content: + yield delta.content + except Exception as e: + logger.error(f"流式请求失败: {str(e)}") + raise e + + def stream_invoke_to_string(self, system_prompt: str, user_prompt: str, **kwargs) -> str: + """ + 流式调用LLM并安全地拼接为完整字符串(避免UTF-8多字节字符截断) + + Args: + system_prompt: 系统提示词 + user_prompt: 用户提示词 + **kwargs: 额外参数(temperature, top_p等) + + Returns: + 完整的响应字符串 + """ + # 以字节形式收集所有块 + byte_chunks = [] + for chunk in self.stream_invoke(system_prompt, user_prompt, **kwargs): + byte_chunks.append(chunk.encode('utf-8')) + + # 拼接所有字节,然后一次性解码 + if byte_chunks: + return b''.join(byte_chunks).decode('utf-8', errors='replace') + return "" + @staticmethod def validate_response(response: Optional[str]) -> str: if response is None: diff --git a/InsightEngine/nodes/formatting_node.py b/InsightEngine/nodes/formatting_node.py index 13af69d..6217b48 100644 --- a/InsightEngine/nodes/formatting_node.py +++ b/InsightEngine/nodes/formatting_node.py @@ -70,8 +70,8 @@ class ReportFormattingNode(BaseNode): logger.info("正在格式化最终报告") - # 调用LLM - response = self.llm_client.invoke( + # 调用LLM(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string( SYSTEM_PROMPT_REPORT_FORMATTING, message, ) diff --git a/InsightEngine/nodes/report_structure_node.py b/InsightEngine/nodes/report_structure_node.py index 8a84891..a5a04db 100644 --- a/InsightEngine/nodes/report_structure_node.py +++ b/InsightEngine/nodes/report_structure_node.py @@ -51,8 +51,8 @@ class ReportStructureNode(StateMutationNode): try: logger.info(f"正在为查询生成报告结构: {self.query}") - # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_REPORT_STRUCTURE, self.query) + # 调用LLM(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_REPORT_STRUCTURE, self.query) # 处理响应 processed_response = self.process_output(response) diff --git a/InsightEngine/nodes/search_node.py b/InsightEngine/nodes/search_node.py index e81efba..3aa0462 100644 --- a/InsightEngine/nodes/search_node.py +++ b/InsightEngine/nodes/search_node.py @@ -65,8 +65,8 @@ class FirstSearchNode(BaseNode): logger.info("正在生成首次搜索查询") - # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_FIRST_SEARCH, message) + # 调用LLM(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_FIRST_SEARCH, message) # 处理响应 processed_response = self.process_output(response) @@ -200,8 +200,8 @@ class ReflectionNode(BaseNode): logger.info("正在进行反思并生成新搜索查询") - # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_REFLECTION, message) + # 调用LLM(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_REFLECTION, message) # 处理响应 processed_response = self.process_output(response) diff --git a/InsightEngine/nodes/summary_node.py b/InsightEngine/nodes/summary_node.py index 72f93e2..69b5bc8 100644 --- a/InsightEngine/nodes/summary_node.py +++ b/InsightEngine/nodes/summary_node.py @@ -99,8 +99,8 @@ class FirstSummaryNode(StateMutationNode): logger.info("正在生成首次段落总结") - # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_FIRST_SUMMARY, message) + # 调用LLM(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_FIRST_SUMMARY, message) # 处理响应 processed_response = self.process_output(response) @@ -264,8 +264,8 @@ class ReflectionSummaryNode(StateMutationNode): logger.info("正在生成反思总结") - # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_REFLECTION_SUMMARY, message) + # 调用LLM(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_REFLECTION_SUMMARY, message) # 处理响应 processed_response = self.process_output(response) diff --git a/MediaEngine/llms/base.py b/MediaEngine/llms/base.py index 5bedebb..4cbb3ca 100644 --- a/MediaEngine/llms/base.py +++ b/MediaEngine/llms/base.py @@ -5,7 +5,8 @@ Unified OpenAI-compatible LLM client for the Media Engine, with retry support. import os import sys from datetime import datetime -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Generator +from loguru import logger from openai import OpenAI @@ -85,6 +86,76 @@ class LLMClient: return self.validate_response(response.choices[0].message.content) return "" + @with_retry(LLM_RETRY_CONFIG) + def stream_invoke(self, system_prompt: str, user_prompt: str, **kwargs) -> Generator[str, None, None]: + """ + 流式调用LLM,逐步返回响应内容 + + Args: + system_prompt: 系统提示词 + user_prompt: 用户提示词 + **kwargs: 额外参数(temperature, top_p等) + + Yields: + 响应文本块(str) + """ + current_time = datetime.now().strftime("%Y年%m月%d日%H时%M分") + time_prefix = f"今天的实际时间是{current_time}" + if user_prompt: + user_prompt = f"{time_prefix}\n{user_prompt}" + else: + user_prompt = time_prefix + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + + allowed_keys = {"temperature", "top_p", "presence_penalty", "frequency_penalty"} + extra_params = {key: value for key, value in kwargs.items() if key in allowed_keys and value is not None} + # 强制使用流式 + extra_params["stream"] = True + + timeout = kwargs.pop("timeout", self.timeout) + + try: + stream = self.client.chat.completions.create( + model=self.model_name, + messages=messages, + timeout=timeout, + **extra_params, + ) + + for chunk in stream: + if chunk.choices and len(chunk.choices) > 0: + delta = chunk.choices[0].delta + if delta and delta.content: + yield delta.content + except Exception as e: + logger.error(f"流式请求失败: {str(e)}") + raise e + + def stream_invoke_to_string(self, system_prompt: str, user_prompt: str, **kwargs) -> str: + """ + 流式调用LLM并安全地拼接为完整字符串(避免UTF-8多字节字符截断) + + Args: + system_prompt: 系统提示词 + user_prompt: 用户提示词 + **kwargs: 额外参数(temperature, top_p等) + + Returns: + 完整的响应字符串 + """ + # 以字节形式收集所有块 + byte_chunks = [] + for chunk in self.stream_invoke(system_prompt, user_prompt, **kwargs): + byte_chunks.append(chunk.encode('utf-8')) + + # 拼接所有字节,然后一次性解码 + if byte_chunks: + return b''.join(byte_chunks).decode('utf-8', errors='replace') + return "" + @staticmethod def validate_response(response: Optional[str]) -> str: if response is None: diff --git a/MediaEngine/nodes/formatting_node.py b/MediaEngine/nodes/formatting_node.py index a0ef03d..c605da3 100644 --- a/MediaEngine/nodes/formatting_node.py +++ b/MediaEngine/nodes/formatting_node.py @@ -68,8 +68,8 @@ class ReportFormattingNode(BaseNode): logger.info("正在格式化最终报告") - # 调用LLM生成Markdown格式 - response = self.llm_client.invoke( + # 调用LLM生成Markdown格式(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string( SYSTEM_PROMPT_REPORT_FORMATTING, message, ) diff --git a/MediaEngine/nodes/report_structure_node.py b/MediaEngine/nodes/report_structure_node.py index c1e7214..353b97e 100644 --- a/MediaEngine/nodes/report_structure_node.py +++ b/MediaEngine/nodes/report_structure_node.py @@ -52,7 +52,7 @@ class ReportStructureNode(StateMutationNode): logger.info(f"正在为查询生成报告结构: {self.query}") # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_REPORT_STRUCTURE, self.query) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_REPORT_STRUCTURE, self.query) # 处理响应 processed_response = self.process_output(response) diff --git a/MediaEngine/nodes/search_node.py b/MediaEngine/nodes/search_node.py index e81efba..e44ee72 100644 --- a/MediaEngine/nodes/search_node.py +++ b/MediaEngine/nodes/search_node.py @@ -66,7 +66,7 @@ class FirstSearchNode(BaseNode): logger.info("正在生成首次搜索查询") # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_FIRST_SEARCH, message) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_FIRST_SEARCH, message) # 处理响应 processed_response = self.process_output(response) @@ -201,7 +201,7 @@ class ReflectionNode(BaseNode): logger.info("正在进行反思并生成新搜索查询") # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_REFLECTION, message) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_REFLECTION, message) # 处理响应 processed_response = self.process_output(response) diff --git a/MediaEngine/nodes/summary_node.py b/MediaEngine/nodes/summary_node.py index 748fbb9..2980a76 100644 --- a/MediaEngine/nodes/summary_node.py +++ b/MediaEngine/nodes/summary_node.py @@ -99,8 +99,8 @@ class FirstSummaryNode(StateMutationNode): logger.info("正在生成首次段落总结") - # 调用LLM生成总结 - response = self.llm_client.invoke( + # 调用LLM生成总结(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string( SYSTEM_PROMPT_FIRST_SUMMARY, message, ) @@ -267,8 +267,8 @@ class ReflectionSummaryNode(StateMutationNode): logger.info("正在生成反思总结") - # 调用LLM生成总结 - response = self.llm_client.invoke( + # 调用LLM生成总结(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string( SYSTEM_PROMPT_REFLECTION_SUMMARY, message, ) diff --git a/QueryEngine/llms/base.py b/QueryEngine/llms/base.py index e75e2b1..399ab65 100644 --- a/QueryEngine/llms/base.py +++ b/QueryEngine/llms/base.py @@ -5,7 +5,8 @@ Unified OpenAI-compatible LLM client for the Query Engine, with retry support. import os import sys from datetime import datetime -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Generator +from loguru import logger from openai import OpenAI @@ -82,6 +83,76 @@ class LLMClient: return self.validate_response(response.choices[0].message.content) return "" + @with_retry(LLM_RETRY_CONFIG) + def stream_invoke(self, system_prompt: str, user_prompt: str, **kwargs) -> Generator[str, None, None]: + """ + 流式调用LLM,逐步返回响应内容 + + Args: + system_prompt: 系统提示词 + user_prompt: 用户提示词 + **kwargs: 额外参数(temperature, top_p等) + + Yields: + 响应文本块(str) + """ + current_time = datetime.now().strftime("%Y年%m月%d日%H时%M分") + time_prefix = f"今天的实际时间是{current_time}" + if user_prompt: + user_prompt = f"{time_prefix}\n{user_prompt}" + else: + user_prompt = time_prefix + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + + allowed_keys = {"temperature", "top_p", "presence_penalty", "frequency_penalty"} + extra_params = {key: value for key, value in kwargs.items() if key in allowed_keys and value is not None} + # 强制使用流式 + extra_params["stream"] = True + + timeout = kwargs.pop("timeout", self.timeout) + + try: + stream = self.client.chat.completions.create( + model=self.model_name, + messages=messages, + timeout=timeout, + **extra_params, + ) + + for chunk in stream: + if chunk.choices and len(chunk.choices) > 0: + delta = chunk.choices[0].delta + if delta and delta.content: + yield delta.content + except Exception as e: + logger.error(f"流式请求失败: {str(e)}") + raise e + + def stream_invoke_to_string(self, system_prompt: str, user_prompt: str, **kwargs) -> str: + """ + 流式调用LLM并安全地拼接为完整字符串(避免UTF-8多字节字符截断) + + Args: + system_prompt: 系统提示词 + user_prompt: 用户提示词 + **kwargs: 额外参数(temperature, top_p等) + + Returns: + 完整的响应字符串 + """ + # 以字节形式收集所有块 + byte_chunks = [] + for chunk in self.stream_invoke(system_prompt, user_prompt, **kwargs): + byte_chunks.append(chunk.encode('utf-8')) + + # 拼接所有字节,然后一次性解码 + if byte_chunks: + return b''.join(byte_chunks).decode('utf-8', errors='replace') + return "" + @staticmethod def validate_response(response: Optional[str]) -> str: if response is None: diff --git a/QueryEngine/nodes/formatting_node.py b/QueryEngine/nodes/formatting_node.py index a522380..9880519 100644 --- a/QueryEngine/nodes/formatting_node.py +++ b/QueryEngine/nodes/formatting_node.py @@ -68,8 +68,8 @@ class ReportFormattingNode(BaseNode): logger.info("正在格式化最终报告") - # 调用LLM生成Markdown格式 - response = self.llm_client.invoke( + # 调用LLM生成Markdown格式(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string( SYSTEM_PROMPT_REPORT_FORMATTING, message, ) diff --git a/QueryEngine/nodes/report_structure_node.py b/QueryEngine/nodes/report_structure_node.py index c1e7214..353b97e 100644 --- a/QueryEngine/nodes/report_structure_node.py +++ b/QueryEngine/nodes/report_structure_node.py @@ -52,7 +52,7 @@ class ReportStructureNode(StateMutationNode): logger.info(f"正在为查询生成报告结构: {self.query}") # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_REPORT_STRUCTURE, self.query) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_REPORT_STRUCTURE, self.query) # 处理响应 processed_response = self.process_output(response) diff --git a/QueryEngine/nodes/search_node.py b/QueryEngine/nodes/search_node.py index e81efba..e44ee72 100644 --- a/QueryEngine/nodes/search_node.py +++ b/QueryEngine/nodes/search_node.py @@ -66,7 +66,7 @@ class FirstSearchNode(BaseNode): logger.info("正在生成首次搜索查询") # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_FIRST_SEARCH, message) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_FIRST_SEARCH, message) # 处理响应 processed_response = self.process_output(response) @@ -201,7 +201,7 @@ class ReflectionNode(BaseNode): logger.info("正在进行反思并生成新搜索查询") # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_REFLECTION, message) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_REFLECTION, message) # 处理响应 processed_response = self.process_output(response) diff --git a/QueryEngine/nodes/summary_node.py b/QueryEngine/nodes/summary_node.py index 217db54..2c5942c 100644 --- a/QueryEngine/nodes/summary_node.py +++ b/QueryEngine/nodes/summary_node.py @@ -99,8 +99,8 @@ class FirstSummaryNode(StateMutationNode): logger.info("正在生成首次段落总结") - # 调用LLM生成总结 - response = self.llm_client.invoke( + # 调用LLM生成总结(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string( SYSTEM_PROMPT_FIRST_SUMMARY, message, ) @@ -267,8 +267,8 @@ class ReflectionSummaryNode(StateMutationNode): logger.info("正在生成反思总结") - # 调用LLM生成总结 - response = self.llm_client.invoke( + # 调用LLM生成总结(流式,安全拼接UTF-8) + response = self.llm_client.stream_invoke_to_string( SYSTEM_PROMPT_REFLECTION_SUMMARY, message, ) diff --git a/ReportEngine/llms/base.py b/ReportEngine/llms/base.py index 7ab9d34..64e34ac 100644 --- a/ReportEngine/llms/base.py +++ b/ReportEngine/llms/base.py @@ -4,7 +4,8 @@ Unified OpenAI-compatible LLM client for the Report Engine, with retry support. import os import sys -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Generator +from loguru import logger from openai import OpenAI @@ -75,6 +76,70 @@ class LLMClient: return self.validate_response(response.choices[0].message.content) return "" + @with_retry(LLM_RETRY_CONFIG) + def stream_invoke(self, system_prompt: str, user_prompt: str, **kwargs) -> Generator[str, None, None]: + """ + 流式调用LLM,逐步返回响应内容 + + Args: + system_prompt: 系统提示词 + user_prompt: 用户提示词 + **kwargs: 额外参数(temperature, top_p等) + + Yields: + 响应文本块(str) + """ + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + + allowed_keys = {"temperature", "top_p", "presence_penalty", "frequency_penalty"} + extra_params = {key: value for key, value in kwargs.items() if key in allowed_keys and value is not None} + # 强制使用流式 + extra_params["stream"] = True + + timeout = kwargs.pop("timeout", self.timeout) + + try: + stream = self.client.chat.completions.create( + model=self.model_name, + messages=messages, + timeout=timeout, + **extra_params, + ) + + for chunk in stream: + if chunk.choices and len(chunk.choices) > 0: + delta = chunk.choices[0].delta + if delta and delta.content: + yield delta.content + except Exception as e: + logger.error(f"流式请求失败: {str(e)}") + raise e + + def stream_invoke_to_string(self, system_prompt: str, user_prompt: str, **kwargs) -> str: + """ + 流式调用LLM并安全地拼接为完整字符串(避免UTF-8多字节字符截断) + + Args: + system_prompt: 系统提示词 + user_prompt: 用户提示词 + **kwargs: 额外参数(temperature, top_p等) + + Returns: + 完整的响应字符串 + """ + # 以字节形式收集所有块 + byte_chunks = [] + for chunk in self.stream_invoke(system_prompt, user_prompt, **kwargs): + byte_chunks.append(chunk.encode('utf-8')) + + # 拼接所有字节,然后一次性解码 + if byte_chunks: + return b''.join(byte_chunks).decode('utf-8', errors='replace') + return "" + @staticmethod def validate_response(response: Optional[str]) -> str: if response is None: diff --git a/ReportEngine/nodes/html_generation_node.py b/ReportEngine/nodes/html_generation_node.py index 85a6b53..2086528 100644 --- a/ReportEngine/nodes/html_generation_node.py +++ b/ReportEngine/nodes/html_generation_node.py @@ -60,7 +60,7 @@ class HTMLGenerationNode(StateMutationNode): message = json.dumps(llm_input, ensure_ascii=False, indent=2) # 调用LLM生成HTML - response = self.llm_client.invoke(SYSTEM_PROMPT_HTML_GENERATION, message) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_HTML_GENERATION, message) # 处理响应(简化版) processed_response = self.process_output(response) diff --git a/ReportEngine/nodes/template_selection_node.py b/ReportEngine/nodes/template_selection_node.py index d21c96e..832d08e 100644 --- a/ReportEngine/nodes/template_selection_node.py +++ b/ReportEngine/nodes/template_selection_node.py @@ -115,7 +115,7 @@ class TemplateSelectionNode(BaseNode): 请根据查询内容、报告内容和论坛日志的具体情况,选择最合适的模板。""" # 调用LLM - response = self.llm_client.invoke(SYSTEM_PROMPT_TEMPLATE_SELECTION, user_message) + response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_TEMPLATE_SELECTION, user_message) # 检查响应是否为空 if not response or not response.strip(): diff --git a/utils/forum_reader.py b/utils/forum_reader.py index 8943988..93f000e 100644 --- a/utils/forum_reader.py +++ b/utils/forum_reader.py @@ -6,10 +6,7 @@ Forum日志读取工具 import re from pathlib import Path from typing import Optional, List, Dict -import logging - -logger = logging.getLogger(__name__) - +from loguru import logger def get_latest_host_speech(log_dir: str = "logs") -> Optional[str]: """