diff --git a/model_pro/LSTM_model.py b/model_pro/LSTM_model.py
new file mode 100644
index 0000000..095f989
--- /dev/null
+++ b/model_pro/LSTM_model.py
@@ -0,0 +1,411 @@
+import torch
+import torch.nn as nn
+import torch.optim as optim
+from torch.utils.data import Dataset, DataLoader
+import numpy as np
+import pandas as pd
+from sklearn.model_selection import train_test_split
+from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
+import jieba
+from transformers import BertTokenizer
+import logging
+import os
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger('LSTM_model')
+
+class TextDataset(Dataset):
+    """Dataset that tokenizes each text on access and pairs it with its label."""
+
+    def __init__(self, texts, labels, tokenizer, max_length=128):
+        self.texts = texts
+        self.labels = labels
+        self.tokenizer = tokenizer
+        self.max_length = max_length
+
+    def __len__(self):
+        return len(self.texts)
+
+    def __getitem__(self, idx):
+        text = str(self.texts[idx])
+        label = self.labels[idx]
+
+        # Tokenize with the BERT tokenizer to get input IDs and the attention mask
+        encoding = self.tokenizer.encode_plus(
+            text,
+            add_special_tokens=True,
+            max_length=self.max_length,
+            padding='max_length',
+            truncation=True,
+            return_attention_mask=True,
+            return_tensors='pt'
+        )
+
+        return {
+            'text': text,
+            'input_ids': encoding['input_ids'].flatten(),
+            'attention_mask': encoding['attention_mask'].flatten(),
+            'label': torch.tensor(label, dtype=torch.long)
+        }
+
+class LSTMSentimentModel(nn.Module):
+    """LSTM-based sentiment classification model."""
+
+    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers=2,
+                 bidirectional=True, dropout=0.5, pad_idx=0):
+        super().__init__()
+
+        # Embedding layer
+        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
+
+        # LSTM layer
+        self.lstm = nn.LSTM(
+            embedding_dim,
+            hidden_dim,
+            num_layers=n_layers,
+            bidirectional=bidirectional,
+            dropout=dropout if n_layers > 1 else 0,
+            batch_first=True
+        )
+
+        # Fully connected layer; a bidirectional LSTM doubles its input width
+        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
+
+        # Dropout layer
+        self.dropout = nn.Dropout(dropout)
+
+    def forward(self, text, attention_mask=None):
+        # Embed: [batch_size, seq_len] -> [batch_size, seq_len, embedding_dim]
+        embedded = self.embedding(text)
+
+        # Apply dropout
+        embedded = self.dropout(embedded)
+
+        if attention_mask is not None:
+            # Pack the batch so the LSTM skips padding. pack_padded_sequence
+            # rejects zero lengths, so clamp rows whose mask is all zeros.
+            lengths = attention_mask.sum(dim=1).clamp(min=1).to('cpu')
+            packed_embedded = nn.utils.rnn.pack_padded_sequence(
+                embedded, lengths, batch_first=True, enforce_sorted=False
+            )
+            # Only the final hidden state is used, so the output is not unpacked
+            _, (hidden, _) = self.lstm(packed_embedded)
+        else:
+            _, (hidden, _) = self.lstm(embedded)
+
+        # For a bidirectional LSTM, concatenate the last layer's forward and
+        # backward hidden states
+        if self.lstm.bidirectional:
+            hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)
+        else:
+            hidden = hidden[-1]
+
+        # Apply dropout
+        hidden = self.dropout(hidden)
+
+        # Fully connected layer
+        return self.fc(hidden)
+
+class LSTMModelManager:
+    """Manages training, evaluation and prediction for the LSTM model."""
+
+    def __init__(self, bert_model_path, model_save_path=None, vocab_size=30522,
+                 embedding_dim=128, hidden_dim=256, output_dim=2, n_layers=2,
+                 bidirectional=True, dropout=0.5):
+        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
+        self.tokenizer = BertTokenizer.from_pretrained(bert_model_path)
+        self.vocab_size = vocab_size
+        self.model = LSTMSentimentModel(
+            vocab_size=vocab_size,
+            embedding_dim=embedding_dim,
+            hidden_dim=hidden_dim,
+            output_dim=output_dim,
+            n_layers=n_layers,
+            bidirectional=bidirectional,
+            dropout=dropout,
+            pad_idx=self.tokenizer.pad_token_id
+        ).to(self.device)
+
+        self.model_save_path = model_save_path
+        # True once the model holds trained weights (loaded from disk or
+        # produced by train()); lets callers refuse untrained predictions.
+        self.weights_loaded = False
+        if model_save_path and os.path.exists(model_save_path):
+            self.load_weights(model_save_path)
+
+    def load_weights(self, path):
+        """Load a saved state dict from path and mark the weights as loaded."""
+        self.model.load_state_dict(torch.load(path, map_location=self.device))
+        self.weights_loaded = True
+        logger.info(f"已从 {path} 加载模型")
+
+    def train(self, train_texts, train_labels, val_texts=None, val_labels=None,
+              batch_size=32, learning_rate=2e-5, epochs=10, validation_split=0.2):
+        """Train the model.
+
+        Returns (train_loss, val_loss, val_accuracy, val_f1) from the last epoch.
+        Raises ValueError if epochs < 1, because no metrics would exist.
+        """
+        if epochs < 1:
+            raise ValueError("epochs must be at least 1")
+
+        logger.info("开始训练模型...")
+
+        # Split a validation set off the training data when none is provided
+        if val_texts is None or val_labels is None:
+            train_texts, val_texts, train_labels, val_labels = train_test_split(
+                train_texts, train_labels, test_size=validation_split, random_state=42
+            )
+
+        # Build the datasets and data loaders
+        train_dataset = TextDataset(train_texts, train_labels, self.tokenizer)
+        val_dataset = TextDataset(val_texts, val_labels, self.tokenizer)
+
+        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
+        val_dataloader = DataLoader(val_dataset, batch_size=batch_size)
+
+        # Optimizer and loss function
+        optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
+        criterion = nn.CrossEntropyLoss()
+
+        # Training loop
+        best_val_loss = float('inf')
+        for epoch in range(epochs):
+            # Training mode
+            self.model.train()
+            train_loss = 0
+            train_preds = []
+            train_labels_list = []
+
+            for batch in train_dataloader:
+                # Move the batch to the device
+                input_ids = batch['input_ids'].to(self.device)
+                attention_mask = batch['attention_mask'].to(self.device)
+                labels = batch['label'].to(self.device)
+
+                # Forward pass
+                optimizer.zero_grad()
+                outputs = self.model(input_ids, attention_mask)
+
+                # Compute the loss
+                loss = criterion(outputs, labels)
+                train_loss += loss.item()
+
+                # Backward pass
+                loss.backward()
+                optimizer.step()
+
+                # Collect predictions and labels
+                _, predicted = torch.max(outputs, 1)
+                train_preds.extend(predicted.cpu().numpy())
+                train_labels_list.extend(labels.cpu().numpy())
+
+            # Training-set metrics
+            train_accuracy = accuracy_score(train_labels_list, train_preds)
+            train_f1 = f1_score(train_labels_list, train_preds, average='macro')
+
+            # Evaluation mode
+            self.model.eval()
+            val_loss = 0
+            val_preds = []
+            val_labels_list = []
+
+            with torch.no_grad():
+                for batch in val_dataloader:
+                    input_ids = batch['input_ids'].to(self.device)
+                    attention_mask = batch['attention_mask'].to(self.device)
+                    labels = batch['label'].to(self.device)
+
+                    outputs = self.model(input_ids, attention_mask)
+                    loss = criterion(outputs, labels)
+                    val_loss += loss.item()
+
+                    _, predicted = torch.max(outputs, 1)
+                    val_preds.extend(predicted.cpu().numpy())
+                    val_labels_list.extend(labels.cpu().numpy())
+
+            # Validation-set metrics
+            val_accuracy = accuracy_score(val_labels_list, val_preds)
+            val_f1 = f1_score(val_labels_list, val_preds, average='macro')
+
+            # Average the losses over the batches
+            train_loss /= len(train_dataloader)
+            val_loss /= len(val_dataloader)
+
+            logger.info(f'Epoch {epoch+1}/{epochs} | '
+                        f'Train Loss: {train_loss:.4f} | '
+                        f'Train Acc: {train_accuracy:.4f} | '
+                        f'Train F1: {train_f1:.4f} | '
+                        f'Val Loss: {val_loss:.4f} | '
+                        f'Val Acc: {val_accuracy:.4f} | '
+                        f'Val F1: {val_f1:.4f}')
+
+            # Save the best model so far
+            if val_loss < best_val_loss and self.model_save_path:
+                best_val_loss = val_loss
+                torch.save(self.model.state_dict(), self.model_save_path)
+                logger.info(f"模型已保存到 {self.model_save_path}")
+
+        # A save path was given but nothing was saved: save the final model
+        if self.model_save_path and best_val_loss == float('inf'):
+            torch.save(self.model.state_dict(), self.model_save_path)
+            logger.info(f"最终模型已保存到 {self.model_save_path}")
+
+        self.weights_loaded = True
+        return train_loss, val_loss, val_accuracy, val_f1
+
+    def evaluate(self, test_texts, test_labels, batch_size=32):
+        """Evaluate the model on a labelled test set and return its metrics."""
+        logger.info("评估模型...")
+
+        # Build the test dataset and data loader
+        test_dataset = TextDataset(test_texts, test_labels, self.tokenizer)
+        test_dataloader = DataLoader(test_dataset, batch_size=batch_size)
+
+        # Switch to evaluation mode
+        self.model.eval()
+
+        # Loss function
+        criterion = nn.CrossEntropyLoss()
+        test_loss = 0
+        test_preds = []
+        test_probs = []
+        test_labels_list = []
+
+        with torch.no_grad():
+            for batch in test_dataloader:
+                input_ids = batch['input_ids'].to(self.device)
+                attention_mask = batch['attention_mask'].to(self.device)
+                labels = batch['label'].to(self.device)
+
+                outputs = self.model(input_ids, attention_mask)
+                loss = criterion(outputs, labels)
+                test_loss += loss.item()
+
+                probs = torch.softmax(outputs, dim=1)
+                _, predicted = torch.max(outputs, 1)
+
+                test_preds.extend(predicted.cpu().numpy())
+                test_probs.extend(probs.cpu().numpy())
+                test_labels_list.extend(labels.cpu().numpy())
+
+        # Average the loss over the batches
+        test_loss /= len(test_dataloader)
+
+        # Compute the metrics
+        accuracy = accuracy_score(test_labels_list, test_preds)
+        precision = precision_score(test_labels_list, test_preds, average='macro')
+        recall = recall_score(test_labels_list, test_preds, average='macro')
+        f1 = f1_score(test_labels_list, test_preds, average='macro')
+        conf_matrix = confusion_matrix(test_labels_list, test_preds)
+
+        logger.info(f'Test Loss: {test_loss:.4f}')
+        logger.info(f'Accuracy: {accuracy:.4f}')
+        logger.info(f'Precision: {precision:.4f}')
+        logger.info(f'Recall: {recall:.4f}')
+        logger.info(f'F1 Score: {f1:.4f}')
+        logger.info(f'Confusion Matrix:\n{conf_matrix}')
+
+        return {
+            'loss': test_loss,
+            'accuracy': accuracy,
+            'precision': precision,
+            'recall': recall,
+            'f1': f1,
+            'confusion_matrix': conf_matrix,
+            'predictions': test_preds,
+            'probabilities': test_probs
+        }
+
+    def predict_batch(self, texts, batch_size=32):
+        """Predict sentiment for a batch of texts.
+
+        Returns (predictions, probabilities), or (None, None) for empty input.
+        """
+        # Use len() rather than truthiness so numpy arrays are accepted too
+        if texts is None or len(texts) == 0:
+            return None, None
+
+        # Accept a single string as a one-element batch
+        if isinstance(texts, str):
+            texts = [texts]
+
+        # Build the dataset (no labels here, so use placeholders)
+        dummy_labels = [0] * len(texts)
+        dataset = TextDataset(texts, dummy_labels, self.tokenizer)
+        dataloader = DataLoader(dataset, batch_size=batch_size)
+
+        # Switch to evaluation mode
+        self.model.eval()
+
+        all_preds = []
+        all_probs = []
+
+        with torch.no_grad():
+            for batch in dataloader:
+                input_ids = batch['input_ids'].to(self.device)
+                attention_mask = batch['attention_mask'].to(self.device)
+
+                outputs = self.model(input_ids, attention_mask)
+                probs = torch.softmax(outputs, dim=1)
+                _, predicted = torch.max(outputs, 1)
+
+                all_preds.extend(predicted.cpu().numpy())
+                all_probs.extend(probs.cpu().numpy())
+
+        return all_preds, all_probs
+
+    def predict(self, text):
+        """Predict the sentiment of a single text."""
+        predictions, probabilities = self.predict_batch([text])
+        if predictions is not None and len(predictions) > 0:
+            return predictions[0], probabilities[0]
+        return None, None
+
+# Create the global model instance
+lstm_model_manager = LSTMModelManager(
+    bert_model_path='model_pro/bert_model',
+    model_save_path='model_pro/lstm_model.pt'
+)
+
+# Test code
+if __name__ == "__main__":
+    # Load the data
+    train_data = pd.read_csv('model_pro/train.csv')
+    dev_data = pd.read_csv('model_pro/dev.csv')
+    test_data = pd.read_csv('model_pro/test.csv')
+
+    # Prepare the data
+    train_texts = train_data['text'].values
+    train_labels = train_data['label'].values
+
+    dev_texts = dev_data['text'].values
+    dev_labels = dev_data['label'].values
+
+    test_texts = test_data['text'].values
+    test_labels = test_data['label'].values
+
+    # Train the model
+    lstm_model_manager.train(
+        train_texts, train_labels,
+        val_texts=dev_texts, val_labels=dev_labels,
+        batch_size=32, epochs=5
+    )
+
+    # Evaluate the model
+    results = lstm_model_manager.evaluate(test_texts, test_labels)
+
+    # Try out prediction
+    test_sentences = [
+        "这件事情做得非常好",
+        "服务太差了,态度恶劣",
+        "这个产品质量一般,但价格便宜",
+        "我对这家公司非常满意",
+    ]
+
+    for sentence in test_sentences:
+        pred, prob = lstm_model_manager.predict(sentence)
+        label = '良好' if pred == 0 else '不良'
+        confidence = prob[pred]
+        print(f"句子: '{sentence}' 预测结果: {label} (置信度: {confidence:.2%})")
\ No newline at end of file
diff --git a/model_pro/lstm_predict.py b/model_pro/lstm_predict.py
new file mode 100644
index 0000000..b6996b8
--- /dev/null
+++ b/model_pro/lstm_predict.py
@@ -0,0 +1,180 @@
+import torch
+import os
+import logging
+
+# Import relative to the package when loaded as model_pro.lstm_predict (as
+# views/page/page.py does); use the flat import when run as a script.
+if __package__:
+    from .LSTM_model import lstm_model_manager
+else:
+    from LSTM_model import lstm_model_manager
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger('lstm_predict')
+
+class LSTMPredictor:
+    """LSTM predictor compatible with the system's existing prediction interface."""
+
+    def __init__(self):
+        self.model_loaded = False
+        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+        logger.info(f"初始化LSTM预测器,使用设备: {self.device}")
+
+    def _is_ready(self):
+        """Return True once trained weights are available for prediction."""
+        return self.model_loaded or lstm_model_manager.weights_loaded
+
+    def load_models(self, model_save_path, bert_model_path, tokenizer_path=None):
+        """
+        Load the model; mirrors the existing model_manager.load_models interface.
+
+        Args:
+            model_save_path: path of the saved LSTM weights
+            bert_model_path: path of the BERT model
+            tokenizer_path: tokenizer path (unused; the LSTM model uses the BERT tokenizer)
+        """
+        try:
+            # Check that the model file exists
+            if not os.path.exists(model_save_path):
+                logger.warning(f"模型文件 {model_save_path} 不存在,需要先训练模型")
+                return False
+
+            if not os.path.exists(bert_model_path):
+                logger.error(f"BERT模型路径 {bert_model_path} 不存在")
+                return False
+
+            # The manager loads its checkpoint when it is created; load here
+            # only if that did not happen (e.g. the file appeared afterwards).
+            if not lstm_model_manager.weights_loaded:
+                lstm_model_manager.load_weights(model_save_path)
+
+            if lstm_model_manager.weights_loaded:
+                self.model_loaded = True
+                logger.info("LSTM模型已加载成功")
+                return True
+            else:
+                logger.error("LSTM模型加载失败")
+                return False
+        except Exception as e:
+            logger.error(f"加载模型过程中出错: {e}")
+            return False
+
+    def predict_batch(self, texts):
+        """
+        Predict sentiment for a batch of texts.
+
+        Args:
+            texts: list of texts
+
+        Returns:
+            predictions: list of predicted labels (0 = good, 1 = bad)
+            probabilities: list of predicted probabilities
+        """
+        if not self._is_ready():
+            logger.error("模型未加载,无法进行预测")
+            return None, None
+
+        # Use len() rather than truthiness so numpy arrays are accepted too
+        if texts is None or len(texts) == 0:
+            logger.warning("未提供文本,无法进行预测")
+            return None, None
+
+        try:
+            # Delegate to the LSTM model manager's batch prediction
+            predictions, probabilities = lstm_model_manager.predict_batch(texts)
+            return predictions, probabilities
+        except Exception as e:
+            logger.error(f"预测过程中出错: {e}")
+            return None, None
+
+    def predict(self, text):
+        """
+        Predict the sentiment of a single text.
+
+        Args:
+            text: text string
+
+        Returns:
+            prediction: predicted label (0 = good, 1 = bad)
+            probability: predicted probabilities
+        """
+        if not self._is_ready():
+            logger.error("模型未加载,无法进行预测")
+            return None, None
+
+        if not text or len(text.strip()) == 0:
+            logger.warning("未提供文本或文本为空,无法进行预测")
+            return None, None
+
+        try:
+            # Delegate to the LSTM model manager's single-text prediction
+            prediction, probability = lstm_model_manager.predict(text)
+            return prediction, probability
+        except Exception as e:
+            logger.error(f"预测过程中出错: {e}")
+            return None, None
+
+    def train_model(self, train_texts, train_labels, val_texts=None, val_labels=None,
+                    batch_size=32, learning_rate=2e-5, epochs=10):
+        """
+        Train the model.
+
+        Args:
+            train_texts: training texts
+            train_labels: training labels
+            val_texts: validation texts
+            val_labels: validation labels
+            batch_size: batch size
+            learning_rate: learning rate
+            epochs: number of training epochs
+
+        Returns:
+            The manager's training result, or None if training failed
+        """
+        try:
+            results = lstm_model_manager.train(
+                train_texts, train_labels, val_texts, val_labels,
+                batch_size, learning_rate, epochs
+            )
+            self.model_loaded = True
+            return results
+        except Exception as e:
+            logger.error(f"训练模型过程中出错: {e}")
+            return None
+
+# Create the global predictor instance
+lstm_predictor = LSTMPredictor()
+
+# For compatibility with existing code: same predict_batch function as model_manager
+def predict_batch(texts):
+    return lstm_predictor.predict_batch(texts)
+
+# For compatibility with existing code: same load_models function as model_manager
+def load_models(model_save_path, bert_model_path, tokenizer_path=None):
+    return lstm_predictor.load_models(model_save_path, bert_model_path, tokenizer_path)
+
+# Test code
+if __name__ == "__main__":
+    # Load the model
+    load_models(
+        model_save_path="model_pro/lstm_model.pt",
+        bert_model_path="model_pro/bert_model"
+    )
+
+    # Try out prediction
+    test_sentences = [
+        "这件事情做得非常好",
+        "服务太差了,态度恶劣",
+        "这个产品质量一般,但价格便宜",
+        "我对这家公司非常满意",
+    ]
+
+    for sentence in test_sentences:
+        pred, prob = lstm_predictor.predict(sentence)
+        if pred is not None:
+            label = '良好' if pred == 0 else '不良'
+            confidence = prob[pred]
+            print(f"句子: '{sentence}' 预测结果: {label} (置信度: {confidence:.2%})")
+        else:
+            print(f"句子: '{sentence}' 预测失败")
\ No newline at end of file
diff --git a/views/page/page.py b/views/page/page.py
index 4b3ad81..3c1ee8f 100644
--- a/views/page/page.py
+++ b/views/page/page.py
@@ -20,6 +20,7 @@ from functools import wraps
 import bleach
 import re
 from datetime import datetime, timedelta
+from model_pro.lstm_predict import lstm_predictor
 
 pb = Blueprint('page',
                __name__,
@@ -75,12 +76,15 @@ device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 
 # 设置模型路径
 model_save_path = 'model_pro/final_model.pt'
+lstm_model_path = 'model_pro/lstm_model.pt'
 bert_model_path = 'model_pro/bert_model'
 ctm_tokenizer_path = 'model_pro/sentence_bert_model'
 
 # 初始化模型
 try:
     model_manager.load_models(model_save_path, bert_model_path, ctm_tokenizer_path)
+    # Also initialise the LSTM model
+    lstm_predictor.load_models(lstm_model_path, bert_model_path)
 except Exception as e:
     logging.error(f"模型加载失败: {e}")
 
@@ -315,7 +319,7 @@ def yuqingpredict():
     X, Y = getTopicCreatedAtandpredictData(defaultTopic)
 
     model_type = sanitize_input(request.args.get('model', 'pro'))
-    if model_type not in ['pro', 'basic']:
+    if model_type not in ['pro', 'basic', 'lstm']:
         return abort(400, "无效的模型类型")
 
     # 尝试从缓存获取预测结果
@@ -333,6 +337,14 @@ def yuqingpredict():
                 sentences = '正面'
             elif value < 0.5:
                 sentences = '负面'
+        elif model_type == 'lstm':
+            predicted_label, confidence = lstm_predictor.predict(defaultTopic)
+            if predicted_label is not None:
+                sentences = '良好' if predicted_label == 0 else '不良'
+                sentences = f"{sentences} (LSTM置信度: {confidence[predicted_label]:.2%})"
+            else:
+                sentences = 'LSTM预测失败,请稍后重试'
+                logging.error(f"LSTM预测失败,话题: {defaultTopic}")
         else:
             predicted_label, confidence = predict_sentiment(defaultTopic)
             if predicted_label is not None:
diff --git a/views/page/templates/yuqingpredict.html b/views/page/templates/yuqingpredict.html
index 0653c29..df674da 100644
--- a/views/page/templates/yuqingpredict.html
+++ b/views/page/templates/yuqingpredict.html
@@ -165,23 +165,10 @@