diff --git a/utils/ai_analyzer.py b/utils/ai_analyzer.py index 04d6ea2..f5cb0ac 100644 --- a/utils/ai_analyzer.py +++ b/utils/ai_analyzer.py @@ -14,9 +14,26 @@ class AIAnalyzer: openai.api_key = self.api_key - # 系统提示词,限制AI的输出格式 - self.system_prompt = """你是一个专业的舆情分析助手。你的任务是分析每条消息的情感倾向、关键词和潜在影响。 -请严格按照以下JSON格式返回分析结果: + # 不同深度的分析提示词 + self.prompt_templates = { + 'basic': """你是一个专业的舆情分析助手。请对每条消息进行基础的情感分析。 +请按以下JSON格式返回: +{ + "analysis_results": [ + { + "message_id": "消息ID", + "sentiment": "情感倾向 (积极/消极/中性)", + "sentiment_score": "情感分数 (0-1)", + "keywords": ["关键词1", "关键词2"], + "key_points": "简要概述", + "influence_analysis": "基础影响分析", + "risk_level": "风险等级 (低/中/高)", + "timestamp": "分析时间戳" + } + ] +}""", + 'standard': """你是一个专业的舆情分析助手。请对每条消息进行标准深度的分析。 +请按以下JSON格式返回: { "analysis_results": [ { @@ -30,41 +47,69 @@ class AIAnalyzer: "timestamp": "分析时间戳" } ] -} -请确保每个字段都有值,并保持JSON格式的一致性。""" +}""", + 'deep': """你是一个专业的舆情分析助手。请对每条消息进行深度分析。 +请按以下JSON格式返回: +{ + "analysis_results": [ + { + "message_id": "消息ID", + "sentiment": "情感倾向 (积极/消极/中性)", + "sentiment_score": "情感分数 (0-1)", + "keywords": ["关键词1", "关键词2", "关键词3", "关键词4", "关键词5"], + "key_points": "详细的核心观点分析", + "influence_analysis": "深度影响分析,包括短期和长期影响", + "risk_factors": ["风险因素1", "风险因素2", "风险因素3"], + "risk_level": "风险等级 (低/中/高)", + "suggestions": ["建议1", "建议2", "建议3"], + "timestamp": "分析时间戳" + } + ] +}""" + } - async def analyze_messages(self, messages: List[Dict]) -> List[Dict]: + async def analyze_messages(self, messages: List[Dict], batch_size: int = 50, + model_type: str = "gpt-3.5-turbo", + analysis_depth: str = "standard") -> List[Dict]: """分析一批消息并返回分析结果""" try: - # 构建输入消息 - formatted_messages = [] - for msg in messages: - formatted_messages.append(f"消息ID: {msg['id']}\n内容: {msg['content']}") + all_results = [] - messages_text = "\n---\n".join(formatted_messages) - - # 调用OpenAI API - response = await openai.ChatCompletion.acreate( - model="gpt-3.5-turbo", - messages=[ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": f"请分析以下消息:\n{messages_text}"} - ], - temperature=0.3, # 降低随机性 - max_tokens=2000, - n=1 - ) - - # 解析返回结果 - try: - result = json.loads(response.choices[0].message.content) - # 验证结果格式 - if not isinstance(result, dict) or 'analysis_results' not in result: - raise ValueError("AI返回格式不正确") - return result['analysis_results'] - except json.JSONDecodeError: - logging.error("AI返回结果解析失败") - return [] + # 分批处理消息 + for i in range(0, len(messages), batch_size): + batch = messages[i:i + batch_size] + formatted_messages = [] + for msg in batch: + formatted_messages.append(f"消息ID: {msg['id']}\n内容: {msg['content']}") + + messages_text = "\n---\n".join(formatted_messages) + + # 获取对应深度的提示词 + system_prompt = self.prompt_templates.get(analysis_depth, self.prompt_templates['standard']) + + # 调用OpenAI API + response = await openai.ChatCompletion.acreate( + model=model_type, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"请分析以下消息:\n{messages_text}"} + ], + temperature=0.3, # 降低随机性 + max_tokens=2000 if analysis_depth != 'deep' else 3000, + n=1 + ) + + try: + result = json.loads(response.choices[0].message.content) + if isinstance(result, dict) and 'analysis_results' in result: + all_results.extend(result['analysis_results']) + else: + logging.error(f"API返回格式不正确: {response.choices[0].message.content}") + except json.JSONDecodeError as e: + logging.error(f"JSON解析失败: {e}") + continue + + return all_results except Exception as e: logging.error(f"AI分析过程出错: {e}") @@ -72,7 +117,7 @@ class AIAnalyzer: def format_analysis_for_display(self, analysis: Dict) -> Dict: """将分析结果格式化为前端显示格式""" - return { + base_result = { 'id': analysis['message_id'], 'sentiment': analysis['sentiment'], 'sentiment_score': f"{float(analysis['sentiment_score']):.2%}", @@ -84,6 +129,15 @@ class AIAnalyzer: float(analysis['timestamp']) ).strftime('%Y-%m-%d %H:%M:%S') } + + # 如果是深度分析,添加额外信息 + if 'risk_factors' in analysis: + base_result.update({ + 'risk_factors': analysis['risk_factors'], + 'suggestions': analysis['suggestions'] + }) + + return base_result # 创建全局AI分析器实例 ai_analyzer = AIAnalyzer() \ No newline at end of file diff --git a/views/page/page.py b/views/page/page.py index 7d3b7ab..eba7873 100644 --- a/views/page/page.py +++ b/views/page/page.py @@ -308,11 +308,33 @@ def articleChar(id): @pb.route('/api/analyze_messages', methods=['POST']) async def analyze_messages(): try: - # 获取最近50条消息 - messages = getRecentMessages(50) # 需要实现这个函数 + # 获取请求参数 + data = request.get_json() + batch_size = data.get('batch_size', 50) + model_type = data.get('model_type', 'gpt-3.5-turbo') + analysis_depth = data.get('analysis_depth', 'standard') + + # 获取最近的消息 + messages = getRecentMessages(batch_size) + if not messages: + return jsonify({ + 'success': False, + 'error': '没有找到需要分析的消息' + }), 404 # 调用AI进行分析 - analysis_results = await ai_analyzer.analyze_messages(messages) + analysis_results = await ai_analyzer.analyze_messages( + messages=messages, + batch_size=batch_size, + model_type=model_type, + analysis_depth=analysis_depth + ) + + if not analysis_results: + return jsonify({ + 'success': False, + 'error': '分析过程中出现错误' + }), 500 # 保存到数据库 with Session(engine) as session: @@ -337,7 +359,14 @@ async def analyze_messages(): return jsonify({ 'success': True, - 'data': display_results + 'data': display_results, + 'meta': { + 'total_messages': len(messages), + 'analyzed_messages': len(analysis_results), + 'batch_size': batch_size, + 'model_type': model_type, + 'analysis_depth': analysis_depth + } }) except Exception as e: diff --git a/views/page/templates/yuqingpredict.html b/views/page/templates/yuqingpredict.html index 88a6dbc..3241780 100644 --- a/views/page/templates/yuqingpredict.html +++ b/views/page/templates/yuqingpredict.html @@ -454,11 +454,74 @@