# -*- coding: utf-8 -*- """ LSTM情感分析模型训练脚本 """ import argparse import os import pandas as pd import torch import torch.nn as nn from torch.utils.data import Dataset, DataLoader from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence from gensim import models from sklearn.metrics import accuracy_score, f1_score, classification_report, roc_auc_score from typing import List, Tuple, Dict, Any import numpy as np from base_model import BaseModel class LSTMDataset(Dataset): """LSTM数据集""" def __init__(self, data: List[Tuple[str, int]], word2vec_model): self.data = [] self.label = [] for text, label in data: vectors = [] for word in text.split(" "): if word in word2vec_model.wv.key_to_index: vectors.append(word2vec_model.wv[word]) if len(vectors) > 0: # 确保有有效的词向量 vectors = torch.Tensor(vectors) self.data.append(vectors) self.label.append(label) def __getitem__(self, index): return self.data[index], self.label[index] def __len__(self): return len(self.label) def collate_fn(data): """批处理函数""" data.sort(key=lambda x: len(x[0]), reverse=True) data_length = [len(sq[0]) for sq in data] x = [i[0] for i in data] y = [i[1] for i in data] data = pad_sequence(x, batch_first=True, padding_value=0) return data, torch.tensor(y, dtype=torch.float32), data_length class LSTMNet(nn.Module): """LSTM网络结构""" def __init__(self, input_size, hidden_size, num_layers): super(LSTMNet, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True) self.fc = nn.Linear(hidden_size * 2, 1) # 双向LSTM self.sigmoid = nn.Sigmoid() def forward(self, x, lengths): device = x.device h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device) c0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device) packed_input = pack_padded_sequence(input=x, lengths=lengths, batch_first=True) packed_out, (h_n, h_c) = self.lstm(packed_input, (h0, c0)) # 双向LSTM,拼接最后的隐藏状态 lstm_out = torch.cat([h_n[-2], h_n[-1]], 1) out = self.fc(lstm_out) out = self.sigmoid(out) return out class LSTMModel(BaseModel): """LSTM情感分析模型""" def __init__(self): super().__init__("LSTM") self.word2vec_model = None self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') def _train_word2vec(self, train_data: List[Tuple[str, int]], **kwargs): """训练Word2Vec词向量""" print("训练Word2Vec词向量...") # 准备Word2Vec输入数据 wv_input = [text.split(" ") for text, _ in train_data] vector_size = kwargs.get('vector_size', 64) min_count = kwargs.get('min_count', 1) epochs = kwargs.get('epochs', 1000) # 训练Word2Vec self.word2vec_model = models.Word2Vec( wv_input, vector_size=vector_size, min_count=min_count, epochs=epochs ) print(f"Word2Vec训练完成,词向量维度: {vector_size}") def train(self, train_data: List[Tuple[str, int]], **kwargs) -> None: """训练LSTM模型""" print(f"开始训练 {self.model_name} 模型...") # 训练Word2Vec self._train_word2vec(train_data, **kwargs) # 超参数 learning_rate = kwargs.get('learning_rate', 5e-4) num_epochs = kwargs.get('num_epochs', 5) batch_size = kwargs.get('batch_size', 100) embed_size = kwargs.get('embed_size', 64) hidden_size = kwargs.get('hidden_size', 64) num_layers = kwargs.get('num_layers', 2) print(f"LSTM超参数: lr={learning_rate}, epochs={num_epochs}, " f"batch_size={batch_size}, hidden_size={hidden_size}") # 创建数据集 train_dataset = LSTMDataset(train_data, self.word2vec_model) train_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True) # 创建模型 self.model = LSTMNet(embed_size, hidden_size, num_layers).to(self.device) # 损失函数和优化器 criterion = nn.BCELoss() optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate) # 训练循环 self.model.train() for epoch in range(num_epochs): total_loss = 0 num_batches = 0 for i, (x, labels, lengths) in enumerate(train_loader): x = x.to(self.device) labels = labels.to(self.device) # 前向传播 outputs = self.model(x, lengths) logits = outputs.view(-1) loss = criterion(logits, labels) # 反向传播 optimizer.zero_grad() loss.backward() optimizer.step() total_loss += loss.item() num_batches += 1 if (i + 1) % 10 == 0: avg_loss = total_loss / num_batches print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}], Loss: {avg_loss:.4f}") # 保存每个epoch的模型 if kwargs.get('save_each_epoch', False): epoch_model_path = f"./model/lstm_epoch_{epoch+1}.pth" os.makedirs(os.path.dirname(epoch_model_path), exist_ok=True) torch.save(self.model.state_dict(), epoch_model_path) print(f"已保存模型: {epoch_model_path}") self.is_trained = True print(f"{self.model_name} 模型训练完成!") def predict(self, texts: List[str]) -> List[int]: """预测文本情感""" if not self.is_trained: raise ValueError(f"模型 {self.model_name} 尚未训练,请先调用train方法") # 创建数据集 test_data = [(text, 0) for text in texts] # 标签无关紧要 test_dataset = LSTMDataset(test_data, self.word2vec_model) test_loader = DataLoader(test_dataset, batch_size=32, collate_fn=collate_fn) predictions = [] self.model.eval() with torch.no_grad(): for x, _, lengths in test_loader: x = x.to(self.device) outputs = self.model(x, lengths) outputs = outputs.view(-1) # 转换为类别标签 preds = (outputs > 0.5).cpu().numpy() predictions.extend(preds.astype(int).tolist()) return predictions def predict_single(self, text: str) -> Tuple[int, float]: """预测单条文本的情感""" if not self.is_trained: raise ValueError(f"模型 {self.model_name} 尚未训练,请先调用train方法") # 转换为词向量 vectors = [] for word in text.split(" "): if word in self.word2vec_model.wv.key_to_index: vectors.append(self.word2vec_model.wv[word]) if len(vectors) == 0: return 0, 0.5 # 如果没有有效词向量,返回默认值 # 转换为tensor x = torch.Tensor(vectors).unsqueeze(0).to(self.device) # 添加batch维度 lengths = [len(vectors)] self.model.eval() with torch.no_grad(): output = self.model(x, lengths) prob = output.item() prediction = int(prob > 0.5) confidence = prob if prediction == 1 else 1 - prob return prediction, confidence def save_model(self, model_path: str = None) -> None: """保存模型""" if not self.is_trained: raise ValueError(f"模型 {self.model_name} 尚未训练,无法保存") if model_path is None: model_path = f"./model/{self.model_name.lower()}_model.pth" os.makedirs(os.path.dirname(model_path), exist_ok=True) # 保存模型状态和Word2Vec model_data = { 'model_state_dict': self.model.state_dict(), 'word2vec_model': self.word2vec_model, 'model_config': { 'embed_size': 64, 'hidden_size': 64, 'num_layers': 2 }, 'device': str(self.device) } torch.save(model_data, model_path) print(f"模型已保存到: {model_path}") def load_model(self, model_path: str) -> None: """加载模型""" if not os.path.exists(model_path): raise FileNotFoundError(f"模型文件不存在: {model_path}") model_data = torch.load(model_path, map_location=self.device) # 加载Word2Vec self.word2vec_model = model_data['word2vec_model'] # 重建LSTM网络 config = model_data['model_config'] self.model = LSTMNet( config['embed_size'], config['hidden_size'], config['num_layers'] ).to(self.device) # 加载模型权重 self.model.load_state_dict(model_data['model_state_dict']) self.is_trained = True print(f"已加载模型: {model_path}") def main(): """主函数""" parser = argparse.ArgumentParser(description='LSTM情感分析模型训练') parser.add_argument('--train_path', type=str, default='./data/weibo2018/train.txt', help='训练数据路径') parser.add_argument('--test_path', type=str, default='./data/weibo2018/test.txt', help='测试数据路径') parser.add_argument('--model_path', type=str, default='./model/lstm_model.pth', help='模型保存路径') parser.add_argument('--epochs', type=int, default=5, help='训练轮数') parser.add_argument('--batch_size', type=int, default=100, help='批大小') parser.add_argument('--hidden_size', type=int, default=64, help='LSTM隐藏层大小') parser.add_argument('--learning_rate', type=float, default=5e-4, help='学习率') parser.add_argument('--eval_only', action='store_true', help='仅评估已有模型,不进行训练') args = parser.parse_args() # 创建模型 model = LSTMModel() if args.eval_only: # 仅评估模式 print("评估模式:加载已有模型进行评估") model.load_model(args.model_path) # 加载测试数据 _, test_data = BaseModel.load_data(args.train_path, args.test_path) # 评估模型 model.evaluate(test_data) else: # 训练模式 # 加载数据 train_data, test_data = BaseModel.load_data(args.train_path, args.test_path) # 训练模型 model.train( train_data, num_epochs=args.epochs, batch_size=args.batch_size, hidden_size=args.hidden_size, learning_rate=args.learning_rate ) # 评估模型 model.evaluate(test_data) # 保存模型 model.save_model(args.model_path) # 示例预测 print("\n示例预测:") test_texts = [ "今天天气真好,心情很棒", "这部电影太无聊了,浪费时间", "哈哈哈,太有趣了" ] for text in test_texts: pred, conf = model.predict_single(text) sentiment = "正面" if pred == 1 else "负面" print(f"文本: {text}") print(f"预测: {sentiment} (置信度: {conf:.4f})") print() if __name__ == "__main__": main()