以 scikit-learn 的 load_diabetes 数据集为例的 self-attention 实战
首先直接贴 R² 结果:验证集上的最优模型出现在第 5 轮,用它在测试集上得到 R²: 0.5959(训练本身在连续 90 轮无提升后才早停)
train、val、test集比例:0.75:0.15:0.1
其实糖尿病这个数据集的数据量太小了,只有四百来个样本,正常的model很难train起来,尤其是attention模型,那为什么还要用attention的model来做呢?主要是学习模型的写法,并且数据集小跑起来快,很适合做基础的算法验证。
双层attention在这个数据集上能做到0.5959其实蛮难的,大于绝大多数的model了。具体代码如下
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import numpy as np
# ====================== 修复版:自注意力 + 设备支持 ======================
class self_attention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Args:
        input_dim: Size of the last dimension of the input.
        dim_k: Output size of the query and key projections.
        dim_v: Output size of the value projection.
    """

    def __init__(self, input_dim, dim_k, dim_v):
        super().__init__()
        self.Wq = nn.Linear(input_dim, dim_k)
        self.Wk = nn.Linear(input_dim, dim_k)
        self.Wv = nn.Linear(input_dim, dim_v)
        # sqrt(dim_k) is stored as a buffer so that `module.to(...)` moves it
        # together with the weights and forward() needs no per-call device
        # copy. `persistent=False` keeps it out of the state_dict, so
        # checkpoints written before this change still load unchanged.
        self.register_buffer(
            "scale",
            torch.sqrt(torch.tensor(dim_k, dtype=torch.float32)),
            persistent=False,
        )

    def forward(self, X):
        """Attend over the second-to-last axis of ``X``.

        Args:
            X: Tensor whose last dimension is ``input_dim``. For a 2-D input
                of shape (N, input_dim) the N rows attend to each other.

        Returns:
            A tuple ``(out, attn_weight)``: ``out`` has ``dim_v`` as its last
            dimension and ``attn_weight`` holds the softmax-normalised scores
            (each row sums to 1 along the last axis).
        """
        Q = self.Wq(X)
        K = self.Wk(X)
        V = self.Wv(X)
        # `@` with transpose(-2, -1) handles both 2-D and batched inputs;
        # for a 2-D input the transpose is simply K.T.
        attn_score = (Q @ K.transpose(-2, -1)) / self.scale
        attn_weight = torch.softmax(attn_score, dim=-1)
        out = attn_weight @ V
        return out, attn_weight
# 设置随机种子
def same_seed(seed):
    """Seed the NumPy and PyTorch RNGs and put cuDNN in deterministic mode.

    Args:
        seed: Integer seed applied to every generator.
    """
    cudnn = torch.backends.cudnn
    cudnn.deterministic, cudnn.benchmark = True, False
    for seed_fn in (np.random.seed, torch.manual_seed):
        seed_fn(seed)
    # CUDA generators are only seeded when a GPU is actually present.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)
# ====================== 优化版模型:残差连接 + 删除冗余层 ======================
class Model(nn.Module):
    """Two stacked self-attention layers followed by a scalar regression head.

    The first layer has a linear skip path (``res1``) that projects the input
    to ``dim_v`` so it can be added to the attention output; the second layer
    has no skip path.

    Args:
        input_dim: Size of the last dimension of the input.
        dim_k: Query/key projection size for both attention layers.
        dim_v: Value projection size, also the hidden width between layers.
        dropout: Dropout probability applied after each attention layer.
    """

    def __init__(self, input_dim, dim_k, dim_v, dropout=0.12):
        super().__init__()
        # Registration order is kept as-is: it determines the order in which
        # parameters are initialised and therefore the seeded results.
        self.attn1 = self_attention(input_dim, dim_k, dim_v)
        self.drop1 = nn.Dropout(dropout)
        self.res1 = nn.Linear(input_dim, dim_v)
        self.attn2 = self_attention(dim_v, dim_k, dim_v)
        self.drop2 = nn.Dropout(dropout)
        self.fc2 = nn.Linear(dim_v, 1)

    def forward(self, x):
        """Return one prediction per row of ``x`` (last dimension of size 1)."""
        # Small Gaussian jitter on the inputs, applied in training mode only.
        inputs = x + torch.randn_like(x) * 0.01 if self.training else x
        # Layer 1: attention -> dropout, plus the projected skip connection.
        hidden = self.drop1(self.attn1(inputs)[0]) + self.res1(inputs)
        # Layer 2: attention -> dropout, no skip connection.
        hidden = self.drop2(self.attn2(hidden)[0])
        return self.fc2(hidden)
# ====================== 训练代码(加了DEVICE) ======================
if __name__ == "__main__":
    # Training script: split the diabetes data, standardise it with training
    # statistics, train with early stopping, then report R^2 on the test split.

    # Use the GPU when one is available, otherwise the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("使用设备:", device)
    # Two separate seeds: one applied before the data split, one applied
    # before the model is constructed.
    split_seed = 14
    init_seed = 42
    same_seed(split_seed)
    data = load_diabetes()
    X = torch.tensor(data.data, dtype=torch.float32).to(device)
    y = torch.tensor(data.target, dtype=torch.float32).reshape(-1, 1).to(device)
    # Shuffle once, then cut into train (75%), validation (15%) and test
    # (whatever remains after the two truncated counts).
    n = X.shape[0]
    train_n = int(0.75 * n)
    val_n = int(0.15 * n)
    indices = torch.randperm(n)
    train_idx = indices[:train_n]
    val_idx = indices[train_n: train_n + val_n]
    test_idx = indices[train_n + val_n:]
    X_train, y_train = X[train_idx], y[train_idx]
    X_val, y_val = X[val_idx], y[val_idx]
    X_test, y_test = X[test_idx], y[test_idx]
    # Standardise features with statistics of the training split only; the
    # clamp guards against division by a zero standard deviation.
    x_mean = X_train.mean(dim=0)
    x_std = X_train.std(dim=0).clamp_min(1e-6)
    X_train = (X_train - x_mean) / x_std
    X_val = (X_val - x_mean) / x_std
    X_test = (X_test - x_mean) / x_std
    # Targets are standardised the same way for training and validation;
    # y_test stays in original units and predictions are de-normalised later.
    y_mean = y_train.mean()
    y_std = y_train.std().clamp_min(1e-6)
    y_train_n = (y_train - y_mean) / y_std
    y_val_n = (y_val - y_mean) / y_std
    # Hyperparameters
    input_dim = 10
    dim_k = 32
    dim_v = 32
    dropout = 0.12
    batch_size = 48
    n_epoch = 2000
    lr = 2e-3
    weight_decay = 5e-4
    betas = (0.9, 0.999)
    # Re-seed so weight initialisation does not depend on the split above.
    same_seed(init_seed)
    model = Model(input_dim, dim_k, dim_v, dropout=dropout).to(device)  # move the model to the selected device
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay, betas=betas)
    # The scheduler is stepped with the validation loss once per epoch
    # (factor=0.5, patience=25, floor of 1e-6).
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=25, min_lr=1e-6)
    from torch.utils.data import DataLoader, TensorDataset
    train_loader = DataLoader(TensorDataset(X_train, y_train_n), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val_n), batch_size=batch_size, shuffle=False)
    # Checkpoint / early-stopping state.
    best_path = "best_model.pth"
    best_val_loss = float("inf")
    early_stop_patience = 90
    epochs_no_improve = 0
    print("开始训练(优化版模型)...\n")
    for epoch in range(n_epoch):
        # ---- one training pass ----
        model.train()
        train_loss_sum = 0.0
        n_train = 0
        for xb, yb in train_loader:
            optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            loss.backward()
            optimizer.step()
            # Weight the batch mean by batch size so the epoch figure is a
            # per-sample average even when the last batch is smaller.
            train_loss_sum += loss.item() * xb.size(0)
            n_train += xb.size(0)
        train_loss = train_loss_sum / n_train
        # ---- validation pass (no gradients) ----
        model.eval()
        val_loss_sum = 0.0
        n_val = 0
        with torch.no_grad():
            for xb, yb in val_loader:
                vl = criterion(model(xb), yb)
                val_loss_sum += vl.item() * xb.size(0)
                n_val += xb.size(0)
        val_loss = val_loss_sum / n_val
        scheduler.step(val_loss)
        # Save a checkpoint whenever the validation loss strictly improves.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_path)
            epochs_no_improve = 0
            print(f"✅ Epoch {epoch + 1:3d} | 最佳模型已保存 | Val Loss: {val_loss:.4f}")
        else:
            epochs_no_improve += 1
        # Progress line every 20 epochs (printed as epochs 1, 21, 41, ...).
        if epoch % 20 == 0:
            current_lr = optimizer.param_groups[0]["lr"]
            print(f"Epoch {epoch + 1:3d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | lr: {current_lr:.2e}")
        # Stop after `early_stop_patience` consecutive epochs without improvement.
        if epochs_no_improve >= early_stop_patience:
            print(f"\n早停:连续 {early_stop_patience} 轮无提升")
            break
    # Test: reload the best checkpoint and score the held-out split.
    model.load_state_dict(torch.load(best_path, map_location=device))
    model.eval()
    with torch.no_grad():
        pred_test_n = model(X_test)
    # Undo the target standardisation so R^2 is computed in original units.
    pred_test = pred_test_n * y_std + y_mean
    test_r2 = r2_score(y_test.cpu().numpy(), pred_test.cpu().numpy())
    print(f"\n🎯 测试集 R²: {test_r2:.4f}")
运行结果如下:
使用设备: cuda
开始训练(优化版模型)...
✅ Epoch 1 | 最佳模型已保存 | Val Loss: 0.6903
Epoch 1 | Train Loss: 0.9983 | Val Loss: 0.6903 | lr: 2.00e-03
✅ Epoch 2 | 最佳模型已保存 | Val Loss: 0.6078
✅ Epoch 3 | 最佳模型已保存 | Val Loss: 0.5738
✅ Epoch 4 | 最佳模型已保存 | Val Loss: 0.4661
✅ Epoch 5 | 最佳模型已保存 | Val Loss: 0.4623
Epoch 21 | Train Loss: 0.4881 | Val Loss: 0.5031 | lr: 2.00e-03
Epoch 41 | Train Loss: 0.4667 | Val Loss: 0.5080 | lr: 1.00e-03
Epoch 61 | Train Loss: 0.4756 | Val Loss: 0.4944 | lr: 5.00e-04
Epoch 81 | Train Loss: 0.4636 | Val Loss: 0.4944 | lr: 5.00e-04
早停:连续 90 轮无提升
🎯 测试集 R²: 0.5959
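当然这个代码也有一个问题:输入是 [B, 10] 的二维张量,所以 attention 权重的形状是 [B, B](B 为 batch size),也就是在同一个 batch 的样本之间做注意力,而不是在特征之间。这样每个样本的预测会依赖同 batch 的其他样本,从语义上看没有什么价值。所以我也做了一版在 feature 维度上的 attention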
但是这个样本量实在太小,feature的反而train不太起来,最后效果其实没有上面的好
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import numpy as np
# ====================== 修复版:自注意力 + 设备支持 ======================
class self_attention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Args:
        input_dim: Size of the last dimension of the input.
        dim_k: Output size of the query and key projections.
        dim_v: Output size of the value projection.
    """

    def __init__(self, input_dim, dim_k, dim_v):
        super().__init__()
        self.Wq = nn.Linear(input_dim, dim_k)
        self.Wk = nn.Linear(input_dim, dim_k)
        self.Wv = nn.Linear(input_dim, dim_v)
        # sqrt(dim_k) is stored as a buffer so that `module.to(...)` moves it
        # together with the weights and forward() needs no per-call device
        # copy. `persistent=False` keeps it out of the state_dict, so
        # checkpoints written before this change still load unchanged.
        self.register_buffer(
            "scale",
            torch.sqrt(torch.tensor(dim_k, dtype=torch.float32)),
            persistent=False,
        )

    def forward(self, X):
        """Attend over the second-to-last axis of ``X``.

        Args:
            X: Tensor whose last dimension is ``input_dim``; leading batch
                dimensions are handled by the batched matmul.

        Returns:
            A tuple ``(out, attn_weight)``: ``out`` has ``dim_v`` as its last
            dimension and ``attn_weight`` holds the softmax-normalised scores
            (each row sums to 1 along the last axis).
        """
        Q = self.Wq(X)
        K = self.Wk(X)
        V = self.Wv(X)
        # `@` with transpose(-2, -1) handles both 2-D and batched inputs;
        # for a 2-D input the transpose is simply K.T.
        attn_score = (Q @ K.transpose(-2, -1)) / self.scale
        attn_weight = torch.softmax(attn_score, dim=-1)
        out = attn_weight @ V
        return out, attn_weight
# 设置随机种子
def same_seed(seed):
    """Seed the NumPy and PyTorch RNGs and put cuDNN in deterministic mode.

    Args:
        seed: Integer seed applied to every generator.
    """
    cudnn = torch.backends.cudnn
    cudnn.deterministic, cudnn.benchmark = True, False
    for seed_fn in (np.random.seed, torch.manual_seed):
        seed_fn(seed)
    # CUDA generators are only seeded when a GPU is actually present.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)
# ====================== 优化版模型:残差连接 + 删除冗余层 ======================
class Model(nn.Module):
    """Feature-wise attention regressor: every input feature becomes a token.

    A (B, F) input is expanded to (B, F, 1), passed through two self-attention
    layers (the first with a linear skip path), averaged over the feature
    axis and mapped to a single output value.

    Args:
        input_dim: Accepted for signature compatibility; not used here, since
            both first-layer modules are built with an input size of 1.
        dim_k: Query/key projection size for both attention layers.
        dim_v: Value projection size, also the hidden width between layers.
        dropout: Dropout probability applied after each attention layer.
    """

    def __init__(self, input_dim, dim_k, dim_v, dropout=0.12):
        super().__init__()
        # Registration order is kept as-is: it determines the order in which
        # parameters are initialised and therefore the seeded results.
        self.attn1 = self_attention(1, dim_k, dim_v)
        self.drop1 = nn.Dropout(dropout)
        self.res1 = nn.Linear(1, dim_v)
        self.attn2 = self_attention(dim_v, dim_k, dim_v)
        self.drop2 = nn.Dropout(dropout)
        self.fc2 = nn.Linear(dim_v, 1)

    def forward(self, x):
        """Map a (B, F) batch to predictions of shape (B, 1)."""
        # Small Gaussian jitter on the inputs, applied in training mode only.
        inputs = x + torch.randn_like(x) * 0.01 if self.training else x
        # (B, F) -> (B, F, 1): each scalar feature is treated as one token.
        tokens = inputs.unsqueeze(-1)
        # Layer 1: attention -> dropout, plus the projected skip connection.
        hidden = self.drop1(self.attn1(tokens)[0]) + self.res1(tokens)
        # Layer 2: attention -> dropout, no skip connection.
        hidden = self.drop2(self.attn2(hidden)[0])
        # Average over the token (feature) axis to get one vector per sample.
        pooled = hidden.mean(dim=1)
        return self.fc2(pooled)
# ====================== 训练代码(加了DEVICE) ======================
if __name__ == "__main__":
    # Training script: split the diabetes data, standardise it with training
    # statistics, train with early stopping, then report R^2 on the test split.

    # Use the GPU when one is available, otherwise the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("使用设备:", device)
    # Two separate seeds: one applied before the data split, one applied
    # before the model is constructed.
    split_seed = 14
    init_seed = 42
    same_seed(split_seed)
    data = load_diabetes()
    X = torch.tensor(data.data, dtype=torch.float32).to(device)
    y = torch.tensor(data.target, dtype=torch.float32).reshape(-1, 1).to(device)
    # Shuffle once, then cut into train (75%), validation (15%) and test
    # (whatever remains after the two truncated counts).
    n = X.shape[0]
    train_n = int(0.75 * n)
    val_n = int(0.15 * n)
    indices = torch.randperm(n)
    train_idx = indices[:train_n]
    val_idx = indices[train_n: train_n + val_n]
    test_idx = indices[train_n + val_n:]
    X_train, y_train = X[train_idx], y[train_idx]
    X_val, y_val = X[val_idx], y[val_idx]
    X_test, y_test = X[test_idx], y[test_idx]
    # Standardise features with statistics of the training split only; the
    # clamp guards against division by a zero standard deviation.
    x_mean = X_train.mean(dim=0)
    x_std = X_train.std(dim=0).clamp_min(1e-6)
    X_train = (X_train - x_mean) / x_std
    X_val = (X_val - x_mean) / x_std
    X_test = (X_test - x_mean) / x_std
    # Targets are standardised the same way for training and validation;
    # y_test stays in original units and predictions are de-normalised later.
    y_mean = y_train.mean()
    y_std = y_train.std().clamp_min(1e-6)
    y_train_n = (y_train - y_mean) / y_std
    y_val_n = (y_val - y_mean) / y_std
    # Hyperparameters
    input_dim = 10
    dim_k = 32
    dim_v = 32
    dropout = 0.12
    batch_size = 48
    n_epoch = 2000
    lr = 2e-3
    weight_decay = 5e-4
    betas = (0.9, 0.999)
    # Re-seed so weight initialisation does not depend on the split above.
    same_seed(init_seed)
    model = Model(input_dim, dim_k, dim_v, dropout=dropout).to(device)  # move the model to the selected device
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay, betas=betas)
    # The scheduler is stepped with the validation loss once per epoch
    # (factor=0.5, patience=25, floor of 1e-6).
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=25, min_lr=1e-6)
    from torch.utils.data import DataLoader, TensorDataset
    train_loader = DataLoader(TensorDataset(X_train, y_train_n), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val_n), batch_size=batch_size, shuffle=False)
    # Checkpoint / early-stopping state.
    best_path = "best_model.pth"
    best_val_loss = float("inf")
    early_stop_patience = 90
    epochs_no_improve = 0
    print("开始训练(优化版模型)...\n")
    for epoch in range(n_epoch):
        # ---- one training pass ----
        model.train()
        train_loss_sum = 0.0
        n_train = 0
        for xb, yb in train_loader:
            optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            loss.backward()
            optimizer.step()
            # Weight the batch mean by batch size so the epoch figure is a
            # per-sample average even when the last batch is smaller.
            train_loss_sum += loss.item() * xb.size(0)
            n_train += xb.size(0)
        train_loss = train_loss_sum / n_train
        # ---- validation pass (no gradients) ----
        model.eval()
        val_loss_sum = 0.0
        n_val = 0
        with torch.no_grad():
            for xb, yb in val_loader:
                vl = criterion(model(xb), yb)
                val_loss_sum += vl.item() * xb.size(0)
                n_val += xb.size(0)
        val_loss = val_loss_sum / n_val
        scheduler.step(val_loss)
        # Save a checkpoint whenever the validation loss strictly improves.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_path)
            epochs_no_improve = 0
            print(f"✅ Epoch {epoch + 1:3d} | 最佳模型已保存 | Val Loss: {val_loss:.4f}")
        else:
            epochs_no_improve += 1
        # Progress line every 20 epochs (printed as epochs 1, 21, 41, ...).
        if epoch % 20 == 0:
            current_lr = optimizer.param_groups[0]["lr"]
            print(f"Epoch {epoch + 1:3d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | lr: {current_lr:.2e}")
        # Stop after `early_stop_patience` consecutive epochs without improvement.
        if epochs_no_improve >= early_stop_patience:
            print(f"\n早停:连续 {early_stop_patience} 轮无提升")
            break
    # Test: reload the best checkpoint and score the held-out split.
    model.load_state_dict(torch.load(best_path, map_location=device))
    model.eval()
    with torch.no_grad():
        pred_test_n = model(X_test)
    # Undo the target standardisation so R^2 is computed in original units.
    pred_test = pred_test_n * y_std + y_mean
    test_r2 = r2_score(y_test.cpu().numpy(), pred_test.cpu().numpy())
    print(f"\n🎯 测试集 R²: {test_r2:.4f}")
运行结果
使用设备: cuda
开始训练(优化版模型)...
✅ Epoch 1 | 最佳模型已保存 | Val Loss: 0.6303
Epoch 1 | Train Loss: 0.9998 | Val Loss: 0.6303 | lr: 2.00e-03
✅ Epoch 18 | 最佳模型已保存 | Val Loss: 0.6297
✅ Epoch 20 | 最佳模型已保存 | Val Loss: 0.6285
✅ Epoch 21 | 最佳模型已保存 | Val Loss: 0.6163
Epoch 21 | Train Loss: 0.7827 | Val Loss: 0.6163 | lr: 2.00e-03
✅ Epoch 23 | 最佳模型已保存 | Val Loss: 0.6160
✅ Epoch 24 | 最佳模型已保存 | Val Loss: 0.6154
✅ Epoch 25 | 最佳模型已保存 | Val Loss: 0.6121
✅ Epoch 26 | 最佳模型已保存 | Val Loss: 0.6019
✅ Epoch 35 | 最佳模型已保存 | Val Loss: 0.6012
✅ Epoch 36 | 最佳模型已保存 | Val Loss: 0.5983
Epoch 41 | Train Loss: 0.7580 | Val Loss: 0.6077 | lr: 2.00e-03
✅ Epoch 48 | 最佳模型已保存 | Val Loss: 0.5901
Epoch 61 | Train Loss: 0.7527 | Val Loss: 0.6047 | lr: 2.00e-03
✅ Epoch 65 | 最佳模型已保存 | Val Loss: 0.5889
Epoch 81 | Train Loss: 0.7534 | Val Loss: 0.6075 | lr: 2.00e-03
Epoch 101 | Train Loss: 0.7505 | Val Loss: 0.6000 | lr: 1.00e-03
✅ Epoch 112 | 最佳模型已保存 | Val Loss: 0.5887
Epoch 121 | Train Loss: 0.7385 | Val Loss: 0.6058 | lr: 1.00e-03
✅ Epoch 129 | 最佳模型已保存 | Val Loss: 0.5856
Epoch 141 | Train Loss: 0.7330 | Val Loss: 0.5921 | lr: 1.00e-03
✅ Epoch 143 | 最佳模型已保存 | Val Loss: 0.5846
✅ Epoch 148 | 最佳模型已保存 | Val Loss: 0.5805
✅ Epoch 156 | 最佳模型已保存 | Val Loss: 0.5797
✅ Epoch 161 | 最佳模型已保存 | Val Loss: 0.5792
Epoch 161 | Train Loss: 0.7337 | Val Loss: 0.5792 | lr: 1.00e-03
✅ Epoch 175 | 最佳模型已保存 | Val Loss: 0.5778
Epoch 181 | Train Loss: 0.7306 | Val Loss: 0.5882 | lr: 1.00e-03
✅ Epoch 183 | 最佳模型已保存 | Val Loss: 0.5769
✅ Epoch 184 | 最佳模型已保存 | Val Loss: 0.5738
✅ Epoch 198 | 最佳模型已保存 | Val Loss: 0.5712
Epoch 201 | Train Loss: 0.7330 | Val Loss: 0.5909 | lr: 1.00e-03
✅ Epoch 211 | 最佳模型已保存 | Val Loss: 0.5711
✅ Epoch 216 | 最佳模型已保存 | Val Loss: 0.5699
✅ Epoch 221 | 最佳模型已保存 | Val Loss: 0.5693
Epoch 221 | Train Loss: 0.7310 | Val Loss: 0.5693 | lr: 1.00e-03
Epoch 241 | Train Loss: 0.7279 | Val Loss: 0.5862 | lr: 1.00e-03
Epoch 261 | Train Loss: 0.7258 | Val Loss: 0.5905 | lr: 5.00e-04
Epoch 281 | Train Loss: 0.7072 | Val Loss: 0.5913 | lr: 2.50e-04
Epoch 301 | Train Loss: 0.7194 | Val Loss: 0.5921 | lr: 1.25e-04
早停:连续 90 轮无提升
🎯 测试集 R²: 0.5495
当然,谈到 attention 肯定少不了 multi-head,具体代码如下
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import numpy as np
# ====================== 训练代码(加了DEVICE) ======================
if __name__ == "__main__":
    # Training script for the multi-head model: split the diabetes data,
    # standardise it with training statistics, train with early stopping,
    # then report R^2 on the test split.
    # NOTE(review): `same_seed` and `Model` are neither defined nor imported in
    # this listing — presumably they come from the separate model file; confirm
    # and add the import before running.

    # Use the GPU when one is available, otherwise the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("使用设备:", device)
    # Two separate seeds: one applied before the data split, one applied
    # before the model is constructed.
    split_seed = 14
    init_seed = 42
    same_seed(split_seed)
    data = load_diabetes()
    X = torch.tensor(data.data, dtype=torch.float32).to(device)
    y = torch.tensor(data.target, dtype=torch.float32).reshape(-1, 1).to(device)
    # Shuffle once, then cut into train (75%), validation (15%) and test
    # (whatever remains after the two truncated counts).
    n = X.shape[0]
    train_n = int(0.75 * n)
    val_n = int(0.15 * n)
    indices = torch.randperm(n)
    train_idx = indices[:train_n]
    val_idx = indices[train_n: train_n + val_n]
    test_idx = indices[train_n + val_n:]
    X_train, y_train = X[train_idx], y[train_idx]
    X_val, y_val = X[val_idx], y[val_idx]
    X_test, y_test = X[test_idx], y[test_idx]
    # Standardise features with statistics of the training split only; the
    # clamp guards against division by a zero standard deviation.
    x_mean = X_train.mean(dim=0)
    x_std = X_train.std(dim=0).clamp_min(1e-6)
    X_train = (X_train - x_mean) / x_std
    X_val = (X_val - x_mean) / x_std
    X_test = (X_test - x_mean) / x_std
    # Targets are standardised the same way for training and validation;
    # y_test stays in original units and predictions are de-normalised later.
    y_mean = y_train.mean()
    y_std = y_train.std().clamp_min(1e-6)
    y_train_n = (y_train - y_mean) / y_std
    y_val_n = (y_val - y_mean) / y_std
    # Hyperparameters. input_dim is 1 because every batch gets a trailing
    # axis of size 1 via unsqueeze(-1) before it reaches the model.
    input_dim = 1
    dim_k = 16
    dim_v = 16
    dropout = 0.12
    batch_size = 48
    n_epoch = 2000
    lr = 2e-3
    weight_decay = 5e-4
    betas = (0.9, 0.999)
    head = 2
    # Re-seed so weight initialisation does not depend on the split above.
    same_seed(init_seed)
    model = Model(input_dim, dim_k, dim_v,head,output_dim= 1, dropout=dropout).to(device)  # move the model to the selected device
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay, betas=betas)
    # The scheduler is stepped with the validation loss once per epoch
    # (factor=0.5, patience=25, floor of 1e-6).
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=25, min_lr=1e-6)
    from torch.utils.data import DataLoader, TensorDataset
    train_loader = DataLoader(TensorDataset(X_train, y_train_n), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val_n), batch_size=batch_size, shuffle=False)
    # Checkpoint / early-stopping state.
    best_path = "best_model.pth"
    best_val_loss = float("inf")
    early_stop_patience = 90
    epochs_no_improve = 0
    print("开始训练(优化版模型)...\n")
    for epoch in range(n_epoch):
        # ---- one training pass ----
        model.train()
        train_loss_sum = 0.0
        n_train = 0
        for xb, yb in train_loader:
            xb = xb.unsqueeze(-1)  # append a trailing axis of size 1 to the batch
            optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            loss.backward()
            optimizer.step()
            # Weight the batch mean by batch size so the epoch figure is a
            # per-sample average even when the last batch is smaller.
            train_loss_sum += loss.item() * xb.size(0)
            n_train += xb.size(0)
        train_loss = train_loss_sum / n_train
        # ---- validation pass (no gradients) ----
        model.eval()
        val_loss_sum = 0.0
        n_val = 0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb = xb.unsqueeze(-1)  # same reshaping as in the training loop
                vl = criterion(model(xb), yb)
                val_loss_sum += vl.item() * xb.size(0)
                n_val += xb.size(0)
        val_loss = val_loss_sum / n_val
        scheduler.step(val_loss)
        # Save a checkpoint whenever the validation loss strictly improves.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_path)
            epochs_no_improve = 0
            print(f"✅ Epoch {epoch + 1:3d} | 最佳模型已保存 | Val Loss: {val_loss:.4f}")
        else:
            epochs_no_improve += 1
        # Progress line every 20 epochs (printed as epochs 1, 21, 41, ...).
        if epoch % 20 == 0:
            current_lr = optimizer.param_groups[0]["lr"]
            print(f"Epoch {epoch + 1:3d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | lr: {current_lr:.2e}")
        # Stop after `early_stop_patience` consecutive epochs without improvement.
        if epochs_no_improve >= early_stop_patience:
            print(f"\n早停:连续 {early_stop_patience} 轮无提升")
            break
    # Test: reload the best checkpoint and score the held-out split.
    model.load_state_dict(torch.load(best_path, map_location=device))
    model.eval()
    with torch.no_grad():
        X_test = X_test.unsqueeze(-1)  # same reshaping as for the training batches
        pred_test_n = model(X_test)
    # Undo the target standardisation so R^2 is computed in original units.
    pred_test = pred_test_n * y_std + y_mean
    test_r2 = r2_score(y_test.cpu().numpy(), pred_test.cpu().numpy())
    print(f"\n🎯 测试集 R²: {test_r2:.4f}")
其中 Model、MultiHeadSelfAttention 和 same_seed 定义在单独的 model 文件里(上面的训练脚本需要先导入 same_seed 和 Model 才能运行),代码如下:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import numpy as np
import math
# ====================== 修复版:自注意力 + 设备支持 ======================
class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention over a (B, T, input_dim) input.

    Args:
        input_dim: Size of the last dimension of the input.
        dim_k: Total query/key width, split evenly across the heads.
        dim_v: Total value width, split evenly across the heads.
        num_heads: Number of attention heads; must divide ``dim_k`` and ``dim_v``.
        output_dim: Width of the final projection; defaults to ``input_dim``.

    Raises:
        AssertionError: If ``dim_k`` or ``dim_v`` is not divisible by ``num_heads``.
    """

    def __init__(self, input_dim, dim_k, dim_v,num_heads,output_dim = None):
        super().__init__()
        assert dim_k % num_heads == 0, "dim_k must be divisible by num_heads"
        assert dim_v % num_heads == 0, "dim_v must be divisible by num_heads"
        if output_dim is None:
            output_dim = input_dim
        self.num_heads = num_heads
        self.dk = dim_k // num_heads
        self.dv = dim_v // num_heads
        self.Wq = nn.Linear(input_dim, dim_k)
        self.Wk = nn.Linear(input_dim, dim_k)
        self.Wv = nn.Linear(input_dim, dim_v)
        # Output projection that mixes the concatenated heads.
        self.fc_out = nn.Linear(dim_v, output_dim)
        # sqrt(dk) is stored as a buffer so that `module.to(...)` moves it
        # together with the weights instead of leaving a CPU tensor behind.
        # `persistent=False` keeps it out of the state_dict, so checkpoints
        # written before this change still load unchanged.
        self.register_buffer(
            "scale",
            torch.sqrt(torch.tensor(self.dk, dtype=torch.float32)),
            persistent=False,
        )

    def forward(self, X):
        """Apply multi-head self-attention along the T axis.

        Args:
            X: Tensor of shape (B, T, input_dim).

        Returns:
            A tuple ``(out, attn_weight)`` with ``out`` of shape
            (B, T, output_dim) and ``attn_weight`` of shape
            (B, num_heads, T, T).
        """
        B, T, _ = X.shape  # batch, sequence length, feature dim
        Q = self.Wq(X)
        K = self.Wk(X)
        V = self.Wv(X)
        # Split the projected width into heads, e.g. with num_heads=8 and
        # dk=8: (32, 10, 64) -> (32, 10, 8, 8). The transpose then moves the
        # head axis in front of T, giving (B, head, T, dk), so the matmuls
        # below run over T independently for every head.
        Q = Q.reshape(B, T, self.num_heads, self.dk).transpose(1, 2)
        K = K.reshape(B, T, self.num_heads, self.dk).transpose(1, 2)
        V = V.reshape(B, T, self.num_heads, self.dv).transpose(1, 2)
        # (B, head, T, dk) @ (B, head, dk, T) -> (B, head, T, T)
        attn_score = Q @ K.transpose(-2, -1) / self.scale
        attn_weight = torch.softmax(attn_score, dim=-1)
        out = attn_weight @ V  # (B, head, T, dv)
        # Put the head axis next to dv again so the two can be merged:
        # (B, head, T, dv) -> (B, T, head, dv) -> (B, T, dim_v).
        out = out.transpose(1, 2).contiguous()
        out = out.reshape(B, T, -1)
        # Final projection across the concatenated heads.
        out = self.fc_out(out)
        return out, attn_weight
# 设置随机种子
def same_seed(seed):
    """Seed the NumPy and PyTorch RNGs and put cuDNN in deterministic mode.

    Args:
        seed: Integer seed applied to every generator.
    """
    cudnn = torch.backends.cudnn
    cudnn.deterministic, cudnn.benchmark = True, False
    for seed_fn in (np.random.seed, torch.manual_seed):
        seed_fn(seed)
    # CUDA generators are only seeded when a GPU is actually present.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)
class Model(nn.Module):
    """Thin wrapper around one MultiHeadSelfAttention layer.

    Args:
        input_dim: Size of the last dimension of the input.
        dim_k: Total query/key width passed to the attention layer.
        dim_v: Total value width passed to the attention layer.
        head: Number of attention heads.
        output_dim: Width of the attention layer's output projection.
        dropout: Accepted for signature compatibility; not used by this model.
    """

    def __init__(self, input_dim, dim_k, dim_v,head,output_dim,dropout=0.12):
        super().__init__()
        self.output_dim = output_dim
        self.attention = MultiHeadSelfAttention(input_dim, dim_k, dim_v, head, output_dim)

    def forward(self,x):
        """Run the attention layer; average over axis 1 when output_dim is 1."""
        attended = self.attention(x)[0]
        if self.output_dim != 1:
            return attended
        # Scalar regression: collapse axis 1 so each sample gets one value.
        return attended.mean(dim=1)
使用设备: cuda
开始训练(优化版模型)...
✅ Epoch 1 | 最佳模型已保存 | Val Loss: 0.7090
Epoch 1 | Train Loss: 0.8743 | Val Loss: 0.7090 | lr: 2.00e-03
✅ Epoch 2 | 最佳模型已保存 | Val Loss: 0.6410
✅ Epoch 3 | 最佳模型已保存 | Val Loss: 0.6163
✅ Epoch 4 | 最佳模型已保存 | Val Loss: 0.6077
✅ Epoch 5 | 最佳模型已保存 | Val Loss: 0.6049
Epoch 21 | Train Loss: 0.7859 | Val Loss: 0.6129 | lr: 2.00e-03
Epoch 41 | Train Loss: 0.7856 | Val Loss: 0.6085 | lr: 1.00e-03
Epoch 61 | Train Loss: 0.7855 | Val Loss: 0.6091 | lr: 5.00e-04
Epoch 81 | Train Loss: 0.7855 | Val Loss: 0.6098 | lr: 5.00e-04
早停:连续 90 轮无提升
🎯 测试集 R²: 0.5053
实事求是的说,csdn这个平台挺糟糕的,尤其是商业化运作做的太过了,这也是为什么很多质量好的文章不愿意发在csdn,开源氛围做的不好
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)