import openai import anthropic import json from typing import List, Dict, Tuple, Any import os import asyncio import math from datetime import datetime from utils.logger import app_logger as logging class AIAnalyzer: def __init__(self): # 尝试从环境变量中获取API密钥,如果没有则主动询问配置 self.openai_key = os.getenv('OPENAI_API_KEY') if not self.openai_key: print("未检测到 OPENAI_API_KEY。") # 提示时允许按回车跳过输入 self.openai_key = input("请输入 OPENAI_API_KEY (按回车键跳过输入): ").strip() self.claude_key = os.getenv('ANTHROPIC_API_KEY') if not self.claude_key: print("未检测到 ANTHROPIC_API_KEY。") self.claude_key = input("请输入 ANTHROPIC_API_KEY (按回车键跳过输入): ").strip() self.deepseek_key = os.getenv('DEEPSEEK_API_KEY') if not self.deepseek_key: print("未检测到 DEEPSEEK_API_KEY。") self.deepseek_key = input("请输入 DEEPSEEK_API_KEY (按回车键跳过输入): ").strip() # 如果不希望通过交互输入,也可以直接在此处配置(注释掉下面几行即可) # self.openai_key = "你的OpenAI_API_KEY" # self.claude_key = "你的ANTHROPIC_API_KEY" # self.deepseek_key = "你的DEEPSEEK_API_KEY" # 配置各API客户端 if self.openai_key: openai.api_key = self.openai_key if self.claude_key: self.claude_client = anthropic.Anthropic(api_key=self.claude_key) if self.deepseek_key: self.deepseek_client = openai.OpenAI( api_key=self.deepseek_key, base_url="https://api.deepseek.com/v1" ) # 支持的模型列表(增加了最新的 ChatGPT 和 Claude 模型) self.supported_models: Dict[str, Dict[str, Any]] = { # OpenAI 最新模型(ChatGPT系列) 'gpt-4o-latest': { 'provider': 'openai', 'max_tokens': 128000, # 支持大窗口 'cost_per_1k': 0.01 # 参考价格(美元) }, 'gpt-4o-mini': { 'provider': 'openai', 'max_tokens': 4000, # 轻量版,适合快速任务 'cost_per_1k': 0.00015 # 成本大幅降低 }, # 旧版OpenAI模型 'gpt-3.5-turbo': {'provider': 'openai', 'max_tokens': 2000, 'cost_per_1k': 0.0015}, 'gpt-3.5-turbo-16k': {'provider': 'openai', 'max_tokens': 16000, 'cost_per_1k': 0.003}, 'gpt-4': {'provider': 'openai', 'max_tokens': 8000, 'cost_per_1k': 0.03}, 'gpt-4-32k': {'provider': 'openai', 'max_tokens': 32000, 'cost_per_1k': 0.06}, 'gpt-4-turbo-preview': {'provider': 'openai', 'max_tokens': 128000, 'cost_per_1k': 0.01}, # Anthropic 最新模型(Claude系列) 'claude-3.5-sonnet-new': { 'provider': 'anthropic', 'max_tokens': 200000, # 新版Claude 3.5 Sonnet 'cost_per_1k': 0.015 }, 'claude-3.5-haiku': { 'provider': 'anthropic', 'max_tokens': 200000, # 最新Claude 3.5 Haiku 'cost_per_1k': 0.0025 }, # 旧版Claude模型 'claude-2.1': {'provider': 'anthropic', 'max_tokens': 100000, 'cost_per_1k': 0.008}, 'claude-2.0': {'provider': 'anthropic', 'max_tokens': 100000, 'cost_per_1k': 0.008}, 'claude-instant-1.2': {'provider': 'anthropic', 'max_tokens': 100000, 'cost_per_1k': 0.0015}, # DeepSeek 模型 'deepseek-chat': {'provider': 'deepseek', 'max_tokens': 4000, 'cost_per_1k': 0.002}, 'deepseek-reasoner': {'provider': 'deepseek', 'max_tokens': 4000, 'cost_per_1k': 0.003} } # 不同深度的分析提示词 self.prompt_templates: Dict[str, str] = { 'basic': """你是一个专业的舆情分析助手。请对每条消息进行基础的情感分析。 请按以下JSON格式返回: { "analysis_results": [ { "message_id": "消息ID", "sentiment": "情感倾向 (积极/消极/中性)", "sentiment_score": "情感分数 (0-1)", "keywords": ["关键词1", "关键词2"], "key_points": "简要概述", "influence_analysis": "基础影响分析", "risk_level": "风险等级 (低/中/高)", "timestamp": "分析时间戳" } ] }""", 'standard': """你是一个专业的舆情分析助手。请对每条消息进行标准深度的分析。 请按以下JSON格式返回: { "analysis_results": [ { "message_id": "消息ID", "sentiment": "情感倾向 (积极/消极/中性)", "sentiment_score": "情感分数 (0-1)", "keywords": ["关键词1", "关键词2", "关键词3"], "key_points": "核心观点概述", "influence_analysis": "潜在影响分析", "risk_level": "风险等级 (低/中/高)", "timestamp": "分析时间戳" } ] }""", 'deep': """你是一个专业的舆情分析助手。请对每条消息进行深度分析。 请按以下JSON格式返回: { "analysis_results": [ { "message_id": "消息ID", "sentiment": "情感倾向 (积极/消极/中性)", "sentiment_score": "情感分数 (0-1)", "keywords": ["关键词1", "关键词2", "关键词3", "关键词4", "关键词5"], "key_points": "详细的核心观点分析", "influence_analysis": "深度影响分析,包括短期和长期影响", "risk_factors": ["风险因素1", "风险因素2", "风险因素3"], "risk_level": "风险等级 (低/中/高)", "suggestions": ["建议1", "建议2", "建议3"], "timestamp": "分析时间戳" } ] }""" } async def analyze_messages(self, messages: List[Dict], batch_size: int = 50, model_type: str = "gpt-3.5-turbo", analysis_depth: str = "standard", prefer_deepseek: bool = True) -> List[Dict]: """ 分析一批消息并返回分析结果。 如果 DeepSeek API 可用且 prefer_deepseek 为 True,则优先使用 DeepSeek 模型。 """ try: # 优先使用 DeepSeek 模型以降低成本 if prefer_deepseek and self.deepseek_key: if model_type not in ['deepseek-chat', 'deepseek-reasoner']: logging.info("检测到 DeepSeek API, 优先使用 'deepseek-chat' 模型以降低成本。") model_type = 'deepseek-chat' if model_type not in self.supported_models: raise ValueError(f"不支持的模型类型: {model_type}") model_info = self.supported_models[model_type] provider = model_info['provider'] max_tokens = model_info['max_tokens'] # 根据模型类型调整批处理大小 optimal_batch_size = self._get_optimal_batch_size(model_type) adjusted_batch_size = min(batch_size, optimal_batch_size) if adjusted_batch_size != batch_size: logging.info(f"已将批处理大小从 {batch_size} 调整为 {adjusted_batch_size}") tasks = [] total_cost = 0.0 # 分批处理消息并异步调用分析任务 for i in range(0, len(messages), adjusted_batch_size): batch = messages[i:i + adjusted_batch_size] system_prompt = self.prompt_templates.get(analysis_depth, self.prompt_templates['standard']) tasks.append(self._process_batch(batch, system_prompt, model_type, max_tokens, provider)) # 并发执行所有批次任务 results = await asyncio.gather(*tasks) all_results = [] for batch_result, batch_cost in results: all_results.extend(batch_result) total_cost += batch_cost logging.info(f"分析完成, 总成本: ${total_cost:.4f}") return all_results except Exception as e: logging.error(f"AI分析过程出错: {e}", exc_info=True) return [] async def _process_batch(self, batch: List[Dict], system_prompt: str, model_type: str, max_tokens: int, provider: str) -> Tuple[List[Dict], float]: """ 处理单个批次的消息,返回 (分析结果, 本批次成本) """ try: formatted_messages = [ f"消息ID: {msg.get('id')}\n内容: {msg.get('content')}" for msg in batch ] messages_text = "\n---\n".join(formatted_messages) if provider == 'openai': result = await self._analyze_with_openai(messages_text, system_prompt, model_type, max_tokens) elif provider == 'anthropic': result = await self._analyze_with_claude(messages_text, system_prompt, model_type, max_tokens) elif provider == 'deepseek': result = await self._analyze_with_deepseek(messages_text, system_prompt, model_type, max_tokens) else: logging.error(f"未知的API供应商: {provider}") return ([], 0.0) batch_cost = self._calculate_cost(len(messages_text), model_type) logging.info(f"批次处理完成, 成本: ${batch_cost:.4f}") return (result, batch_cost) except Exception as e: logging.error(f"处理批次时出错: {e}", exc_info=True) return ([], 0.0) def _get_optimal_batch_size(self, model_type: str) -> int: """根据模型类型获取最优批处理大小""" model_info = self.supported_models[model_type] max_tokens = model_info['max_tokens'] # 估算每条消息的平均 token 数(假设为 200) avg_tokens_per_message = 200 # 预留 20% 的 token 用于系统提示词和响应 available_tokens = int(max_tokens * 0.8) optimal_batch_size = max(1, min(100, available_tokens // avg_tokens_per_message)) return optimal_batch_size def _calculate_cost(self, input_length: int, model_type: str) -> float: """计算 API 调用成本""" model_info = self.supported_models[model_type] cost_per_1k = model_info['cost_per_1k'] # 估算 token 数(假设每 4 个字符约等于 1 个 token) estimated_tokens = math.ceil(input_length / 4) cost = (estimated_tokens / 1000) * cost_per_1k return cost async def _analyze_with_openai(self, messages_text: str, system_prompt: str, model: str, max_tokens: int) -> List[Dict]: """使用 OpenAI API 进行分析""" try: response = await openai.ChatCompletion.acreate( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"请分析以下消息:\n{messages_text}"} ], temperature=0.3, max_tokens=max_tokens, n=1 ) content = response.choices[0].message.content result = json.loads(content) if isinstance(result, dict) and 'analysis_results' in result: return result['analysis_results'] else: logging.error(f"OpenAI API返回格式不正确: {content}") return [] except Exception as e: logging.error(f"OpenAI API调用失败: {e}", exc_info=True) return [] async def _analyze_with_claude(self, messages_text: str, system_prompt: str, model: str, max_tokens: int) -> List[Dict]: """使用 Claude API 进行分析""" try: response = await self.claude_client.messages.create( model=model, max_tokens=max_tokens, temperature=0.3, system=system_prompt, messages=[{"role": "user", "content": f"请分析以下消息:\n{messages_text}"}] ) content = response.content[0].text result = json.loads(content) if isinstance(result, dict) and 'analysis_results' in result: return result['analysis_results'] else: logging.error(f"Claude API返回格式不正确: {content}") return [] except Exception as e: logging.error(f"Claude API调用失败: {e}", exc_info=True) return [] async def _analyze_with_deepseek(self, messages_text: str, system_prompt: str, model: str, max_tokens: int) -> List[Dict]: """使用 DeepSeek API 进行分析""" try: response = await self.deepseek_client.chat.completions.create( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"请分析以下消息:\n{messages_text}"} ], temperature=0.3, max_tokens=max_tokens ) content = response.choices[0].message.content result = json.loads(content) if isinstance(result, dict) and 'analysis_results' in result: return result['analysis_results'] else: logging.error(f"DeepSeek API返回格式不正确: {content}") return [] except Exception as e: logging.error(f"DeepSeek API调用失败: {e}", exc_info=True) return [] def format_analysis_for_display(self, analysis: Dict) -> Dict: """将分析结果格式化为前端显示格式""" base_result = { 'id': analysis.get('message_id', ''), 'sentiment': analysis.get('sentiment', ''), 'sentiment_score': f"{float(analysis.get('sentiment_score', 0)):.2%}", 'keywords': ', '.join(analysis.get('keywords', [])), 'key_points': analysis.get('key_points', ''), 'influence': analysis.get('influence_analysis', ''), 'risk_level': analysis.get('risk_level', ''), 'analysis_time': datetime.fromtimestamp( float(analysis.get('timestamp', 0)) ).strftime('%Y-%m-%d %H:%M:%S') } # 如果是深度分析,添加额外信息 if 'risk_factors' in analysis: base_result.update({ 'risk_factors': analysis.get('risk_factors', []), 'suggestions': analysis.get('suggestions', []) }) return base_result # 创建全局 AI 分析器实例 ai_analyzer = AIAnalyzer() # 若需要直接配置或测试,可在此处编写测试代码 if __name__ == "__main__": # 示例:直接配置并调用分析器(可替换为实际测试代码) sample_messages = [ {"id": "1", "content": "今天天气真好,我很开心。"}, {"id": "2", "content": "经济形势不容乐观,风险较大。"} ] async def test(): results = await ai_analyzer.analyze_messages(sample_messages, model_type="gpt-4o-latest", analysis_depth="standard") for res in results: print(ai_analyzer.format_analysis_for_display(res)) asyncio.run(test())