先直接给出结果:按验证集损失选出的最佳模型出现在第 5 轮,对应的测试集 R² 为 0.5959。

训练集、验证集、测试集的划分比例为 0.75 : 0.15 : 0.10。

糖尿病数据集的数据量其实很小,只有四百多个样本,一般的模型都很难训练起来,attention 模型尤其如此。那为什么还要用 attention 模型来做呢?主要是为了学习模型的写法;而且数据集小、跑得快,很适合用来做基础的算法验证。

双层 attention 在这个数据集上能做到 R² = 0.5959 其实并不容易,已经超过了绝大多数模型。具体代码如下:

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import numpy as np

# ====================== 修复版:自注意力 + 设备支持 ======================
class self_attention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Args:
        input_dim: Size of the last axis of the input tensor.
        dim_k: Width of the query and key projections.
        dim_v: Width of the value projection, and therefore of the output.
    """

    def __init__(self, input_dim, dim_k, dim_v):
        super().__init__()
        self.Wq = nn.Linear(input_dim, dim_k)
        self.Wk = nn.Linear(input_dim, dim_k)
        self.Wv = nn.Linear(input_dim, dim_v)
        # Keep sqrt(dim_k) as a non-persistent buffer: it follows the module
        # through .to(device), so forward() needs no per-call transfer, and it
        # stays out of state_dict, so previously saved checkpoints still load.
        self.register_buffer(
            "scale",
            torch.sqrt(torch.tensor(dim_k, dtype=torch.float32)),
            persistent=False,
        )

    def forward(self, X):
        """Apply attention over the second-to-last axis of ``X``.

        For a 2-D input ``(B, input_dim)`` that axis is the batch itself, so
        the rows of the batch attend to each other and the weight matrix is
        ``(B, B)``.

        Returns:
            Tuple ``(out, attn_weight)``: the attended values, whose last axis
            has size ``dim_v``, and the softmax-normalised attention weights.
        """
        Q = self.Wq(X)
        K = self.Wk(X)
        V = self.Wv(X)

        # ``@`` with transpose(-2, -1) works for 2-D and batched inputs alike.
        attn_score = Q @ K.transpose(-2, -1)
        attn_score = attn_score / self.scale
        attn_weight = torch.softmax(attn_score, dim=-1)
        out = attn_weight @ V
        return out, attn_weight

# 设置随机种子
def same_seed(seed):
    """Seed NumPy and PyTorch and switch cuDNN to deterministic settings.

    Args:
        seed: Integer seed applied to every random number generator.
    """
    # Request deterministic cuDNN kernels and turn off benchmark mode.
    cudnn = torch.backends.cudnn
    cudnn.deterministic, cudnn.benchmark = True, False

    for seed_fn in (np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# ====================== 优化版模型:残差连接 + 删除冗余层 ======================
class Model(nn.Module):
    """Two stacked self-attention layers followed by a linear regression head.

    Args:
        input_dim: Number of input features per row.
        dim_k: Query/key width used by both attention layers.
        dim_v: Value width; also the hidden width fed to the output layer.
        dropout: Dropout probability applied after each attention layer.
        noise_std: Standard deviation of the Gaussian noise added to the
            input while training. ``0`` disables the noise. The default keeps
            the previously hard-coded value of ``0.01``.
    """

    def __init__(self, input_dim, dim_k, dim_v, dropout=0.12, noise_std=0.01):
        super().__init__()
        self.noise_std = noise_std

        # First attention layer.
        self.attn1 = self_attention(input_dim, dim_k, dim_v)
        self.drop1 = nn.Dropout(dropout)
        # Projects the raw input to dim_v so it can be added as a residual.
        self.res1 = nn.Linear(input_dim, dim_v)

        # Second attention layer.
        self.attn2 = self_attention(dim_v, dim_k, dim_v)
        self.drop2 = nn.Dropout(dropout)

        # Final regression output.
        self.fc2 = nn.Linear(dim_v, 1)

    def forward(self, x):
        """Return one prediction per row of ``x``.

        NOTE: the training script feeds a 2-D ``(B, input_dim)`` batch, so the
        attention layers mix information across the rows of the batch; a
        row's prediction therefore depends on the other rows passed with it.
        """
        if self.training and self.noise_std > 0:
            # Small input jitter, applied only in training mode.
            x = x + torch.randn_like(x) * self.noise_std

        # Layer 1 with a projected residual connection.
        attn1_out, _ = self.attn1(x)
        attn1_out = self.drop1(attn1_out)
        res_out1 = attn1_out + self.res1(x)

        # Layer 2 (no residual).
        attn2_out, _ = self.attn2(res_out1)
        attn2_out = self.drop2(attn2_out)

        return self.fc2(attn2_out)

# ====================== 训练代码(加了DEVICE) ======================
# Script entry point: split the diabetes data, train the attention model with
# early stopping on the validation loss, then report R² on the test split.
if __name__ == "__main__":
    # Use the GPU when one is available, otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("使用设备:", device)

    # Two seeds: one applied before the data split, one before model creation.
    split_seed = 14
    init_seed = 42
    same_seed(split_seed)

    data = load_diabetes()
    X = torch.tensor(data.data, dtype=torch.float32).to(device)
    # Targets are reshaped to a column so they match the model's (B, 1) output.
    y = torch.tensor(data.target, dtype=torch.float32).reshape(-1, 1).to(device)

    # 75% train / 15% validation; whatever remains is the test split.
    n = X.shape[0]
    train_n = int(0.75 * n)
    val_n = int(0.15 * n)

    # Random permutation drawn right after same_seed(split_seed).
    indices = torch.randperm(n)
    train_idx = indices[:train_n]
    val_idx = indices[train_n: train_n + val_n]
    test_idx = indices[train_n + val_n:]

    X_train, y_train = X[train_idx], y[train_idx]
    X_val, y_val = X[val_idx], y[val_idx]
    X_test, y_test = X[test_idx], y[test_idx]

    # Standardise features with statistics from the training split only;
    # the same mean/std are reused for validation and test.
    x_mean = X_train.mean(dim=0)
    x_std = X_train.std(dim=0).clamp_min(1e-6)
    X_train = (X_train - x_mean) / x_std
    X_val = (X_val - x_mean) / x_std
    X_test = (X_test - x_mean) / x_std

    # Targets are standardised for training; y_test is left in original units
    # and predictions are mapped back before scoring.
    y_mean = y_train.mean()
    y_std = y_train.std().clamp_min(1e-6)
    y_train_n = (y_train - y_mean) / y_std
    y_val_n = (y_val - y_mean) / y_std

    # Hyperparameters.
    input_dim = 10
    dim_k = 32
    dim_v = 32
    dropout = 0.12
    batch_size = 48
    n_epoch = 2000  # upper bound; early stopping normally ends training sooner
    lr = 2e-3
    weight_decay = 5e-4
    betas = (0.9, 0.999)

    # Re-seed so weight initialisation does not depend on the split RNG state.
    same_seed(init_seed)
    model = Model(input_dim, dim_k, dim_v, dropout=dropout).to(device)  # move the model to the device
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay, betas=betas)
    # Halve the learning rate after 25 epochs without a lower validation loss.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=25, min_lr=1e-6)

    from torch.utils.data import DataLoader, TensorDataset
    train_loader = DataLoader(TensorDataset(X_train, y_train_n), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val_n), batch_size=batch_size, shuffle=False)

    # Checkpointing / early-stopping state.
    best_path = "best_model.pth"
    best_val_loss = float("inf")
    early_stop_patience = 90
    epochs_no_improve = 0

    print("开始训练(优化版模型)...\n")

    for epoch in range(n_epoch):
        # ---- training pass ----
        model.train()
        train_loss_sum = 0.0
        n_train = 0
        for xb, yb in train_loader:
            optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            loss.backward()
            optimizer.step()
            # Weight each batch mean by its size to get a per-sample average.
            train_loss_sum += loss.item() * xb.size(0)
            n_train += xb.size(0)
        train_loss = train_loss_sum / n_train

        # ---- validation pass (eval mode, no gradients) ----
        model.eval()
        val_loss_sum = 0.0
        n_val = 0
        with torch.no_grad():
            for xb, yb in val_loader:
                vl = criterion(model(xb), yb)
                val_loss_sum += vl.item() * xb.size(0)
                n_val += xb.size(0)
        val_loss = val_loss_sum / n_val

        scheduler.step(val_loss)

        # Save a checkpoint whenever the validation loss reaches a new minimum.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_path)
            epochs_no_improve = 0
            print(f"✅ Epoch {epoch + 1:3d} | 最佳模型已保存 | Val Loss: {val_loss:.4f}")
        else:
            epochs_no_improve += 1

        # Progress line every 20th epoch (epoch index 0, 20, 40, ...).
        if epoch % 20 == 0:
            current_lr = optimizer.param_groups[0]["lr"]
            print(f"Epoch {epoch + 1:3d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | lr: {current_lr:.2e}")

        # Stop after early_stop_patience consecutive epochs without improvement.
        if epochs_no_improve >= early_stop_patience:
            print(f"\n早停:连续 {early_stop_patience} 轮无提升")
            break

    # Test: reload the best checkpoint and score the held-out split.
    model.load_state_dict(torch.load(best_path, map_location=device))
    model.eval()
    with torch.no_grad():
        # The whole test split is passed to the model in a single call.
        pred_test_n = model(X_test)
    # Undo the target standardisation before computing R².
    pred_test = pred_test_n * y_std + y_mean
    test_r2 = r2_score(y_test.cpu().numpy(), pred_test.cpu().numpy())
    print(f"\n🎯 测试集 R²: {test_r2:.4f}")

运行结果如下:

使用设备: cuda
开始训练(优化版模型)...

✅ Epoch   1 | 最佳模型已保存 | Val Loss: 0.6903
Epoch   1 | Train Loss: 0.9983 | Val Loss: 0.6903 | lr: 2.00e-03
✅ Epoch   2 | 最佳模型已保存 | Val Loss: 0.6078
✅ Epoch   3 | 最佳模型已保存 | Val Loss: 0.5738
✅ Epoch   4 | 最佳模型已保存 | Val Loss: 0.4661
✅ Epoch   5 | 最佳模型已保存 | Val Loss: 0.4623

Epoch  21 | Train Loss: 0.4881 | Val Loss: 0.5031 | lr: 2.00e-03
Epoch  41 | Train Loss: 0.4667 | Val Loss: 0.5080 | lr: 1.00e-03
Epoch  61 | Train Loss: 0.4756 | Val Loss: 0.4944 | lr: 5.00e-04
Epoch  81 | Train Loss: 0.4636 | Val Loss: 0.4944 | lr: 5.00e-04

早停:连续 90 轮无提升

🎯 测试集 R²: 0.5959

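当然这个代码也有一个问题:输入是二维的 [b, feature],所以 attention 权重矩阵的形状是 [b, b](b 为 batch size),也就是说注意力是在同一个 batch 的样本之间计算的,而不是在特征之间。这从语义上讲没有什么价值,而且每个样本的预测结果还会依赖于同一 batch 中的其他样本。所以我也做了一版在 feature 维度上做 attention 的模型。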

但是这个数据集的样本量实在太小,feature 版的 attention 反而不太训练得起来,最后的效果其实不如上面那一版(测试集 R² 为 0.5495,上面是 0.5959)。

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import numpy as np

# ====================== 修复版:自注意力 + 设备支持 ======================
class self_attention(nn.Module):
    """Single-head scaled dot-product self-attention.

    Args:
        input_dim: Size of the last axis of the input tensor.
        dim_k: Width of the query and key projections.
        dim_v: Width of the value projection, and therefore of the output.
    """

    def __init__(self, input_dim, dim_k, dim_v):
        super().__init__()
        self.Wq = nn.Linear(input_dim, dim_k)
        self.Wk = nn.Linear(input_dim, dim_k)
        self.Wv = nn.Linear(input_dim, dim_v)
        # sqrt(dim_k) lives in a non-persistent buffer so that it moves with
        # the module on .to(device) instead of being transferred on every
        # forward pass; being non-persistent, it adds no key to state_dict.
        self.register_buffer(
            "scale",
            torch.sqrt(torch.tensor(dim_k, dtype=torch.float32)),
            persistent=False,
        )

    def forward(self, X):
        """Apply attention over the second-to-last axis of ``X``.

        With a ``(B, T, input_dim)`` input each of the ``T`` tokens attends to
        the others within its own sample, giving weights of shape ``(B, T, T)``.

        Returns:
            Tuple ``(out, attn_weight)``: the attended values, whose last axis
            has size ``dim_v``, and the softmax-normalised attention weights.
        """
        Q = self.Wq(X)
        K = self.Wk(X)
        V = self.Wv(X)

        # ``@`` with transpose(-2, -1) works for 2-D and batched inputs alike.
        attn_score = Q @ K.transpose(-2, -1)
        attn_score = attn_score / self.scale
        attn_weight = torch.softmax(attn_score, dim=-1)
        out = attn_weight @ V
        return out, attn_weight

# 设置随机种子
def same_seed(seed):
    """Make runs repeatable by seeding NumPy and PyTorch.

    Also sets the cuDNN ``deterministic`` flag and clears ``benchmark``.

    Args:
        seed: Integer seed shared by all generators.
    """
    cudnn = torch.backends.cudnn
    cudnn.deterministic, cudnn.benchmark = True, False

    for seed_fn in (np.random.seed, torch.manual_seed):
        seed_fn(seed)

    # Only touch the CUDA generators when a GPU is actually present.
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# ====================== 优化版模型:残差连接 + 删除冗余层 ======================
class Model(nn.Module):
    """Feature-wise attention regressor: every scalar feature is one token.

    Args:
        input_dim: Accepted for signature compatibility; not used here,
            because each token fed to the first layer has width 1.
        dim_k: Query/key width used by both attention layers.
        dim_v: Value width; also the hidden width fed to the output layer.
        dropout: Dropout probability applied after each attention layer.
    """

    def __init__(self, input_dim, dim_k, dim_v, dropout=0.12):
        super().__init__()
        # Layer 1 works on width-1 tokens; res1 lifts the same tokens to
        # dim_v so they can be added back as a residual.
        self.attn1 = self_attention(1, dim_k, dim_v)
        self.drop1 = nn.Dropout(dropout)
        self.res1 = nn.Linear(1, dim_v)

        # Layer 2 keeps the hidden width at dim_v.
        self.attn2 = self_attention(dim_v, dim_k, dim_v)
        self.drop2 = nn.Dropout(dropout)

        # Regression head.
        self.fc2 = nn.Linear(dim_v, 1)

    def forward(self, x):
        """Return one prediction per row of ``x``."""
        tokens = x
        if self.training:
            # Tiny Gaussian jitter on the inputs, training mode only.
            tokens = tokens + 0.01 * torch.randn_like(tokens)

        # Append a trailing axis so each feature becomes a width-1 token.
        tokens = tokens.unsqueeze(-1)

        # Layer 1 plus the projected residual.
        first, _ = self.attn1(tokens)
        hidden = self.drop1(first) + self.res1(tokens)

        # Layer 2, then average over the token axis.
        second, _ = self.attn2(hidden)
        pooled = self.drop2(second).mean(dim=1)

        return self.fc2(pooled)

# ====================== 训练代码(加了DEVICE) ======================
# Script entry point for the feature-attention variant: same data split,
# optimiser, scheduler and early-stopping loop, ending with test-set R².
if __name__ == "__main__":
    # Use the GPU when one is available, otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("使用设备:", device)

    # Two seeds: one applied before the data split, one before model creation.
    split_seed = 14
    init_seed = 42
    same_seed(split_seed)

    data = load_diabetes()
    X = torch.tensor(data.data, dtype=torch.float32).to(device)
    # Targets are reshaped to a column so they match the model's (B, 1) output.
    y = torch.tensor(data.target, dtype=torch.float32).reshape(-1, 1).to(device)

    # 75% train / 15% validation; whatever remains is the test split.
    n = X.shape[0]
    train_n = int(0.75 * n)
    val_n = int(0.15 * n)

    # Random permutation drawn right after same_seed(split_seed).
    indices = torch.randperm(n)
    train_idx = indices[:train_n]
    val_idx = indices[train_n: train_n + val_n]
    test_idx = indices[train_n + val_n:]

    X_train, y_train = X[train_idx], y[train_idx]
    X_val, y_val = X[val_idx], y[val_idx]
    X_test, y_test = X[test_idx], y[test_idx]

    # Standardise features with statistics from the training split only;
    # the same mean/std are reused for validation and test.
    x_mean = X_train.mean(dim=0)
    x_std = X_train.std(dim=0).clamp_min(1e-6)
    X_train = (X_train - x_mean) / x_std
    X_val = (X_val - x_mean) / x_std
    X_test = (X_test - x_mean) / x_std

    # Targets are standardised for training; y_test is left in original units
    # and predictions are mapped back before scoring.
    y_mean = y_train.mean()
    y_std = y_train.std().clamp_min(1e-6)
    y_train_n = (y_train - y_mean) / y_std
    y_val_n = (y_val - y_mean) / y_std

    # Hyperparameters.
    input_dim = 10  # passed to Model, whose first layer is built with width 1
    dim_k = 32
    dim_v = 32
    dropout = 0.12
    batch_size = 48
    n_epoch = 2000  # upper bound; early stopping normally ends training sooner
    lr = 2e-3
    weight_decay = 5e-4
    betas = (0.9, 0.999)

    # Re-seed so weight initialisation does not depend on the split RNG state.
    same_seed(init_seed)
    model = Model(input_dim, dim_k, dim_v, dropout=dropout).to(device)  # move the model to the device
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay, betas=betas)
    # Halve the learning rate after 25 epochs without a lower validation loss.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=25, min_lr=1e-6)

    from torch.utils.data import DataLoader, TensorDataset
    train_loader = DataLoader(TensorDataset(X_train, y_train_n), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val_n), batch_size=batch_size, shuffle=False)

    # Checkpointing / early-stopping state.
    best_path = "best_model.pth"
    best_val_loss = float("inf")
    early_stop_patience = 90
    epochs_no_improve = 0

    print("开始训练(优化版模型)...\n")

    for epoch in range(n_epoch):
        # ---- training pass ----
        model.train()
        train_loss_sum = 0.0
        n_train = 0
        for xb, yb in train_loader:
            optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            loss.backward()
            optimizer.step()
            # Weight each batch mean by its size to get a per-sample average.
            train_loss_sum += loss.item() * xb.size(0)
            n_train += xb.size(0)
        train_loss = train_loss_sum / n_train

        # ---- validation pass (eval mode, no gradients) ----
        model.eval()
        val_loss_sum = 0.0
        n_val = 0
        with torch.no_grad():
            for xb, yb in val_loader:
                vl = criterion(model(xb), yb)
                val_loss_sum += vl.item() * xb.size(0)
                n_val += xb.size(0)
        val_loss = val_loss_sum / n_val

        scheduler.step(val_loss)

        # Save a checkpoint whenever the validation loss reaches a new minimum.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_path)
            epochs_no_improve = 0
            print(f"✅ Epoch {epoch + 1:3d} | 最佳模型已保存 | Val Loss: {val_loss:.4f}")
        else:
            epochs_no_improve += 1

        # Progress line every 20th epoch (epoch index 0, 20, 40, ...).
        if epoch % 20 == 0:
            current_lr = optimizer.param_groups[0]["lr"]
            print(f"Epoch {epoch + 1:3d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | lr: {current_lr:.2e}")

        # Stop after early_stop_patience consecutive epochs without improvement.
        if epochs_no_improve >= early_stop_patience:
            print(f"\n早停:连续 {early_stop_patience} 轮无提升")
            break

    # Test: reload the best checkpoint and score the held-out split.
    model.load_state_dict(torch.load(best_path, map_location=device))
    model.eval()
    with torch.no_grad():
        pred_test_n = model(X_test)
    # Undo the target standardisation before computing R².
    pred_test = pred_test_n * y_std + y_mean
    test_r2 = r2_score(y_test.cpu().numpy(), pred_test.cpu().numpy())
    print(f"\n🎯 测试集 R²: {test_r2:.4f}")

运行结果

使用设备: cuda
开始训练(优化版模型)...

✅ Epoch   1 | 最佳模型已保存 | Val Loss: 0.6303
Epoch   1 | Train Loss: 0.9998 | Val Loss: 0.6303 | lr: 2.00e-03
✅ Epoch  18 | 最佳模型已保存 | Val Loss: 0.6297
✅ Epoch  20 | 最佳模型已保存 | Val Loss: 0.6285
✅ Epoch  21 | 最佳模型已保存 | Val Loss: 0.6163
Epoch  21 | Train Loss: 0.7827 | Val Loss: 0.6163 | lr: 2.00e-03
✅ Epoch  23 | 最佳模型已保存 | Val Loss: 0.6160
✅ Epoch  24 | 最佳模型已保存 | Val Loss: 0.6154
✅ Epoch  25 | 最佳模型已保存 | Val Loss: 0.6121
✅ Epoch  26 | 最佳模型已保存 | Val Loss: 0.6019
✅ Epoch  35 | 最佳模型已保存 | Val Loss: 0.6012
✅ Epoch  36 | 最佳模型已保存 | Val Loss: 0.5983
Epoch  41 | Train Loss: 0.7580 | Val Loss: 0.6077 | lr: 2.00e-03
✅ Epoch  48 | 最佳模型已保存 | Val Loss: 0.5901
Epoch  61 | Train Loss: 0.7527 | Val Loss: 0.6047 | lr: 2.00e-03
✅ Epoch  65 | 最佳模型已保存 | Val Loss: 0.5889
Epoch  81 | Train Loss: 0.7534 | Val Loss: 0.6075 | lr: 2.00e-03
Epoch 101 | Train Loss: 0.7505 | Val Loss: 0.6000 | lr: 1.00e-03
✅ Epoch 112 | 最佳模型已保存 | Val Loss: 0.5887
Epoch 121 | Train Loss: 0.7385 | Val Loss: 0.6058 | lr: 1.00e-03
✅ Epoch 129 | 最佳模型已保存 | Val Loss: 0.5856
Epoch 141 | Train Loss: 0.7330 | Val Loss: 0.5921 | lr: 1.00e-03
✅ Epoch 143 | 最佳模型已保存 | Val Loss: 0.5846
✅ Epoch 148 | 最佳模型已保存 | Val Loss: 0.5805
✅ Epoch 156 | 最佳模型已保存 | Val Loss: 0.5797
✅ Epoch 161 | 最佳模型已保存 | Val Loss: 0.5792
Epoch 161 | Train Loss: 0.7337 | Val Loss: 0.5792 | lr: 1.00e-03
✅ Epoch 175 | 最佳模型已保存 | Val Loss: 0.5778
Epoch 181 | Train Loss: 0.7306 | Val Loss: 0.5882 | lr: 1.00e-03
✅ Epoch 183 | 最佳模型已保存 | Val Loss: 0.5769
✅ Epoch 184 | 最佳模型已保存 | Val Loss: 0.5738
✅ Epoch 198 | 最佳模型已保存 | Val Loss: 0.5712
Epoch 201 | Train Loss: 0.7330 | Val Loss: 0.5909 | lr: 1.00e-03
✅ Epoch 211 | 最佳模型已保存 | Val Loss: 0.5711
✅ Epoch 216 | 最佳模型已保存 | Val Loss: 0.5699
✅ Epoch 221 | 最佳模型已保存 | Val Loss: 0.5693
Epoch 221 | Train Loss: 0.7310 | Val Loss: 0.5693 | lr: 1.00e-03
Epoch 241 | Train Loss: 0.7279 | Val Loss: 0.5862 | lr: 1.00e-03
Epoch 261 | Train Loss: 0.7258 | Val Loss: 0.5905 | lr: 5.00e-04
Epoch 281 | Train Loss: 0.7072 | Val Loss: 0.5913 | lr: 2.50e-04
Epoch 301 | Train Loss: 0.7194 | Val Loss: 0.5921 | lr: 1.25e-04

早停:连续 90 轮无提升

🎯 测试集 R²: 0.5495

当然,谈到 attention 肯定少不了 multi-head,训练代码如下:

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import numpy as np
# ====================== 训练代码(加了DEVICE) ======================
# Script entry point for the multi-head variant: same data split, optimiser,
# scheduler and early-stopping loop, ending with test-set R².
# NOTE(review): same_seed and Model are used below but are neither defined nor
# imported in this listing; presumably they come from the separate model
# listing, so confirm the import before running.
if __name__ == "__main__":
    # Use the GPU when one is available, otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("使用设备:", device)

    # Two seeds: one applied before the data split, one before model creation.
    split_seed = 14
    init_seed = 42
    same_seed(split_seed)

    data = load_diabetes()
    X = torch.tensor(data.data, dtype=torch.float32).to(device)
    # Targets are reshaped to a column of shape (n, 1).
    y = torch.tensor(data.target, dtype=torch.float32).reshape(-1, 1).to(device)

    # 75% train / 15% validation; whatever remains is the test split.
    n = X.shape[0]
    train_n = int(0.75 * n)
    val_n = int(0.15 * n)

    # Random permutation drawn right after same_seed(split_seed).
    indices = torch.randperm(n)
    train_idx = indices[:train_n]
    val_idx = indices[train_n: train_n + val_n]
    test_idx = indices[train_n + val_n:]

    X_train, y_train = X[train_idx], y[train_idx]
    X_val, y_val = X[val_idx], y[val_idx]
    X_test, y_test = X[test_idx], y[test_idx]

    # Standardise features with statistics from the training split only;
    # the same mean/std are reused for validation and test.
    x_mean = X_train.mean(dim=0)
    x_std = X_train.std(dim=0).clamp_min(1e-6)
    X_train = (X_train - x_mean) / x_std
    X_val = (X_val - x_mean) / x_std
    X_test = (X_test - x_mean) / x_std

    # Targets are standardised for training; y_test is left in original units
    # and predictions are mapped back before scoring.
    y_mean = y_train.mean()
    y_std = y_train.std().clamp_min(1e-6)
    y_train_n = (y_train - y_mean) / y_std
    y_val_n = (y_val - y_mean) / y_std

    # Hyperparameters.
    input_dim = 1  # each token is a single scalar (inputs get a trailing axis below)
    dim_k = 16
    dim_v = 16
    dropout = 0.12
    batch_size = 48
    n_epoch = 2000  # upper bound; early stopping normally ends training sooner
    lr = 2e-3
    weight_decay = 5e-4
    betas = (0.9, 0.999)
    head = 2  # number of attention heads passed to Model
    # Re-seed so weight initialisation does not depend on the split RNG state.
    same_seed(init_seed)
    model = Model(input_dim, dim_k, dim_v,head,output_dim= 1, dropout=dropout).to(device)  # move the model to the device
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay, betas=betas)
    # Halve the learning rate after 25 epochs without a lower validation loss.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=25, min_lr=1e-6)

    from torch.utils.data import DataLoader, TensorDataset
    train_loader = DataLoader(TensorDataset(X_train, y_train_n), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val_n), batch_size=batch_size, shuffle=False)

    # Checkpointing / early-stopping state.
    best_path = "best_model.pth"
    best_val_loss = float("inf")
    early_stop_patience = 90
    epochs_no_improve = 0

    print("开始训练(优化版模型)...\n")

    for epoch in range(n_epoch):
        # ---- training pass ----
        model.train()
        train_loss_sum = 0.0
        n_train = 0
        for xb, yb in train_loader:
            xb = xb.unsqueeze(-1)   # append a trailing axis so every feature is a width-1 token
            optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            loss.backward()
            optimizer.step()
            # Weight each batch mean by its size to get a per-sample average.
            train_loss_sum += loss.item() * xb.size(0)
            n_train += xb.size(0)
        train_loss = train_loss_sum / n_train

        # ---- validation pass (eval mode, no gradients) ----
        model.eval()
        val_loss_sum = 0.0
        n_val = 0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb = xb.unsqueeze(-1)   # same reshaping as in the training pass
                vl = criterion(model(xb), yb)
                val_loss_sum += vl.item() * xb.size(0)
                n_val += xb.size(0)
        val_loss = val_loss_sum / n_val

        scheduler.step(val_loss)

        # Save a checkpoint whenever the validation loss reaches a new minimum.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_path)
            epochs_no_improve = 0
            print(f"✅ Epoch {epoch + 1:3d} | 最佳模型已保存 | Val Loss: {val_loss:.4f}")
        else:
            epochs_no_improve += 1

        # Progress line every 20th epoch (epoch index 0, 20, 40, ...).
        if epoch % 20 == 0:
            current_lr = optimizer.param_groups[0]["lr"]
            print(f"Epoch {epoch + 1:3d} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | lr: {current_lr:.2e}")

        # Stop after early_stop_patience consecutive epochs without improvement.
        if epochs_no_improve >= early_stop_patience:
            print(f"\n早停:连续 {early_stop_patience} 轮无提升")
            break

    # Test: reload the best checkpoint and score the held-out split.
    model.load_state_dict(torch.load(best_path, map_location=device))
    model.eval()
    with torch.no_grad():
        X_test = X_test.unsqueeze(-1)  # same trailing axis as for train/val batches
        pred_test_n = model(X_test)
    # Undo the target standardisation before computing R².
    pred_test = pred_test_n * y_std + y_mean
    test_r2 = r2_score(y_test.cpu().numpy(), pred_test.cpu().numpy())
    print(f"\n🎯 测试集 R²: {test_r2:.4f}")

其中 Model、MultiHeadSelfAttention 以及 same_seed 的定义如下(运行上面的训练脚本之前需要先定义或导入它们):
 

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import numpy as np
import math

# ====================== 修复版:自注意力 + 设备支持 ======================
class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention for ``(B, T, input_dim)`` input.

    Args:
        input_dim: Size of the last axis of the input.
        dim_k: Total query/key width, split evenly across the heads.
        dim_v: Total value width, split evenly across the heads.
        num_heads: Number of attention heads; must divide ``dim_k`` and ``dim_v``.
        output_dim: Width of the final projection. Defaults to ``input_dim``.

    Raises:
        AssertionError: If ``dim_k`` or ``dim_v`` is not divisible by
            ``num_heads``.
    """

    def __init__(self, input_dim, dim_k, dim_v, num_heads, output_dim=None):
        super().__init__()

        assert dim_k % num_heads == 0, "dim_k must be divisible by num_heads"
        assert dim_v % num_heads == 0, "dim_v must be divisible by num_heads"
        if output_dim is None:
            output_dim = input_dim
        self.num_heads = num_heads
        self.dk = dim_k // num_heads
        self.dv = dim_v // num_heads

        self.Wq = nn.Linear(input_dim, dim_k)
        self.Wk = nn.Linear(input_dim, dim_k)
        self.Wv = nn.Linear(input_dim, dim_v)

        # Output projection that fuses the concatenated heads.
        self.fc_out = nn.Linear(dim_v, output_dim)

        # Per-head scaling factor sqrt(dk). A non-persistent buffer moves with
        # the module on .to(device) and adds no key to state_dict, so
        # checkpoints saved before this change still load.
        self.register_buffer(
            "scale",
            torch.sqrt(torch.tensor(self.dk, dtype=torch.float32)),
            persistent=False,
        )

    def forward(self, X):
        """Run attention over the token axis.

        Args:
            X: Tensor of shape ``(B, T, input_dim)``.

        Returns:
            Tuple ``(out, attn_weight)`` with ``out`` of shape
            ``(B, T, output_dim)`` and ``attn_weight`` of shape
            ``(B, num_heads, T, T)``.
        """
        B, T, _ = X.shape  # batch, sequence length, feature width

        Q = self.Wq(X)
        K = self.Wk(X)
        V = self.Wv(X)

        # Split the projected width into heads and move the head axis forward:
        # (B, T, dim_k) -> (B, T, head, dk) -> (B, head, T, dk).
        # Example: (32, 10, 64) with 8 heads -> (32, 10, 8, 8) -> (32, 8, 10, 8).
        Q = Q.reshape(B, T, self.num_heads, self.dk).transpose(1, 2)
        K = K.reshape(B, T, self.num_heads, self.dk).transpose(1, 2)
        V = V.reshape(B, T, self.num_heads, self.dv).transpose(1, 2)

        # (B, head, T, dk) @ (B, head, dk, T) -> (B, head, T, T); the leading
        # axes are batched, so every head is computed independently.
        attn_score = Q @ K.transpose(-2, -1) / self.scale
        attn_weight = torch.softmax(attn_score, dim=-1)

        out = attn_weight @ V  # (B, head, T, dv)

        # Put T next to head again so the two trailing axes can be merged:
        # (B, head, T, dv) -> (B, T, head, dv) -> (B, T, dim_v).
        out = out.transpose(1, 2).contiguous()
        out = out.reshape(B, T, -1)

        # Output projection.
        out = self.fc_out(out)

        return out, attn_weight

# 设置随机种子
def same_seed(seed):
    """Seed NumPy and PyTorch and configure cuDNN for repeatable runs.

    Args:
        seed: Integer seed applied to every random number generator.
    """
    # Deterministic cuDNN kernels on, benchmark mode off.
    cudnn = torch.backends.cudnn
    cudnn.deterministic, cudnn.benchmark = True, False

    for seed_fn in (np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)
class Model(nn.Module):
    """Regressor made of a single multi-head self-attention layer.

    Args:
        input_dim: Width of each input token.
        dim_k: Total query/key width of the attention layer.
        dim_v: Total value width of the attention layer.
        head: Number of attention heads.
        output_dim: Width of the attention layer's output projection.
        dropout: Accepted for signature compatibility; this version of the
            model does not apply dropout.
    """

    def __init__(self, input_dim, dim_k, dim_v, head, output_dim, dropout=0.12):
        super().__init__()
        self.output_dim = output_dim
        self.attention = MultiHeadSelfAttention(
            input_dim, dim_k, dim_v, head, output_dim
        )

    def forward(self, x):
        """Return the attention output, averaged over axis 1 when ``output_dim == 1``."""
        attended, _ = self.attention(x)
        # Scalar target: collapse the per-token outputs into one value per sample.
        return attended.mean(dim=1) if self.output_dim == 1 else attended

使用设备: cuda
开始训练(优化版模型)...

✅ Epoch   1 | 最佳模型已保存 | Val Loss: 0.7090
Epoch   1 | Train Loss: 0.8743 | Val Loss: 0.7090 | lr: 2.00e-03
✅ Epoch   2 | 最佳模型已保存 | Val Loss: 0.6410
✅ Epoch   3 | 最佳模型已保存 | Val Loss: 0.6163
✅ Epoch   4 | 最佳模型已保存 | Val Loss: 0.6077
✅ Epoch   5 | 最佳模型已保存 | Val Loss: 0.6049
Epoch  21 | Train Loss: 0.7859 | Val Loss: 0.6129 | lr: 2.00e-03
Epoch  41 | Train Loss: 0.7856 | Val Loss: 0.6085 | lr: 1.00e-03
Epoch  61 | Train Loss: 0.7855 | Val Loss: 0.6091 | lr: 5.00e-04
Epoch  81 | Train Loss: 0.7855 | Val Loss: 0.6098 | lr: 5.00e-04

早停:连续 90 轮无提升

🎯 测试集 R²: 0.5053

 

实事求是的说,csdn这个平台挺糟糕的,尤其是商业化运作做的太过了,这也是为什么很多质量好的文章不愿意发在csdn,开源氛围做的不好

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐