From 8ec05efe2772a2249521fee3b45e9fa6f44fa3bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=88=92=E9=85=92=E7=9A=84=E6=9D=8E=E7=99=BD?= <670939375@qq.com> Date: Mon, 10 Feb 2025 00:10:14 +0800 Subject: [PATCH] Integrating the OpenAI API for in-depth comment analysis, with usability to be debugged. --- models/ai_analysis.py | 31 +++++ utils/ai_analyzer.py | 89 ++++++++++++++ views/page/page.py | 106 +++++++++++++++++ views/page/templates/yuqingpredict.html | 149 ++++++++++++++++++++++++ 4 files changed, 375 insertions(+) create mode 100644 models/ai_analysis.py create mode 100644 utils/ai_analyzer.py diff --git a/models/ai_analysis.py b/models/ai_analysis.py new file mode 100644 index 0000000..25bbe5f --- /dev/null +++ b/models/ai_analysis.py @@ -0,0 +1,31 @@ +from sqlalchemy import Column, Integer, String, Float, DateTime, Text, JSON +from sqlalchemy.ext.declarative import declarative_base +from datetime import datetime + +Base = declarative_base() + +class AIAnalysis(Base): + __tablename__ = 'ai_analysis' + + id = Column(Integer, primary_key=True) + message_id = Column(Integer, nullable=False) + sentiment = Column(String(10), nullable=False) + sentiment_score = Column(Float, nullable=False) + keywords = Column(JSON, nullable=False) + key_points = Column(Text, nullable=False) + influence_analysis = Column(Text, nullable=False) + risk_level = Column(String(10), nullable=False) + created_at = Column(DateTime, default=datetime.now) + + def to_dict(self): + return { + 'id': self.id, + 'message_id': self.message_id, + 'sentiment': self.sentiment, + 'sentiment_score': f"{self.sentiment_score:.2%}", + 'keywords': self.keywords, + 'key_points': self.key_points, + 'influence': self.influence_analysis, + 'risk_level': self.risk_level, + 'analysis_time': self.created_at.strftime('%Y-%m-%d %H:%M:%S') + } \ No newline at end of file diff --git a/utils/ai_analyzer.py b/utils/ai_analyzer.py new file mode 100644 index 0000000..04d6ea2 --- /dev/null +++ b/utils/ai_analyzer.py @@ -0,0 +1,89 @@ +import openai +import json +from typing import List, Dict +import os +from datetime import datetime +from utils.logger import app_logger as logging + +class AIAnalyzer: + def __init__(self): + # 从环境变量获取API密钥 + self.api_key = os.getenv('OPENAI_API_KEY') + if not self.api_key: + raise ValueError("请设置OPENAI_API_KEY环境变量") + + openai.api_key = self.api_key + + # 系统提示词,限制AI的输出格式 + self.system_prompt = """你是一个专业的舆情分析助手。你的任务是分析每条消息的情感倾向、关键词和潜在影响。 +请严格按照以下JSON格式返回分析结果: +{ + "analysis_results": [ + { + "message_id": "消息ID", + "sentiment": "情感倾向 (积极/消极/中性)", + "sentiment_score": "情感分数 (0-1)", + "keywords": ["关键词1", "关键词2", "关键词3"], + "key_points": "核心观点概述", + "influence_analysis": "潜在影响分析", + "risk_level": "风险等级 (低/中/高)", + "timestamp": "分析时间戳" + } + ] +} +请确保每个字段都有值,并保持JSON格式的一致性。""" + + async def analyze_messages(self, messages: List[Dict]) -> List[Dict]: + """分析一批消息并返回分析结果""" + try: + # 构建输入消息 + formatted_messages = [] + for msg in messages: + formatted_messages.append(f"消息ID: {msg['id']}\n内容: {msg['content']}") + + messages_text = "\n---\n".join(formatted_messages) + + # 调用OpenAI API + response = await openai.ChatCompletion.acreate( + model="gpt-3.5-turbo", + messages=[ + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": f"请分析以下消息:\n{messages_text}"} + ], + temperature=0.3, # 降低随机性 + max_tokens=2000, + n=1 + ) + + # 解析返回结果 + try: + result = json.loads(response.choices[0].message.content) + # 验证结果格式 + if not isinstance(result, dict) or 'analysis_results' not in result: + raise ValueError("AI返回格式不正确") + return result['analysis_results'] + except json.JSONDecodeError: + logging.error("AI返回结果解析失败") + return [] + + except Exception as e: + logging.error(f"AI分析过程出错: {e}") + return [] + + def format_analysis_for_display(self, analysis: Dict) -> Dict: + """将分析结果格式化为前端显示格式""" + return { + 'id': analysis['message_id'], + 'sentiment': analysis['sentiment'], + 'sentiment_score': f"{float(analysis['sentiment_score']):.2%}", + 'keywords': ', '.join(analysis['keywords']), + 'key_points': analysis['key_points'], + 'influence': analysis['influence_analysis'], + 'risk_level': analysis['risk_level'], + 'analysis_time': datetime.fromtimestamp( + float(analysis['timestamp']) + ).strftime('%Y-%m-%d %H:%M:%S') + } + +# 创建全局AI分析器实例 +ai_analyzer = AIAnalyzer() \ No newline at end of file diff --git a/views/page/page.py b/views/page/page.py index 7cd95a0..7d3b7ab 100644 --- a/views/page/page.py +++ b/views/page/page.py @@ -9,6 +9,11 @@ from utils.getTopicPageData import * from utils.yuqingpredict import * from utils.logger import app_logger as logging from utils.cache_manager import prediction_cache +from utils.ai_analyzer import ai_analyzer +from models.ai_analysis import AIAnalysis +from sqlalchemy.orm import Session +from sqlalchemy import create_engine +import asyncio import torch from BCAT_front.predict import model_manager @@ -31,6 +36,11 @@ try: except Exception as e: logging.error(f"模型加载失败: {e}") +# 数据库配置 +DATABASE_URL = "sqlite:///ai_analysis.db" +engine = create_engine(DATABASE_URL) +AIAnalysis.metadata.create_all(engine) + def predict_sentiment(text): """使用改进版模型预测单个文本的情感""" try: @@ -294,3 +304,99 @@ def articleChar(id): except Exception as e: logging.error(f"获取文章详情时发生错误: {e}") return render_template('error.html', error_message="加载文章详情失败") + +@pb.route('/api/analyze_messages', methods=['POST']) +async def analyze_messages(): + try: + # 获取最近50条消息 + messages = getRecentMessages(50) # 需要实现这个函数 + + # 调用AI进行分析 + analysis_results = await ai_analyzer.analyze_messages(messages) + + # 保存到数据库 + with Session(engine) as session: + for result in analysis_results: + analysis = AIAnalysis( + message_id=result['message_id'], + sentiment=result['sentiment'], + sentiment_score=float(result['sentiment_score']), + keywords=result['keywords'], + key_points=result['key_points'], + influence_analysis=result['influence_analysis'], + risk_level=result['risk_level'] + ) + session.add(analysis) + session.commit() + + # 格式化结果用于显示 + display_results = [ + ai_analyzer.format_analysis_for_display(result) + for result in analysis_results + ] + + return jsonify({ + 'success': True, + 'data': display_results + }) + + except Exception as e: + logging.error(f"AI分析过程出错: {e}") + return jsonify({ + 'success': False, + 'error': str(e) + }), 500 + +@pb.route('/api/get_analysis/') +def get_message_analysis(message_id): + """获取特定消息的分析结果""" + try: + with Session(engine) as session: + analysis = session.query(AIAnalysis)\ + .filter(AIAnalysis.message_id == message_id)\ + .order_by(AIAnalysis.created_at.desc())\ + .first() + + if analysis: + return jsonify({ + 'success': True, + 'data': analysis.to_dict() + }) + else: + return jsonify({ + 'success': False, + 'error': '未找到分析结果' + }), 404 + + except Exception as e: + logging.error(f"获取分析结果时出错: {e}") + return jsonify({ + 'success': False, + 'error': str(e) + }), 500 + +def getRecentMessages(limit=50): + """获取最近的消息""" + # 这里需要根据你的数据库结构实现具体的查询逻辑 + messages = [] + try: + # 示例查询逻辑 + with Session(engine) as session: + results = session.execute( + """ + SELECT id, content + FROM comments + ORDER BY created_at DESC + LIMIT :limit + """, + {'limit': limit} + ).fetchall() + + messages = [ + {'id': row[0], 'content': row[1]} + for row in results + ] + except Exception as e: + logging.error(f"获取最近消息时出错: {e}") + + return messages diff --git a/views/page/templates/yuqingpredict.html b/views/page/templates/yuqingpredict.html index 0c67d01..88a6dbc 100644 --- a/views/page/templates/yuqingpredict.html +++ b/views/page/templates/yuqingpredict.html @@ -445,8 +445,157 @@ + + +
+
+
+
+
+

AI深度分析

+
+ +
+
+
+ +
+
+
+
+ + + + + + {% endblock %} {% block echarts %}