Enhance LSTM model for small datasets and improve performance.
This commit is contained in:
+309
-112
@@ -4,12 +4,19 @@ import torch.optim as optim
|
||||
from torch.utils.data import Dataset, DataLoader
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from sklearn.model_selection import train_test_split
|
||||
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
|
||||
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
|
||||
from sklearn.feature_extraction.text import TfidfVectorizer
|
||||
from sklearn.linear_model import LogisticRegression
|
||||
import jieba
|
||||
from transformers import BertTokenizer
|
||||
from transformers import BertTokenizer, BertModel
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
from torch.optim.lr_scheduler import ReduceLROnPlateau
|
||||
from gensim.models import KeyedVectors
|
||||
import json
|
||||
import torch.nn.functional as F
|
||||
|
||||
# 配置日志记录
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
@@ -49,17 +56,90 @@ class TextDataset(Dataset):
|
||||
'label': torch.tensor(label, dtype=torch.long)
|
||||
}
|
||||
|
||||
class AttentionLayer(nn.Module):
|
||||
"""注意力层"""
|
||||
def __init__(self, hidden_dim):
|
||||
super().__init__()
|
||||
self.attention = nn.Linear(hidden_dim, 1)
|
||||
|
||||
def forward(self, lstm_output):
|
||||
attention_weights = torch.softmax(self.attention(lstm_output), dim=1)
|
||||
context_vector = torch.sum(attention_weights * lstm_output, dim=1)
|
||||
return context_vector, attention_weights
|
||||
|
||||
# 添加数据增强类
|
||||
class TextAugmenter:
|
||||
def __init__(self, language='zh', synonyms_file=None):
|
||||
self.language = language
|
||||
self.synonyms_dict = self._load_synonyms(synonyms_file)
|
||||
|
||||
def _load_synonyms(self, file_path):
|
||||
base_dict = {
|
||||
"很好": ["非常好", "太好了", "特别好", "相当好", "真不错"],
|
||||
"糟糕": ["差劲", "很差", "不好", "太差", "糟透了"],
|
||||
"一般": ["还行", "凑合", "普通", "马马虎虎", "中等"],
|
||||
"满意": ["很满意", "挺好", "不错", "称心如意"],
|
||||
"生气": ["愤怒", "恼火", "不爽", "气愤"],
|
||||
"失望": ["伤心", "难过", "不满意", "遗憾"],
|
||||
# 添加更多情感词汇对
|
||||
}
|
||||
|
||||
if file_path and os.path.exists(file_path):
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
custom_dict = json.load(f)
|
||||
base_dict.update(custom_dict)
|
||||
except Exception as e:
|
||||
logger.warning(f"加载同义词典失败: {e}")
|
||||
|
||||
return base_dict
|
||||
|
||||
def synonym_replacement(self, text, n=1):
|
||||
words = list(jieba.cut(text))
|
||||
new_words = words.copy()
|
||||
num_replaced = 0
|
||||
|
||||
for word in list(set(words)):
|
||||
if len(word) > 1 and num_replaced < n:
|
||||
synonyms = self._get_synonyms(word)
|
||||
if synonyms:
|
||||
synonym = random.choice(synonyms)
|
||||
new_words = [synonym if w == word else w for w in new_words]
|
||||
num_replaced += 1
|
||||
|
||||
return ''.join(new_words)
|
||||
|
||||
def _get_synonyms(self, word):
|
||||
return self.synonyms_dict.get(word, [])
|
||||
|
||||
def augment(self, texts, labels, augment_ratio=0.5):
|
||||
augmented_texts = []
|
||||
augmented_labels = []
|
||||
|
||||
for text, label in zip(texts, labels):
|
||||
augmented_texts.append(text)
|
||||
augmented_labels.append(label)
|
||||
|
||||
if random.random() < augment_ratio:
|
||||
aug_text = self.synonym_replacement(text)
|
||||
augmented_texts.append(aug_text)
|
||||
augmented_labels.append(label)
|
||||
|
||||
return np.array(augmented_texts), np.array(augmented_labels)
|
||||
|
||||
class LSTMSentimentModel(nn.Module):
|
||||
"""基于LSTM的情感分析模型"""
|
||||
|
||||
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers=2,
|
||||
bidirectional=True, dropout=0.5, pad_idx=0):
|
||||
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers=1,
|
||||
bidirectional=True, dropout=0.3, pad_idx=0, pretrained_embeddings=None):
|
||||
super().__init__()
|
||||
|
||||
# 嵌入层
|
||||
self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
|
||||
if pretrained_embeddings is not None:
|
||||
self.embedding.weight.data.copy_(pretrained_embeddings)
|
||||
self.embedding.weight.requires_grad = True
|
||||
|
||||
# LSTM层
|
||||
self.lstm = nn.LSTM(
|
||||
embedding_dim,
|
||||
hidden_dim,
|
||||
@@ -69,11 +149,17 @@ class LSTMSentimentModel(nn.Module):
|
||||
batch_first=True
|
||||
)
|
||||
|
||||
# 全连接层,如果是双向LSTM,输入维度需要翻倍
|
||||
self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
|
||||
# 注意力层
|
||||
self.attention = AttentionLayer(hidden_dim * 2 if bidirectional else hidden_dim)
|
||||
|
||||
# 全连接层
|
||||
fc_dim = hidden_dim * 2 if bidirectional else hidden_dim
|
||||
self.fc1 = nn.Linear(fc_dim, fc_dim // 2)
|
||||
self.fc2 = nn.Linear(fc_dim // 2, output_dim)
|
||||
|
||||
# Dropout层
|
||||
self.dropout = nn.Dropout(dropout)
|
||||
self.bn = nn.BatchNorm1d(fc_dim // 2)
|
||||
self.relu = nn.ReLU()
|
||||
|
||||
def forward(self, text, attention_mask=None):
|
||||
# 文本通过嵌入层 [batch_size, seq_len] -> [batch_size, seq_len, embedding_dim]
|
||||
@@ -95,27 +181,49 @@ class LSTMSentimentModel(nn.Module):
|
||||
else:
|
||||
output, (hidden, cell) = self.lstm(embedded)
|
||||
|
||||
# 如果是双向LSTM,需要拼接最后一层的前向和后向隐藏状态
|
||||
if self.lstm.bidirectional:
|
||||
hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)
|
||||
# 应用注意力机制
|
||||
context_vector, attention_weights = self.attention(output)
|
||||
|
||||
# 应用dropout和全连接层
|
||||
x = self.dropout(context_vector)
|
||||
x = self.fc1(x)
|
||||
x = self.bn(x)
|
||||
x = self.relu(x)
|
||||
x = self.dropout(x)
|
||||
x = self.fc2(x)
|
||||
|
||||
return x, attention_weights
|
||||
|
||||
# 添加早停类
|
||||
class EarlyStopping:
|
||||
def __init__(self, patience=5, min_delta=0):
|
||||
self.patience = patience
|
||||
self.min_delta = min_delta
|
||||
self.counter = 0
|
||||
self.best_loss = None
|
||||
self.early_stop = False
|
||||
|
||||
def __call__(self, val_loss):
|
||||
if self.best_loss is None:
|
||||
self.best_loss = val_loss
|
||||
elif val_loss > self.best_loss - self.min_delta:
|
||||
self.counter += 1
|
||||
if self.counter >= self.patience:
|
||||
self.early_stop = True
|
||||
else:
|
||||
hidden = hidden[-1]
|
||||
|
||||
# 应用dropout
|
||||
hidden = self.dropout(hidden)
|
||||
|
||||
# 全连接层
|
||||
return self.fc(hidden)
|
||||
self.best_loss = val_loss
|
||||
self.counter = 0
|
||||
|
||||
class LSTMModelManager:
|
||||
"""LSTM模型管理类,用于训练、评估和预测"""
|
||||
|
||||
def __init__(self, bert_model_path, model_save_path=None, vocab_size=30522,
|
||||
embedding_dim=128, hidden_dim=256, output_dim=2, n_layers=2,
|
||||
bidirectional=True, dropout=0.5):
|
||||
embedding_dim=100, hidden_dim=64, output_dim=2, n_layers=1,
|
||||
bidirectional=True, dropout=0.3, word2vec_path=None):
|
||||
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
||||
self.tokenizer = BertTokenizer.from_pretrained(bert_model_path)
|
||||
self.vocab_size = vocab_size
|
||||
self.embedding_dim = embedding_dim
|
||||
self.model = LSTMSentimentModel(
|
||||
vocab_size=vocab_size,
|
||||
embedding_dim=embedding_dim,
|
||||
@@ -131,113 +239,202 @@ class LSTMModelManager:
|
||||
if model_save_path and os.path.exists(model_save_path):
|
||||
self.model.load_state_dict(torch.load(model_save_path, map_location=self.device))
|
||||
logger.info(f"已从 {model_save_path} 加载模型")
|
||||
|
||||
self.augmenter = TextAugmenter()
|
||||
self.early_stopping = EarlyStopping(patience=5)
|
||||
|
||||
# 加载预训练词向量
|
||||
self.pretrained_embeddings = None
|
||||
if word2vec_path and os.path.exists(word2vec_path):
|
||||
try:
|
||||
word_vectors = KeyedVectors.load_word2vec_format(word2vec_path, binary=True)
|
||||
self.pretrained_embeddings = self._build_embedding_matrix(word_vectors)
|
||||
logger.info("成功加载预训练词向量")
|
||||
except Exception as e:
|
||||
logger.warning(f"加载预训练词向量失败: {e}")
|
||||
|
||||
# 初始化对抗训练参数
|
||||
self.epsilon = 0.01
|
||||
self.alpha = 0.001
|
||||
|
||||
def _build_embedding_matrix(self, word_vectors):
|
||||
embedding_matrix = torch.zeros(self.vocab_size, self.embedding_dim)
|
||||
for i in range(self.vocab_size):
|
||||
try:
|
||||
word = self.tokenizer.convert_ids_to_tokens(i)
|
||||
if word in word_vectors:
|
||||
embedding_matrix[i] = torch.tensor(word_vectors[word])
|
||||
except:
|
||||
continue
|
||||
return embedding_matrix
|
||||
|
||||
def adversarial_training(self, batch, criterion):
|
||||
"""对抗训练步骤"""
|
||||
# 计算原始损失
|
||||
input_ids = batch['input_ids'].to(self.device)
|
||||
attention_mask = batch['attention_mask'].to(self.device)
|
||||
labels = batch['label'].to(self.device)
|
||||
|
||||
outputs, _ = self.model(input_ids, attention_mask)
|
||||
loss = criterion(outputs, labels)
|
||||
|
||||
# 计算梯度
|
||||
loss.backward(retain_graph=True)
|
||||
|
||||
# 获取嵌入层的梯度
|
||||
grad_embed = self.model.embedding.weight.grad.data
|
||||
|
||||
# 生成对抗扰动
|
||||
perturb = self.epsilon * torch.sign(grad_embed)
|
||||
|
||||
# 应用扰动
|
||||
self.model.embedding.weight.data.add_(perturb)
|
||||
|
||||
# 计算对抗损失
|
||||
outputs_adv, _ = self.model(input_ids, attention_mask)
|
||||
loss_adv = criterion(outputs_adv, labels)
|
||||
|
||||
# 恢复原始嵌入
|
||||
self.model.embedding.weight.data.sub_(perturb)
|
||||
|
||||
return loss + self.alpha * loss_adv
|
||||
|
||||
def train_logistic_regression(self, train_texts, train_labels, val_texts=None, val_labels=None):
|
||||
vectorizer = TfidfVectorizer(max_features=5000)
|
||||
X_train = vectorizer.fit_transform(train_texts)
|
||||
|
||||
if val_texts is None:
|
||||
X_train, X_val, y_train, y_val = train_test_split(
|
||||
X_train, train_labels, test_size=0.2, stratify=train_labels
|
||||
)
|
||||
else:
|
||||
X_val = vectorizer.transform(val_texts)
|
||||
y_train, y_val = train_labels, val_labels
|
||||
|
||||
lr_model = LogisticRegression(class_weight='balanced')
|
||||
lr_model.fit(X_train, y_train)
|
||||
|
||||
val_pred = lr_model.predict(X_val)
|
||||
lr_accuracy = accuracy_score(y_val, val_pred)
|
||||
lr_f1 = f1_score(y_val, val_pred, average='macro')
|
||||
|
||||
return lr_accuracy, lr_f1
|
||||
|
||||
def train(self, train_texts, train_labels, val_texts=None, val_labels=None,
|
||||
batch_size=32, learning_rate=2e-5, epochs=10, validation_split=0.2):
|
||||
batch_size=16, epochs=10, learning_rate=2e-4):
|
||||
"""训练模型"""
|
||||
logger.info("开始训练模型...")
|
||||
|
||||
# 如果没有提供验证集,从训练集中划分
|
||||
if val_texts is None or val_labels is None:
|
||||
train_texts, val_texts, train_labels, val_labels = train_test_split(
|
||||
train_texts, train_labels, test_size=validation_split, random_state=42
|
||||
)
|
||||
# 首先训练逻辑回归作为基线
|
||||
lr_accuracy, lr_f1 = self.train_logistic_regression(train_texts, train_labels, val_texts, val_labels)
|
||||
logger.info(f"逻辑回归基线模型 - 准确率: {lr_accuracy:.4f}, F1: {lr_f1:.4f}")
|
||||
|
||||
# 创建数据集和数据加载器
|
||||
train_dataset = TextDataset(train_texts, train_labels, self.tokenizer)
|
||||
val_dataset = TextDataset(val_texts, val_labels, self.tokenizer)
|
||||
# 如果数据量小于1000,进行数据增强
|
||||
if len(train_texts) < 1000:
|
||||
train_texts, train_labels = self.augmenter.augment(train_texts, train_labels)
|
||||
logger.info(f"数据增强后的训练集大小: {len(train_texts)}")
|
||||
|
||||
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
|
||||
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)
|
||||
# 创建K折交叉验证
|
||||
kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
|
||||
fold_results = []
|
||||
|
||||
# 优化器和损失函数
|
||||
optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
|
||||
criterion = nn.CrossEntropyLoss()
|
||||
|
||||
# 训练循环
|
||||
best_val_loss = float('inf')
|
||||
for epoch in range(epochs):
|
||||
# 训练模式
|
||||
self.model.train()
|
||||
train_loss = 0
|
||||
train_preds = []
|
||||
train_labels_list = []
|
||||
for fold, (train_idx, val_idx) in enumerate(kf.split(train_texts, train_labels)):
|
||||
logger.info(f"训练第 {fold+1} 折...")
|
||||
|
||||
for batch in train_dataloader:
|
||||
# 获取数据
|
||||
# 重置模型
|
||||
self.model = self._create_model()
|
||||
optimizer = optim.AdamW(self.model.parameters(), lr=learning_rate)
|
||||
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=2)
|
||||
criterion = nn.CrossEntropyLoss(weight=torch.tensor([1.0, 1.0]).to(self.device))
|
||||
|
||||
# 准备数据
|
||||
X_train, X_val = train_texts[train_idx], train_texts[val_idx]
|
||||
y_train, y_val = train_labels[train_idx], train_labels[val_idx]
|
||||
|
||||
train_dataset = TextDataset(X_train, y_train, self.tokenizer)
|
||||
val_dataset = TextDataset(X_val, y_val, self.tokenizer)
|
||||
|
||||
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
|
||||
val_loader = DataLoader(val_dataset, batch_size=batch_size)
|
||||
|
||||
best_val_loss = float('inf')
|
||||
for epoch in range(epochs):
|
||||
# 训练和验证逻辑
|
||||
train_loss = self._train_epoch(train_loader, optimizer, criterion)
|
||||
val_loss, val_acc, val_f1 = self._validate(val_loader, criterion)
|
||||
|
||||
scheduler.step(val_loss)
|
||||
|
||||
if val_loss < best_val_loss:
|
||||
best_val_loss = val_loss
|
||||
if self.model_save_path:
|
||||
torch.save(self.model.state_dict(),
|
||||
f"{self.model_save_path}_fold{fold}.pt")
|
||||
|
||||
if self.early_stopping(val_loss):
|
||||
break
|
||||
|
||||
fold_results.append({
|
||||
'val_loss': val_loss,
|
||||
'val_accuracy': val_acc,
|
||||
'val_f1': val_f1
|
||||
})
|
||||
|
||||
# 计算平均结果
|
||||
avg_val_loss = np.mean([res['val_loss'] for res in fold_results])
|
||||
avg_val_acc = np.mean([res['val_accuracy'] for res in fold_results])
|
||||
avg_val_f1 = np.mean([res['val_f1'] for res in fold_results])
|
||||
|
||||
logger.info(f"交叉验证平均结果 - 损失: {avg_val_loss:.4f}, 准确率: {avg_val_acc:.4f}, F1: {avg_val_f1:.4f}")
|
||||
|
||||
# 如果LSTM模型效果比逻辑回归差,给出警告
|
||||
if avg_val_acc < lr_accuracy:
|
||||
logger.warning("LSTM模型性能低于逻辑回归基线,建议使用逻辑回归模型")
|
||||
|
||||
return avg_val_loss, avg_val_acc, avg_val_f1
|
||||
|
||||
def _train_epoch(self, train_loader, optimizer, criterion):
|
||||
self.model.train()
|
||||
total_loss = 0
|
||||
for batch in train_loader:
|
||||
optimizer.zero_grad()
|
||||
|
||||
# 使用对抗训练
|
||||
loss = self.adversarial_training(batch, criterion)
|
||||
|
||||
loss.backward()
|
||||
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
|
||||
optimizer.step()
|
||||
|
||||
total_loss += loss.item()
|
||||
|
||||
return total_loss / len(train_loader)
|
||||
|
||||
def _validate(self, val_loader, criterion):
|
||||
self.model.eval()
|
||||
total_loss = 0
|
||||
val_preds = []
|
||||
val_labels_list = []
|
||||
|
||||
with torch.no_grad():
|
||||
for batch in val_loader:
|
||||
input_ids = batch['input_ids'].to(self.device)
|
||||
attention_mask = batch['attention_mask'].to(self.device)
|
||||
labels = batch['label'].to(self.device)
|
||||
|
||||
# 前向传播
|
||||
optimizer.zero_grad()
|
||||
outputs = self.model(input_ids, attention_mask)
|
||||
|
||||
# 计算损失
|
||||
outputs, _ = self.model(input_ids, attention_mask)
|
||||
loss = criterion(outputs, labels)
|
||||
train_loss += loss.item()
|
||||
total_loss += loss.item()
|
||||
|
||||
# 反向传播
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
|
||||
# 收集预测和标签
|
||||
_, predicted = torch.max(outputs, 1)
|
||||
train_preds.extend(predicted.cpu().numpy())
|
||||
train_labels_list.extend(labels.cpu().numpy())
|
||||
|
||||
# 计算训练集的评估指标
|
||||
train_accuracy = accuracy_score(train_labels_list, train_preds)
|
||||
train_f1 = f1_score(train_labels_list, train_preds, average='macro')
|
||||
|
||||
# 验证模式
|
||||
self.model.eval()
|
||||
val_loss = 0
|
||||
val_preds = []
|
||||
val_labels_list = []
|
||||
|
||||
with torch.no_grad():
|
||||
for batch in val_dataloader:
|
||||
input_ids = batch['input_ids'].to(self.device)
|
||||
attention_mask = batch['attention_mask'].to(self.device)
|
||||
labels = batch['label'].to(self.device)
|
||||
|
||||
outputs = self.model(input_ids, attention_mask)
|
||||
loss = criterion(outputs, labels)
|
||||
val_loss += loss.item()
|
||||
|
||||
_, predicted = torch.max(outputs, 1)
|
||||
val_preds.extend(predicted.cpu().numpy())
|
||||
val_labels_list.extend(labels.cpu().numpy())
|
||||
|
||||
# 计算验证集的评估指标
|
||||
val_accuracy = accuracy_score(val_labels_list, val_preds)
|
||||
val_f1 = f1_score(val_labels_list, val_preds, average='macro')
|
||||
|
||||
# 计算平均损失
|
||||
train_loss /= len(train_dataloader)
|
||||
val_loss /= len(val_dataloader)
|
||||
|
||||
logger.info(f'Epoch {epoch+1}/{epochs} | '
|
||||
f'Train Loss: {train_loss:.4f} | '
|
||||
f'Train Acc: {train_accuracy:.4f} | '
|
||||
f'Train F1: {train_f1:.4f} | '
|
||||
f'Val Loss: {val_loss:.4f} | '
|
||||
f'Val Acc: {val_accuracy:.4f} | '
|
||||
f'Val F1: {val_f1:.4f}')
|
||||
|
||||
# 保存最佳模型
|
||||
if val_loss < best_val_loss and self.model_save_path:
|
||||
best_val_loss = val_loss
|
||||
torch.save(self.model.state_dict(), self.model_save_path)
|
||||
logger.info(f"模型已保存到 {self.model_save_path}")
|
||||
val_preds.extend(predicted.cpu().numpy())
|
||||
val_labels_list.extend(labels.cpu().numpy())
|
||||
|
||||
# 如果有保存路径但没有保存过模型,保存最后一轮的模型
|
||||
if self.model_save_path and best_val_loss == float('inf'):
|
||||
torch.save(self.model.state_dict(), self.model_save_path)
|
||||
logger.info(f"最终模型已保存到 {self.model_save_path}")
|
||||
avg_loss = total_loss / len(val_loader)
|
||||
accuracy = accuracy_score(val_labels_list, val_preds)
|
||||
f1 = f1_score(val_labels_list, val_preds, average='macro')
|
||||
|
||||
return train_loss, val_loss, val_accuracy, val_f1
|
||||
return avg_loss, accuracy, f1
|
||||
|
||||
def evaluate(self, test_texts, test_labels, batch_size=32):
|
||||
"""评估模型"""
|
||||
@@ -263,7 +460,7 @@ class LSTMModelManager:
|
||||
attention_mask = batch['attention_mask'].to(self.device)
|
||||
labels = batch['label'].to(self.device)
|
||||
|
||||
outputs = self.model(input_ids, attention_mask)
|
||||
outputs, _ = self.model(input_ids, attention_mask)
|
||||
loss = criterion(outputs, labels)
|
||||
test_loss += loss.item()
|
||||
|
||||
@@ -327,7 +524,7 @@ class LSTMModelManager:
|
||||
input_ids = batch['input_ids'].to(self.device)
|
||||
attention_mask = batch['attention_mask'].to(self.device)
|
||||
|
||||
outputs = self.model(input_ids, attention_mask)
|
||||
outputs, _ = self.model(input_ids, attention_mask)
|
||||
probs = torch.softmax(outputs, dim=1)
|
||||
_, predicted = torch.max(outputs, 1)
|
||||
|
||||
@@ -388,4 +585,4 @@ if __name__ == "__main__":
|
||||
pred, prob = lstm_model_manager.predict(sentence)
|
||||
label = '良好' if pred == 0 else '不良'
|
||||
confidence = prob[pred]
|
||||
print(f"句子: '{sentence}' 预测结果: {label} (置信度: {confidence:.2%})")
|
||||
print(f"句子: '{sentence}' 预测结果: {label} (置信度: {confidence:.2%})")
|
||||
Reference in New Issue
Block a user