64 lines
2.6 KiB
Python
64 lines
2.6 KiB
Python
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
import numpy as np
|
|
|
|
class MultiHeadAttentionLayer(nn.Module):
|
|
def __init__(self, embed_size, num_heads, dropout_rate=0.1):
|
|
super(MultiHeadAttentionLayer, self).__init__()
|
|
self.embed_size = embed_size
|
|
self.num_heads = num_heads
|
|
self.head_dim = embed_size // num_heads
|
|
|
|
assert (self.head_dim * num_heads == embed_size), "Embedding size needs to be divisible by num_heads"
|
|
|
|
# 定义线性变换层,分别用于 Q, K, V
|
|
self.q_linear = nn.Linear(embed_size, embed_size)
|
|
self.k_linear = nn.Linear(embed_size, embed_size)
|
|
self.v_linear = nn.Linear(embed_size, embed_size)
|
|
|
|
# 最终的线性层
|
|
self.fc_out = nn.Linear(embed_size, embed_size)
|
|
|
|
# 增加 Dropout 和 LayerNorm
|
|
self.dropout = nn.Dropout(p=dropout_rate)
|
|
self.layer_norm = nn.LayerNorm(embed_size)
|
|
|
|
def forward(self, values, keys, query, mask=None):
|
|
N = query.shape[0] # batch_size
|
|
|
|
# 将输入变换为 Q, K, V
|
|
Q = self.q_linear(query) # shape: (N, seq_len, embed_size)
|
|
K = self.k_linear(keys) # shape: (N, seq_len, embed_size)
|
|
V = self.v_linear(values) # shape: (N, seq_len, embed_size)
|
|
|
|
# 将 Q, K, V 分成多个头
|
|
Q = Q.reshape(N, -1, self.num_heads, self.head_dim) # shape: (N, seq_len, num_heads, head_dim)
|
|
K = K.reshape(N, -1, self.num_heads, self.head_dim) # shape: (N, seq_len, num_heads, head_dim)
|
|
V = V.reshape(N, -1, self.num_heads, self.head_dim) # shape: (N, seq_len, num_heads, head_dim)
|
|
|
|
# 计算缩放点积注意力
|
|
attention_scores = torch.einsum("nqhd,nkhd->nhqk", [Q, K]) # (N, num_heads, seq_len_q, seq_len_k)
|
|
attention_scores = attention_scores / (self.head_dim ** (1 / 2)) # 缩放
|
|
|
|
if mask is not None:
|
|
attention_scores = attention_scores.masked_fill(mask == 0, float("-1e20"))
|
|
|
|
attention = torch.softmax(attention_scores, dim=-1) # 归一化
|
|
|
|
# 根据注意力分布加权 V
|
|
out = torch.einsum("nhql,nlhd->nqhd", [attention, V]) # (N, num_heads, seq_len_q, head_dim)
|
|
out = out.reshape(N, -1, self.embed_size) # 将多头输出拼接回原始嵌入大小
|
|
|
|
# 通过线性层
|
|
out = self.fc_out(out)
|
|
|
|
# 使用残差连接并应用 LayerNorm
|
|
out = self.layer_norm(out + query)
|
|
|
|
# 应用 Dropout
|
|
out = self.dropout(out)
|
|
|
|
return out
|
|
|