# Listing metadata carried over from the code viewer: 352 lines, 16 KiB, Python.
import asyncio
import json
import math
import os
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import anthropic
import openai

from utils.logger import app_logger as logging
class AIAnalyzer:
    """Batch public-opinion / sentiment analysis over OpenAI, Anthropic and DeepSeek.

    Messages are split into batches, each batch is sent to the selected chat
    model together with a JSON-schema style system prompt, and the parsed
    ``analysis_results`` lists of all batches are concatenated.

    All provider calls are awaited, so the clients created here are the
    asynchronous variants (``openai.AsyncOpenAI`` / ``anthropic.AsyncAnthropic``).
    """

    # Upper bound on the completion tokens requested per call. The per-model
    # ``max_tokens`` values in ``supported_models`` are used for batch sizing and
    # reach 200000, which is not a valid completion budget.
    # NOTE(review): 4096 is assumed to be accepted by every listed model -
    # confirm against the providers' current limits.
    MAX_RESPONSE_TOKENS = 4096

    def __init__(self):
        """Resolve API keys, create provider clients, register models and prompts.

        Each key is read from its environment variable; when it is missing the
        user is prompted on stdin (pressing Enter skips that provider). A
        provider without a key keeps its client attribute set to ``None``.
        """
        self.openai_key = self._resolve_api_key('OPENAI_API_KEY')
        self.claude_key = self._resolve_api_key('ANTHROPIC_API_KEY')
        self.deepseek_key = self._resolve_api_key('DEEPSEEK_API_KEY')

        # Keys may also be assigned directly here instead of being entered
        # interactively (uncomment the lines below):
        # self.openai_key = "your OPENAI_API_KEY"
        # self.claude_key = "your ANTHROPIC_API_KEY"
        # self.deepseek_key = "your DEEPSEEK_API_KEY"

        # Always define the client attributes so that a missing key produces a
        # clear error message instead of an AttributeError at call time.
        self.openai_client: Optional[Any] = None
        self.claude_client: Optional[Any] = None
        self.deepseek_client: Optional[Any] = None

        if self.openai_key:
            # The module-level key is still set, as the original code did.
            openai.api_key = self.openai_key
            self.openai_client = openai.AsyncOpenAI(api_key=self.openai_key)
        if self.claude_key:
            self.claude_client = anthropic.AsyncAnthropic(api_key=self.claude_key)
        if self.deepseek_key:
            # DeepSeek is reached through the OpenAI client with a custom base URL.
            self.deepseek_client = openai.AsyncOpenAI(
                api_key=self.deepseek_key,
                base_url="https://api.deepseek.com/v1",
            )

        # Supported models. 'max_tokens' drives batch sizing (and, capped by
        # MAX_RESPONSE_TOKENS, the completion budget); 'cost_per_1k' is a
        # reference price in USD used for the cost estimate.
        self.supported_models: Dict[str, Dict[str, Any]] = {
            # Newer OpenAI (ChatGPT) models
            'gpt-4o-latest': {
                'provider': 'openai',
                'max_tokens': 128000,  # large window
                'cost_per_1k': 0.01,  # reference price (USD)
            },
            'gpt-4o-mini': {
                'provider': 'openai',
                'max_tokens': 4000,  # lightweight variant for quick tasks
                'cost_per_1k': 0.00015,
            },
            # Older OpenAI models
            'gpt-3.5-turbo': {'provider': 'openai', 'max_tokens': 2000, 'cost_per_1k': 0.0015},
            'gpt-3.5-turbo-16k': {'provider': 'openai', 'max_tokens': 16000, 'cost_per_1k': 0.003},
            'gpt-4': {'provider': 'openai', 'max_tokens': 8000, 'cost_per_1k': 0.03},
            'gpt-4-32k': {'provider': 'openai', 'max_tokens': 32000, 'cost_per_1k': 0.06},
            'gpt-4-turbo-preview': {'provider': 'openai', 'max_tokens': 128000, 'cost_per_1k': 0.01},

            # Newer Anthropic (Claude) models
            'claude-3.5-sonnet-new': {
                'provider': 'anthropic',
                'max_tokens': 200000,
                'cost_per_1k': 0.015,
            },
            'claude-3.5-haiku': {
                'provider': 'anthropic',
                'max_tokens': 200000,
                'cost_per_1k': 0.0025,
            },
            # Older Claude models
            'claude-2.1': {'provider': 'anthropic', 'max_tokens': 100000, 'cost_per_1k': 0.008},
            'claude-2.0': {'provider': 'anthropic', 'max_tokens': 100000, 'cost_per_1k': 0.008},
            'claude-instant-1.2': {'provider': 'anthropic', 'max_tokens': 100000, 'cost_per_1k': 0.0015},

            # DeepSeek models
            'deepseek-chat': {'provider': 'deepseek', 'max_tokens': 4000, 'cost_per_1k': 0.002},
            'deepseek-reasoner': {'provider': 'deepseek', 'max_tokens': 4000, 'cost_per_1k': 0.003},
        }

        # System prompts keyed by analysis depth. The text is sent to the model
        # verbatim, so it is runtime data and stays in its original language.
        self.prompt_templates: Dict[str, str] = {
            'basic': """你是一个专业的舆情分析助手。请对每条消息进行基础的情感分析。
请按以下JSON格式返回:
{
    "analysis_results": [
        {
            "message_id": "消息ID",
            "sentiment": "情感倾向 (积极/消极/中性)",
            "sentiment_score": "情感分数 (0-1)",
            "keywords": ["关键词1", "关键词2"],
            "key_points": "简要概述",
            "influence_analysis": "基础影响分析",
            "risk_level": "风险等级 (低/中/高)",
            "timestamp": "分析时间戳"
        }
    ]
}""",
            'standard': """你是一个专业的舆情分析助手。请对每条消息进行标准深度的分析。
请按以下JSON格式返回:
{
    "analysis_results": [
        {
            "message_id": "消息ID",
            "sentiment": "情感倾向 (积极/消极/中性)",
            "sentiment_score": "情感分数 (0-1)",
            "keywords": ["关键词1", "关键词2", "关键词3"],
            "key_points": "核心观点概述",
            "influence_analysis": "潜在影响分析",
            "risk_level": "风险等级 (低/中/高)",
            "timestamp": "分析时间戳"
        }
    ]
}""",
            'deep': """你是一个专业的舆情分析助手。请对每条消息进行深度分析。
请按以下JSON格式返回:
{
    "analysis_results": [
        {
            "message_id": "消息ID",
            "sentiment": "情感倾向 (积极/消极/中性)",
            "sentiment_score": "情感分数 (0-1)",
            "keywords": ["关键词1", "关键词2", "关键词3", "关键词4", "关键词5"],
            "key_points": "详细的核心观点分析",
            "influence_analysis": "深度影响分析,包括短期和长期影响",
            "risk_factors": ["风险因素1", "风险因素2", "风险因素3"],
            "risk_level": "风险等级 (低/中/高)",
            "suggestions": ["建议1", "建议2", "建议3"],
            "timestamp": "分析时间戳"
        }
    ]
}""",
        }

    @staticmethod
    def _resolve_api_key(env_name: str) -> str:
        """Return the API key from ``env_name``, prompting on stdin when unset.

        Returns an empty string when the user skips the prompt or when stdin
        is unavailable (``EOFError``), so that constructing the analyzer in a
        non-interactive process does not crash.
        """
        key = os.getenv(env_name)
        if key:
            return key
        print(f"未检测到 {env_name}。")
        try:
            # Pressing Enter yields an empty string, i.e. "skip this provider".
            return input(f"请输入 {env_name} (按回车键跳过输入): ").strip()
        except EOFError:
            return ""

    async def analyze_messages(self, messages: List[Dict], batch_size: int = 50,
                               model_type: str = "gpt-3.5-turbo",
                               analysis_depth: str = "standard",
                               prefer_deepseek: bool = True) -> List[Dict]:
        """Analyze ``messages`` in concurrent batches and return all results.

        Args:
            messages: Dicts read via ``.get('id')`` and ``.get('content')``.
            batch_size: Requested messages per request; lowered to the model's
                optimal batch size when that is smaller. Must be positive.
            model_type: Key of ``supported_models``.
            analysis_depth: Key of ``prompt_templates``; unknown values fall
                back to ``'standard'``.
            prefer_deepseek: When true and a DeepSeek key is configured, any
                non-DeepSeek ``model_type`` is replaced by ``'deepseek-chat'``.

        Returns:
            The concatenated per-message analysis dicts. Any error (including
            an unsupported model or invalid batch size) is logged and results
            in an empty list.
        """
        try:
            # Prefer DeepSeek to reduce cost when it is available.
            if (prefer_deepseek and self.deepseek_key
                    and model_type not in ('deepseek-chat', 'deepseek-reasoner')):
                logging.info("检测到 DeepSeek API, 优先使用 'deepseek-chat' 模型以降低成本。")
                model_type = 'deepseek-chat'

            if model_type not in self.supported_models:
                raise ValueError(f"不支持的模型类型: {model_type}")
            if batch_size < 1:
                # A zero step would blow up inside range(); a negative one would
                # silently produce no batches and report a "completed" run.
                raise ValueError(f"batch_size 必须为正整数: {batch_size}")

            model_info = self.supported_models[model_type]
            provider = model_info['provider']
            max_tokens = model_info['max_tokens']

            # Shrink the batch size to what the model can take.
            adjusted_batch_size = min(batch_size, self._get_optimal_batch_size(model_type))
            if adjusted_batch_size != batch_size:
                logging.info(f"已将批处理大小从 {batch_size} 调整为 {adjusted_batch_size}")

            # The prompt does not depend on the batch, so look it up once.
            system_prompt = self.prompt_templates.get(
                analysis_depth, self.prompt_templates['standard'])

            tasks = [
                self._process_batch(messages[start:start + adjusted_batch_size],
                                    system_prompt, model_type, max_tokens, provider)
                for start in range(0, len(messages), adjusted_batch_size)
            ]

            # Run all batches concurrently; gather keeps the submission order.
            results = await asyncio.gather(*tasks)

            all_results: List[Dict] = []
            total_cost = 0.0
            for batch_result, batch_cost in results:
                all_results.extend(batch_result)
                total_cost += batch_cost

            logging.info(f"分析完成, 总成本: ${total_cost:.4f}")
            return all_results
        except Exception as e:
            logging.error(f"AI分析过程出错: {e}", exc_info=True)
            return []

    async def _process_batch(self, batch: List[Dict], system_prompt: str,
                             model_type: str, max_tokens: int,
                             provider: str) -> Tuple[List[Dict], float]:
        """Analyze one batch and return ``(results, estimated_cost)``.

        Returns ``([], 0.0)`` for an unknown provider or when an exception is
        raised while formatting or dispatching the batch.
        """
        try:
            messages_text = "\n---\n".join(
                f"消息ID: {msg.get('id')}\n内容: {msg.get('content')}" for msg in batch
            )

            analyzers = {
                'openai': self._analyze_with_openai,
                'anthropic': self._analyze_with_claude,
                'deepseek': self._analyze_with_deepseek,
            }
            analyze = analyzers.get(provider)
            if analyze is None:
                logging.error(f"未知的API供应商: {provider}")
                return ([], 0.0)

            result = await analyze(messages_text, system_prompt, model_type, max_tokens)

            # The estimate is based on the character count of the input text only.
            batch_cost = self._calculate_cost(len(messages_text), model_type)
            logging.info(f"批次处理完成, 成本: ${batch_cost:.4f}")
            return (result, batch_cost)
        except Exception as e:
            logging.error(f"处理批次时出错: {e}", exc_info=True)
            return ([], 0.0)

    def _get_optimal_batch_size(self, model_type: str) -> int:
        """Return how many messages fit in one request for ``model_type``.

        Assumes 200 tokens per message and reserves 20% of the model's
        ``max_tokens`` for the system prompt and the response; the result is
        clamped to the range 1..100.
        """
        max_tokens = self.supported_models[model_type]['max_tokens']
        avg_tokens_per_message = 200
        available_tokens = int(max_tokens * 0.8)
        return max(1, min(100, available_tokens // avg_tokens_per_message))

    def _calculate_cost(self, input_length: int, model_type: str) -> float:
        """Estimate the USD cost of sending ``input_length`` characters.

        Uses the rough rule of 4 characters per token and the model's
        ``cost_per_1k`` reference price.
        """
        cost_per_1k = self.supported_models[model_type]['cost_per_1k']
        estimated_tokens = math.ceil(input_length / 4)
        return (estimated_tokens / 1000) * cost_per_1k

    def _response_token_budget(self, max_tokens: int) -> int:
        """Return the completion-token limit to request for a model.

        The configured ``max_tokens`` is never exceeded, but it is capped at
        ``MAX_RESPONSE_TOKENS`` because values such as 128000 or 200000 are
        not valid completion budgets.
        """
        return max(1, min(max_tokens, self.MAX_RESPONSE_TOKENS))

    @staticmethod
    def _parse_analysis_response(content: str, provider_label: str) -> List[Dict]:
        """Extract the ``analysis_results`` list from a model reply.

        The reply is parsed as JSON; if that fails, the outermost ``{...}``
        span is retried so that replies wrapped in Markdown code fences or
        short prose still parse.

        Args:
            content: Raw text returned by the model.
            provider_label: Name used in the error log message.

        Returns:
            The ``analysis_results`` list, or ``[]`` (after logging) when the
            JSON does not have the expected shape.

        Raises:
            json.JSONDecodeError: If no parseable JSON object is found.
        """
        text = (content or "").strip()
        try:
            result = json.loads(text)
        except json.JSONDecodeError:
            start, end = text.find('{'), text.rfind('}')
            if start == -1 or end < start:
                raise
            result = json.loads(text[start:end + 1])

        if isinstance(result, dict) and isinstance(result.get('analysis_results'), list):
            return result['analysis_results']
        logging.error(f"{provider_label} API返回格式不正确: {content}")
        return []

    async def _chat_completion_text(self, client: Any, messages_text: str,
                                    system_prompt: str, model: str,
                                    max_tokens: int) -> str:
        """Call an OpenAI-compatible chat endpoint and return the reply text."""
        response = await client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"请分析以下消息:\n{messages_text}"},
            ],
            temperature=0.3,
            max_tokens=self._response_token_budget(max_tokens),
        )
        return response.choices[0].message.content

    async def _analyze_with_openai(self, messages_text: str, system_prompt: str,
                                   model: str, max_tokens: int) -> List[Dict]:
        """Analyze ``messages_text`` with the OpenAI API.

        Returns the parsed result list, or ``[]`` after logging any failure
        (missing key, API error, unparseable reply).
        """
        try:
            if self.openai_client is None:
                raise RuntimeError("未配置 OPENAI_API_KEY, 无法调用 OpenAI API")
            content = await self._chat_completion_text(
                self.openai_client, messages_text, system_prompt, model, max_tokens)
            return self._parse_analysis_response(content, 'OpenAI')
        except Exception as e:
            logging.error(f"OpenAI API调用失败: {e}", exc_info=True)
            return []

    async def _analyze_with_claude(self, messages_text: str, system_prompt: str,
                                   model: str, max_tokens: int) -> List[Dict]:
        """Analyze ``messages_text`` with the Anthropic Messages API.

        Returns the parsed result list, or ``[]`` after logging any failure
        (missing key, API error, unparseable reply).
        """
        try:
            if self.claude_client is None:
                raise RuntimeError("未配置 ANTHROPIC_API_KEY, 无法调用 Claude API")
            response = await self.claude_client.messages.create(
                model=model,
                max_tokens=self._response_token_budget(max_tokens),
                temperature=0.3,
                system=system_prompt,
                messages=[{"role": "user", "content": f"请分析以下消息:\n{messages_text}"}],
            )
            return self._parse_analysis_response(response.content[0].text, 'Claude')
        except Exception as e:
            logging.error(f"Claude API调用失败: {e}", exc_info=True)
            return []

    async def _analyze_with_deepseek(self, messages_text: str, system_prompt: str,
                                     model: str, max_tokens: int) -> List[Dict]:
        """Analyze ``messages_text`` with the DeepSeek API.

        Returns the parsed result list, or ``[]`` after logging any failure
        (missing key, API error, unparseable reply).
        """
        try:
            if self.deepseek_client is None:
                raise RuntimeError("未配置 DEEPSEEK_API_KEY, 无法调用 DeepSeek API")
            content = await self._chat_completion_text(
                self.deepseek_client, messages_text, system_prompt, model, max_tokens)
            return self._parse_analysis_response(content, 'DeepSeek')
        except Exception as e:
            logging.error(f"DeepSeek API调用失败: {e}", exc_info=True)
            return []

    @staticmethod
    def _format_timestamp(raw: Any) -> str:
        """Render ``raw`` as ``YYYY-MM-DD HH:MM:SS``.

        Accepts epoch seconds (number or numeric string) or ISO-8601 text.
        Text in any other format is returned unchanged, and other
        unparseable values give an empty string, because the model is free
        to fill the ``timestamp`` field with arbitrary content.
        """
        fmt = '%Y-%m-%d %H:%M:%S'
        try:
            return datetime.fromtimestamp(float(raw)).strftime(fmt)
        except (TypeError, ValueError, OverflowError, OSError):
            pass
        if isinstance(raw, str):
            try:
                return datetime.fromisoformat(raw.strip()).strftime(fmt)
            except ValueError:
                return raw
        return ''

    @staticmethod
    def _join_keywords(keywords: Any) -> str:
        """Return the keywords as one comma-separated display string."""
        if not keywords:
            return ''
        if isinstance(keywords, (list, tuple, set)):
            return ', '.join(str(keyword) for keyword in keywords)
        return str(keywords)

    def format_analysis_for_display(self, analysis: Dict) -> Dict:
        """Convert one analysis dict into the structure shown by the front end.

        Malformed model output is tolerated: a non-numeric ``sentiment_score``
        is shown as ``0.00%``, ``keywords`` may be missing or contain
        non-strings, and ``timestamp`` may be epoch seconds or text.

        Returns:
            A dict with keys ``id``, ``sentiment``, ``sentiment_score``,
            ``keywords``, ``key_points``, ``influence``, ``risk_level`` and
            ``analysis_time``; ``risk_factors`` and ``suggestions`` are added
            when the input contains ``risk_factors`` (deep analysis).
        """
        try:
            score = float(analysis.get('sentiment_score', 0))
        except (TypeError, ValueError):
            score = 0.0

        base_result = {
            'id': analysis.get('message_id', ''),
            'sentiment': analysis.get('sentiment', ''),
            'sentiment_score': f"{score:.2%}",
            'keywords': self._join_keywords(analysis.get('keywords', [])),
            'key_points': analysis.get('key_points', ''),
            'influence': analysis.get('influence_analysis', ''),
            'risk_level': analysis.get('risk_level', ''),
            'analysis_time': self._format_timestamp(analysis.get('timestamp', 0)),
        }

        # Deep analysis carries extra fields.
        if 'risk_factors' in analysis:
            base_result.update({
                'risk_factors': analysis.get('risk_factors', []),
                'suggestions': analysis.get('suggestions', []),
            })

        return base_result
# Module-level AI analyzer instance shared by importers of this module.
# Constructing it runs AIAnalyzer.__init__ at import time.
ai_analyzer = AIAnalyzer()
# Manual smoke test: run this file directly to analyze two sample messages.
if __name__ == "__main__":
    sample_messages = [
        {"id": "1", "content": "今天天气真好,我很开心。"},
        {"id": "2", "content": "经济形势不容乐观,风险较大。"},
    ]

    async def test():
        """Analyze the sample messages and print each formatted result."""
        # analyze_messages defaults to prefer_deepseek=True, so the requested
        # model is replaced by 'deepseek-chat' when a DeepSeek key is configured.
        results = await ai_analyzer.analyze_messages(
            sample_messages,
            model_type="gpt-4o-latest",
            analysis_depth="standard",
        )
        for res in results:
            print(ai_analyzer.format_analysis_for_display(res))

    asyncio.run(test())