1、编码器 & 解码器 的词嵌入层通常是独立的

《英译法案例》里的编码器和解码器,它俩的词嵌入也是单独的

编码器和解码器的词嵌入层设计

  1. 通常情况下:各自使用独立的词嵌入层

在原始Transformer论文(《Attention Is All You Need》)和大多数标准实现中,编码器和解码器使用各自的词嵌入层,原因如下:

组件 词嵌入层 说明
Encoder 独立的 Embedding 层 处理源序列(如翻译中的源语言)
Decoder 独立的 Embedding 层 处理目标序列(如翻译中的目标语言)

主要原因:

  • 词汇表可能不同:在机器翻译任务中,源语言和目标语言的词汇表通常不同
  • 语义角色不同:编码器处理输入理解,解码器处理输出生成,语义表示空间可能有差异

  1. 特殊情况:可以共享词嵌入层

在以下场景中,编码器和解码器可以共享同一个词嵌入层

  • 源语言和目标语言相同的任务(如文本摘要、对话生成、语言建模)
  • 词汇表完全一致的情况下
  • ✅ 为了减少参数量时的优化选择

⚠️ 注意:共享嵌入层时,编码器和解码器的词汇表索引必须对齐一致。


  1. 重要的权重共享技巧:输出嵌入绑定

这是两个不同层面的权重共享概念,需区分清楚:

共享类型 共享对象 常见程度
类型A Encoder嵌入 ↔ Decoder嵌入 较少见(需词汇表一致)
类型B Decoder输入嵌入 ↔ Decoder输出投影 非常常见(推荐做法)

类型B(权重绑定/Weight Tying) 的具体做法:

解码器的输入嵌入层权重 = 解码器的输出投影层权重

好处:

  • 减少参数量(约节省1/3嵌入相关参数)
  • 提高训练稳定性
  • 改善生成质量(尤其在小数据集上)

  1. 完整的数据流示意
┌─────────────────────────────────────────────────────────────────────┐
│                         Encoder 路径                                │
├─────────────────────────────────────────────────────────────────────┤
│  输入x → [Encoder Embedding] → x_embed → [位置编码] → x_position   │
│                                           → Encoder → 上下文表示    │
└─────────────────────────────────────────────────────────────────────┘
                                                        ↓
┌─────────────────────────────────────────────────────────────────────┐
│                         Decoder 路径                                │
├─────────────────────────────────────────────────────────────────────┤
│  输入y → [Decoder Embedding] → y_embed → [位置编码] → y_position   │
│                                           → Decoder → 隐藏状态      │
│                                                        → [输出投影] → 预测  │
│                         ↑                              │            │
│                         └──── 权重绑定(常见优化)─────┘            │
└─────────────────────────────────────────────────────────────────────┘

⚠️ 注意:权重绑定是 Decoder 内部的输入嵌入与输出投影之间,
        不是 Encoder 和 Decoder 之间的共享

  1. 总结对比表
问题 答案 备注
编码器和解码器是否共享词嵌入层? 通常不共享,各自独立 机器翻译任务标准做法
什么情况下可以共享? 源目标语言相同、词汇表一致时 如摘要、对话等任务
有什么权重共享技巧? Decoder输入嵌入 ↔ Decoder输出投影 权重绑定,强烈推荐
位置编码如何处理? 编码器和解码器各自独立添加位置编码 使用相同的编码函数

  1. 实践建议
任务类型 嵌入层建议 权重绑定建议
机器翻译(不同语言) Encoder/Decoder独立 Decoder内部绑定 ✅
文本摘要 可共享或独立 Decoder内部绑定 ✅
对话生成 可共享或独立 Decoder内部绑定 ✅
语言模型(Decoder-only) 单嵌入层 输入输出绑定 ✅

2、编码器 & 解码器 的位置编码通常是共享的

通常情况下,位置编码(Positional Encoding)是共享的(即使用同一套参数/公式),不需要像词嵌入层(Token Embedding)那样独立。

✅ 核心结论

  • 词嵌入层 (Token Embedding)必须独立

    • 原因:编码器处理的是源语言(如中文),解码器处理的是目标语言(如英文)。它们的词汇表(Vocabulary)完全不同,索引含义也不同,所以必须用两个独立的 nn.Embedding 层。
  • 位置编码层 (Positional Encoding)通常共享(甚至不需要定义为 nn.Module 的参数)。

    • 原因:位置编码通常是固定的数学公式(正弦/余弦函数)或可学习的向量,它只与“位置索引”和“维度 d m o d e l d_{model} dmodel”有关,而与“具体的语言”或“具体的词”无关。
    • 无论是中文的“第 1 个字”还是英文的“第 1 个词”,它们在序列中的相对/绝对位置意义是一样的,因此可以使用完全相同的位置编码向量 P E 0 , P E 1 , … PE_0, PE_1, \dots PE0,PE1,

💡 为什么可以共享?

  1. 基于固定公式(Original Transformer)

在原始论文 Attention Is All You Need 中,位置编码是通过正弦和余弦函数计算出来的:
P E ( p o s , 2 i ) = sin ⁡ ( p o s / 10000 2 i / d m o d e l ) PE_{(pos, 2i)} = \sin(pos / 10000^{2i/d_{model}}) PE(pos,2i)=sin(pos/100002i/dmodel)
P E ( p o s , 2 i + 1 ) = cos ⁡ ( p o s / 10000 2 i / d m o d e l ) PE_{(pos, 2i+1)} = \cos(pos / 10000^{2i/d_{model}}) PE(pos,2i+1)=cos(pos/100002i/dmodel)

  • 这个公式没有任何可学习参数
  • 只要编码器和解码器的 d m o d e l d_{model} dmodel 相同,它们生成的位置编码矩阵就是完全一样的。
  • 实现方式:通常写成一个通用的工具函数或一个单独的 PositionalEncoding 类, encoder 和 decoder 都调用它。
  1. 基于可学习向量(Learned Positional Embedding)

有些模型(如 BERT, GPT)将位置编码作为可学习的参数 nn.Embedding(max_len, d_model)

  • 即使在这种情况下,通常也是共享的
  • 原因:位置的概念(第 1 位、第 2 位…)在源语言和目标语言中是通用的。让模型去学习两套不同的“位置概念”(例如:编码器认为“位置 1”代表句首,解码器认为“位置 1”代表句中)通常没有好处,反而增加参数量且可能导致过拟合。
  • 例外情况:除非你有非常特殊的理由相信源语言和目标语言的“位置结构”有本质不同(极少见),否则不要分开定义。

💻 代码实现示例

在标准的 Transformer 实现中,你会看到这样的结构:

class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, ...):
        super().__init__()
        
        # 1. 词嵌入层:必须独立 (因为 vocab 不同)
        self.src_embed = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embed = nn.Embedding(tgt_vocab_size, d_model)
        
        # 2. 位置编码:通常共享 (只有一个实例)
        # 无论是固定公式还是可学习参数,都只用一份
        self.pos_encoder = PositionalEncoding(d_model, max_len=5000)
        
        # 3. 编码器和解码器
        self.encoder = Encoder(...)
        self.decoder = Decoder(...)
        
        # 初始化权重
        self._init_weights()

    def forward(self, src, tgt, src_mask, tgt_mask):
        # --- 编码器部分 ---
        # 源语言 Embedding + 共享的位置编码
        src_emb = self.src_embed(src) * math.sqrt(self.d_model)
        src_emb = self.pos_encoder(src_emb)  # 直接加,start_idx=0
        enc_output = self.encoder(src_emb, src_mask)
        
        # --- 解码器部分 ---
        # 目标语言 Embedding + 共享的位置编码
        tgt_emb = self.tgt_embed(tgt) * math.sqrt(self.d_model)
        
        # 训练时 start_idx=0; 推理时在 Decoder 内部动态计算
        tgt_emb = self.pos_encoder(tgt_emb) 
        dec_output = self.decoder(tgt_emb, enc_output, tgt_mask, src_mask)
        
        return dec_output

📊 总结对比表

组件 编码器 (Encoder) 解码器 (Decoder) 是否共享? 原因
词嵌入 (Token Embedding) Embedding(Vocab_src, D) Embedding(Vocab_tgt, D) 不共享 词汇表不同,语言不同,语义空间不同。
位置编码 (Pos Encoding) PosEnc(D) PosEnc(D) 共享 位置是通用的数学概念(第 1 个、第 2 个…),与语言无关。
维度 ( d m o d e l d_{model} dmodel) D D D D D D 必须一致 为了让解码器的 Cross-Attention 能接收编码器的输出。

⚠️ 特殊情况提示

如果你使用的是 BERT 式 的架构(Encoder-only),通常会有:

  • Token Embedding
  • Position Embedding (可学习,共享)
  • Segment Embedding (用于区分句子 A 和句子 B,这是特有的)

但在标准的 Seq2Seq Transformer (Encoder-Decoder) 中,位置编码绝对是共享的。你只需要实例化一次 PositionalEncoding,然后分别在 Encoder 和 Decoder 的输入上调用它即可。


3、nn.Sequential:模型挨个执行

PyTorch nn.Sequential 完整指南


一、它是什么?

nn.Sequential 是 PyTorch torch.nn 模块中的一个容器类(Container Class)

import torch.nn as nn

# 它本身是一个 nn.Module 的子类
print(issubclass(nn.Sequential, nn.Module))  # True

本质理解:

  • 它是一个有序的模块容器
  • 内部按添加顺序存储多个神经网络层
  • 继承自 nn.Module,因此可以像普通层一样使用

二、它是干嘛的?

核心功能: 将多个神经网络层按顺序串联,数据依次通过每一层。

输入数据
   ↓
[ 第1层 ]  →  输出1
   ↓
[ 第2层 ]  →  输出2
   ↓
[ 第3层 ]  →  输出3
   ↓
   ...
   ↓
[ 第N层 ]  →  最终输出

关键特性:

  • 自动执行前向传播(无需手动写 forward()
  • 前一层输出 = 后一层输入
  • 所有子模块的参数自动注册到父模块

三、它有什么用?

用途 说明
📦 快速搭建网络 几行代码构建完整模型
🔧 代码简洁 无需定义类和 forward 方法
📋 结构清晰 网络层次一目了然
🎯 易于调试 可按索引访问任意层
💾 方便保存 整体保存/加载模型参数

典型应用场景:

  • 多层感知机(MLP)
  • 卷积神经网络(CNN)的卷积块
  • 自编码器(Encoder/Decoder)
  • 任何纯顺序结构的网络

四、函数签名

构造方法

torch.nn.Sequential(*args)

参数说明

参数 类型 必填 默认值 说明
*args nn.Module 可变数量的子模块,按顺序传入

返回值

返回 类型 说明
sequential nn.Sequential 有序容器对象

五、参数详解与构造方式

方式1:直接传入模块(最常用)

model = nn.Sequential(
    nn.Linear(784, 128),    # 第0层
    nn.ReLU(),              # 第1层
    nn.Linear(128, 10)      # 第2层
)

方式2:使用 OrderedDict(可命名)

from collections import OrderedDict

model = nn.Sequential(OrderedDict([
    ('fc1', nn.Linear(784, 128)),
    ('relu1', nn.ReLU()),
    ('fc2', nn.Linear(128, 10))
]))

方式3:空构造 + 动态添加

model = nn.Sequential()
model.add_module('conv1', nn.Conv2d(3, 64, 3))
model.add_module('bn1', nn.BatchNorm2d(64))
model.add_module('relu1', nn.ReLU())

方式4:混合使用

# 可以先创建部分,再添加
model = nn.Sequential(nn.Linear(784, 128))
model.add_module('relu', nn.ReLU())
model.add_module('fc2', nn.Linear(128, 10))

六、常用操作

6.1 前向传播

output = model(input)  # 自动按顺序执行所有层

6.2 访问子模块

# 按索引访问(方式1构造)
layer0 = model[0]
layer1 = model[1]

# 按名称访问(方式2构造)
fc1 = model.fc1
relu1 = model.relu1

# 切片访问(返回新的 Sequential)
sub_model = model[0:2]  # 获取前两层

6.3 添加/插入模块

# 追加到末尾
model.add_module('new_layer', nn.Linear(10, 5))

# 插入到指定位置(PyTorch 1.10+)
model.insert(1, nn.Dropout(0.5))  # 在索引1处插入

6.4 删除模块

# 删除指定索引(✅ 正确)
del model[1]

# 删除指定名称(✅ 正确方式)
delattr(model, 'relu1')  # 或 del model._modules['relu1']

6.5 遍历模块

# 遍历所有直接子模块(推荐)
for i, module in enumerate(model):
    print(f"Layer {i}: {module}")

# 或
for name, module in model.named_children():
    print(f"{name}: {module}")

# 遍历所有嵌套模块(包括子模块的子模块)
for name, module in model.named_modules():
    print(f"{name}: {module}")

# 遍历参数
for name, param in model.named_parameters():
    print(f"{name}: {param.shape}")

⚠️ 注意: named_children() 只遍历直接子模块,named_modules() 会递归遍历所有嵌套模块。对于 Sequential,通常使用 named_children() 更合适。

6.6 获取模型信息

print(model)                    # 打印模型结构
print(model[0].weight.shape)    # 查看权重形状
print(list(model.parameters())) # 所有参数列表
print(list(model.children()))   # 所有直接子模块

6.7 保存与加载

# 保存整个模型(不推荐,依赖类定义路径)
torch.save(model, 'model.pth')

# 保存参数(✅ 推荐)
torch.save(model.state_dict(), 'model_params.pth')

# 加载参数(注意设备映射)
model.load_state_dict(torch.load('model_params.pth', map_location=device))
model.eval()  # 设为评估模式

6.8 设备迁移

# 移动到GPU
model = model.cuda()
# 或
model = model.to('cuda:0')

# 移动到CPU
model = model.to('cpu')

# 同时迁移模型和数据(推荐)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

6.9 训练/评估模式切换

model.train()   # 训练模式(启用Dropout、BatchNorm训练统计)
model.eval()    # 评估模式(关闭Dropout、使用BatchNorm运行统计)

6.10 权重初始化

def init_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        nn.init.zeros_(m.bias)
    elif isinstance(m, nn.Conv2d):
        nn.init.kaiming_normal_(m.weight)

model.apply(init_weights)  # 递归应用到所有子模块

七、完整示例

import torch
import torch.nn as nn
from collections import OrderedDict

# 1. 创建模型
model = nn.Sequential(OrderedDict([
    ('fc1', nn.Linear(784, 256)),
    ('bn1', nn.BatchNorm1d(256)),
    ('relu1', nn.ReLU()),
    ('drop1', nn.Dropout(0.3)),
    ('fc2', nn.Linear(256, 128)),
    ('relu2', nn.ReLU()),
    ('fc3', nn.Linear(128, 10))
]))

# 2. 移动到设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# 3. 初始化权重
model.apply(lambda m: nn.init.xavier_uniform_(m.weight) if isinstance(m, nn.Linear) else None)

# 4. 查看结构
print(model)

# 5. 前向传播
x = torch.randn(32, 784).to(device)  # batch_size=32
output = model(x)
print(output.shape)  # torch.Size([32, 10])

# 6. 访问特定层
print(model.fc1.weight.shape)  # torch.Size([256, 784])

# 7. 切换模式
model.train()
# ... 训练代码 ...
model.eval()
# ... 评估代码 ...

八、注意事项 ⚠️

注意点 说明
🔸 维度匹配 确保相邻层的输入输出维度一致
🔸 顺序重要 模块执行顺序=添加顺序
🔸 无分支 无法实现多输入/多输出/跳跃连接
🔸 命名唯一 使用OrderedDict时,名称不能重复
🔸 模式切换 训练/评估前记得调用 train()/eval()
🔸 设备一致 模型和数据必须在同一设备上
🔸 加载映射 加载参数时使用 map_location 避免设备错误

常见错误示例

# ❌ 错误:维度不匹配
model = nn.Sequential(
    nn.Linear(784, 128),
    nn.Linear(64, 10)  # 输入应该是128!
)

# ✅ 正确
model = nn.Sequential(
    nn.Linear(784, 128),
    nn.Linear(128, 10)
)

# ❌ 错误:忘记激活函数
model = nn.Sequential(
    nn.Linear(784, 128),
    nn.Linear(128, 10)  # 没有非线性
)

# ✅ 正确
model = nn.Sequential(
    nn.Linear(784, 128),
    nn.ReLU(),
    nn.Linear(128, 10)
)

# ❌ 错误:设备不一致
model = model.cuda()
x = torch.randn(32, 784)  # 在CPU上
output = model(x)  # 报错!

# ✅ 正确
x = x.to(device)
output = model(x)

九、局限性说明

nn.Sequential 不适合以下场景:

场景 原因 替代方案
残差连接(ResNet) 需要 output = conv(x) + x 自定义 nn.Module
多分支结构(Inception) 需要并行处理 自定义 nn.Module
条件执行 需要动态控制流程 自定义 nn.Module
多输入/多输出 需要多个张量进出 自定义 nn.Module

十、快速参考表

操作 代码
创建 nn.Sequential(layer1, layer2, ...)
前向传播 output = model(input)
按索引访问 model[0]
按名称访问 model.layer_name
切片访问 model[0:2]
添加模块 model.add_module('name', layer)
插入模块 model.insert(idx, layer)
删除模块(索引) del model[idx]
删除模块(名称) delattr(model, 'name')
遍历子模块 for m in model:model.named_children()
遍历参数 model.named_parameters()
保存参数 torch.save(model.state_dict(), path)
加载参数 model.load_state_dict(torch.load(path, map_location=device))
设备迁移 model.to('cuda')
训练模式 model.train()
评估模式 model.eval()
权重初始化 model.apply(init_fn)

十一、与相关容器对比

特性 Sequential ModuleList ModuleDict
自动 forward ✅ 是 ❌ 否 ❌ 否
按索引访问 ✅ 是 ✅ 是 ❌ 否
按名称访问 ⚠️ 需OrderedDict ❌ 否 ✅ 是
适用场景 顺序网络 动态层数 命名模块管理

4、transformer - 完整代码

# coding: utf-8
import torch
import torch.nn as nn
import math
import torch.nn.functional as F
import copy


# 注意是 Embeddings, 不是 Embedding, 和 nn.Embedding 区分
# 仅仅是 词嵌入, 没有 位置编码
class Embeddings(nn.Module):
    def __init__(self, vocabulary_size: int, d_model: int):
        super().__init__()
        self.vocabulary_size = vocabulary_size  # 词表大小
        self.d_model = d_model  # 词向量维度

        self.embed = nn.Embedding(num_embeddings=vocabulary_size, embedding_dim=d_model)

    def forward(self, x):
        # x.shape = (N, T)
        # nn.Embedding 要求输入的 indices(即 x)必须是整数类型(如 torch.long 或 torch.int)
        # math.sqrt 放缩的作用
        x = self.embed(x) * math.sqrt(self.d_model)
        return x


# 位置编码  【 Positional  adj.位置的 】
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 1024):
        super().__init__()

        assert d_model % 2 == 0, 'd_model 必须为偶数'

        self.d_model = d_model
        self.max_len = max_len

        # max_len      # 最大 token 数量
        # d_model      # 词向量维度

        # 创建大表格
        pe = torch.zeros(size=(max_len, d_model))

        # 创建位置索引
        position = torch.arange(start=0, end=max_len, step=1, dtype=torch.float)
        print('position.shape =', position.shape)  # torch.Size([1024])
        print(f'position: {position}')
        # tensor([0.0000e+00, 1.0000e+00, 2.0000e+00,  ..., 1.0210e+03, 1.0220e+03, 1.0230e+03])

        # 目的:
        #   pos     dim_0   dim_1   dim_2   dim_3   dim_4   ...   dim_d_model
        #    0       0*w0    0*w1    0*w2    0*w3    0*w4   ...   0*wd_model
        #    1       1*w0    1*w1    1*w2    1*w3    1*w4   ...   1*wd_model
        #    2       2*w0    2*w1    2*w2    2*w3    2*w4   ...   2*wd_model
        #    3       3*w0    3*w1    3*w2    3*w3    3*w4   ...   3*wd_model
        #    4       4*w0    4*w1    4*w2    4*w3    4*w4   ...   4*wd_model
        #   ...
        #  max_len   m*w0    m*w1    m*w2    m*w3    m*w4   ...   m*wd_model
        # position 是 (max_len, ) 即 [0, 1, 2, ..., max_len]
        # 每行都需要 d_model 哥 0, 要变成 (max_len, d_model) 个0
        # 如果变成 (1, max_len), 那即使广播,也只能变成:
        #       [ [0, 1, 2, ..., max_len],
        #         [0, 1, 2, ..., max_len],
        #         ...
        #         [0, 1, 2, ..., max_len] ]
        # 所以只能变成 (max_len, 1) -> 广播后:
        #   [ [0],                 [ [0, 0, 0, ..., 0],
        #     [1],                   [0, 0, 0, ..., 0],
        #     [2],                   [0, 0, 0, ..., 0],
        #     [3],                   [0, 0, 0, ..., 0],
        #     ...,                   ...,
        #     [max_len] ]            [0, 0, 0, ..., 0] ]
        position = position.unsqueeze(dim=1)

        # 计算 w = 10000 ^ (- 2i/d) = exp( 2i * (-ln(10000) / d) )
        div_term = torch.exp(
            torch.arange(0, d_model, 2, dtype=torch.float) *
            (-math.log(10000) / d_model)
        )
        print('div_term.shape =', div_term.shape)  # torch.Size([34])

        # div_term 需要变成 每行都是 [w0, w1, w2, w3, ..., w_d_model]
        # 即:
        # [ [w0, w1, w2, w3, ..., w_d_model],
        #   [w0, w1, w2, w3, ..., w_d_model],
        #   ...,
        #   [w0, w1, w2, w3, ..., w_d_model] ]
        # 所以 只能变成 (1, d_model)
        # 但是 position 已经变了, (max_len, 1) 与 (d_model, ) 运算
        # 可以广播,  (max_len, 1)  ->  (max_len, d_model)
        #           (1, d_model)  ->  (max_len, d_model)
        # 所以 div_term 可以不进行升维,但也可以显示升维

        # product 相当于算出来了 相位, 即 sin(x) 中的 x
        product = position * div_term
        print('product.shape =', product.shape)  # torch.Size([1024, 34])

        # 万事俱备,只差 sin、cos
        # sin:
        # 取所有行, 每行从0开始,取到末尾,步长为 2
        pe[:, 0:: 2] = torch.sin(product)

        # cos:
        # 取所有行, 每行从1开始,取到末尾,步长为 2
        pe[:, 1:: 2] = torch.cos(product)

        print('pe.shape =', pe.shape)  # torch.Size([1024, 68])
        print(pe)  # 只要 max_len 和 d_model 确定了,那 pe 就确定了
        # tensor([[ 0.0000e+00,  1.0000e+00,  0.0000e+00,  ...,  1.0000e+00, 0.0000e+00,  1.0000e+00],
        #         [ 8.4147e-01,  5.4030e-01,  6.9087e-01,  ...,  1.0000e+00, 1.3111e-04,  1.0000e+00],
        #         [ 9.0930e-01, -4.1615e-01,  9.9897e-01,  ...,  1.0000e+00, 2.6223e-04,  1.0000e+00],
        #         ...,
        #         [ 9.9236e-01, -1.2340e-01, -1.1004e-02,  ..., -6.7493e-01, 2.6875e-01,  9.6321e-01],
        #         [ 4.3234e-01, -9.0171e-01,  6.8216e-01,  ..., -6.7505e-01, 2.6888e-01,  9.6317e-01],
        #         [-5.2517e-01, -8.5100e-01,  9.9852e-01,  ..., -6.7518e-01, 2.6900e-01,  9.6314e-01]])

        # 注册为 buffer
        self.register_buffer(name='position_encoding', tensor=pe, persistent=True)

    def forward(self, x) -> torch.Tensor:
        T = x.size(dim=1)  # 获取 T
        print(f'T = {T}')  # 3

        # 为 True 则继续执行
        assert T <= self.max_len, '序列长度超过最大序列长度, 位置编码失败!'

        print(f'x.shape = {x.shape}')  # torch.Size([2, 3, 68])
        print(f'self.position_encoding.shape = {self.position_encoding.shape}')  # torch.Size([1024, 68])

        # 输入 x 可能只有 T=3 个 token(如 (2, 3, 68))。
        # 所以不能直接加整个 pe,而应该只取前 T 行!
        # x = x + self.position_encoding   # 错误!
        # 这里有讲究: position_encoding[: T] vs position_encoding[: T, : ]
        # 在 《`pe[: T]` vs `pe[: T, : ]`》 里有详情
        x = x + self.position_encoding[: T]  # 这里有广播机制
        return x


# 多头注意力
class MultiHeadAttn(nn.Module):
    def __init__(
            self,
            d_model: int,
            num_heads: int,
            d_k: int = None,  # query/key 的每个 head 维度
            d_v: int = None,  # value 的每个 head 维度
            dropout: float = 0.1
    ):
        super().__init__()

        # 保证词嵌入维度 能整除 头数
        # False 则报错,True 则正常运行
        # 正确约束是:当 d_k 未指定时,d_model 必须能被 num_heads 整除(因为 d_k = d_model // num_heads)
        # 一旦指定了 d_k,就没有任何整除要求!
        # d_v 同理
        if d_k is None or d_v is None:
            assert d_model % num_heads == 0, f'词嵌入维度不能整除头数'

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_k if d_k is not None else d_model // num_heads
        # 通常 d_v = d_k,但可以 d_v 不等于 d_k
        self.d_v = d_v if d_v is not None else d_model // num_heads

        print(f'd_k = {self.d_k}')  # 64
        print(f'd_v = {self.d_v}')  # 64

        # 把多个小矩阵拼接成大矩阵
        self.W_q = nn.Linear(in_features=self.d_model, out_features=self.num_heads * self.d_k, bias=False)
        self.W_k = nn.Linear(in_features=self.d_model, out_features=self.num_heads * self.d_k, bias=False)
        self.W_v = nn.Linear(in_features=self.d_model, out_features=self.num_heads * self.d_v, bias=False)

        # 在Softmax之后,在加权融合V之前
        self.dropout = nn.Dropout(p=dropout)

        # 加权融合V 得到的是 (token, h * d_v),所以这里的输入是 h * d_v
        # 输出的线性投影, 拼接后的 (h * d_v) 需要映射回 d_model
        self.W_o = nn.Linear(in_features=self.num_heads * self.d_v, out_features=self.d_model, bias=False)

    def forward(
            self,
            query_input: torch.Tensor,
            key_input: torch.Tensor,
            value_input: torch.Tensor,
            mask: torch.Tensor = None
    ):
        B, T, D = query_input.shape  # 需要保证 batch_first = True

        # 为什么形参名字叫 query_input, key_input, value_input,在《关键:分清 X 到底是什么》有详情
        # 线性变换, 矩阵是由多个heads小矩阵合并而成
        Q = self.W_q(query_input)  # (B, T, h * d_k)
        K = self.W_k(key_input)  # (B, T, h * d_k)
        V = self.W_v(value_input)  # (B, T, h * d_v)

        # 拆分为多头 Q: (B, T, h * d_k) -> (B, T, h, d_k) -> (B, h, T, d_k)
        # 为什么写 -1,详情在《拆分多头时:`Q.reshape(B, -1, self.num_heads, self.d_k)`》
        Q_heads = Q.reshape(B, -1, self.num_heads, self.d_k).transpose(1, 2)  # (B, h, T, d_k)
        K_heads = K.reshape(B, -1, self.num_heads, self.d_k).transpose(1, 2)  # (B, h, T, d_k)
        V_heads = V.reshape(B, -1, self.num_heads, self.d_v).transpose(1, 2)  # (B, h, T, d_v)

        # 计算 缩放点击注意力(不是除以 h*d_k!) (B, h, T, T)
        score = torch.matmul(Q_heads, K_heads.transpose(-1, -2)) / math.sqrt(self.d_k)

        if mask is not None:
            # masked_fill 返回新张量,不是 in_place 操作
            score = score.masked_fill(mask == 0, -torch.inf)

        # Softmax计算分布
        attn_weight = F.softmax(score, dim=-1)  # (B, h, T, T)

        # 使用 dropout
        attn_weight = self.dropout(attn_weight)

        # 加权聚合 value 得到 上下文
        context = torch.matmul(attn_weight, V_heads)  # (B, h, T, d_v)

        # 把所有 head 合并
        # (B, h, T, d_v)  -->  (B, T, h, d_v)  -->  (B, T, h * d_v)
        context = context.transpose(1, 2).reshape(B, -1, self.num_heads * self.d_v)

        # 线性投影
        output = self.W_o(context)

        return output


# 前馈全连接层
class PositionWiseFeedForwardNetwork(nn.Module):
    def __init__(
            self,
            d_model: int,
            d_ff: int = None,
            dropout: float = 0.1,
            activation='relu'
    ):
        super().__init__()
        self.d_model = d_model
        self.d_ff = d_ff if d_ff is not None else 4 * d_model  # 默认 4 倍

        self.linear1 = nn.Linear(in_features=d_model, out_features=self.d_ff)
        if activation == 'relu':
            self.activation = nn.ReLU()
        else:
            raise ValueError('还需要添加更多激活函数')
            raise ValueError(f"Unsupported activation: '{activation}'. Currently only 'relu' is supported.")
        self.dropout = nn.Dropout(p=dropout)

        self.linear2 = nn.Linear(in_features=self.d_ff, out_features=self.d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # 返回值预计为 张量(但Python解释器可不管什么类型,即使不返回张量也不会报错)
        """
        :param x: x.shape = (B, T, d_model)
        :return: (B, T, d_model)
        """
        x = self.linear1(x)
        x = self.activation(x)
        x = self.dropout(x)

        x = self.linear2(x)

        # 可一行搞定
        # return self.linear2(self.dropout(self.activation(self.linear1(x))))

        return x


# 规范化层
class LayerNorm(nn.Module):
    def __init__(self, d_model: int, eps=1e-5):
        super().__init__()

        self.d_model = d_model
        self.eps = eps

        # 仿射变换 γ,用于缩放(初始为1,表示不缩放)
        self.gamma = nn.Parameter(torch.ones(size=(self.d_model,)))

        # 仿射变换 β,用于偏移(初始为0,表示不偏移)
        self.beta = nn.Parameter(torch.zeros(size=(self.d_model,)))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        支持任意形状,只要最后一维是 d_model
        例如: (B, d_model) 或 (B, T, d_model) 或 (B, T1, T2, d_model)
        """
        # 计算均值
        mean = x.mean(dim=-1, keepdim=True)

        # 计算有偏方差
        var = x.var(dim=-1, keepdim=True, unbiased=False)

        # 标准化
        x_norm = (x - mean) / (torch.sqrt(var + self.eps))

        # 仿射变换
        output = self.gamma * x_norm + self.beta

        return output


# 子层连接结构
class SublayerConnection(nn.Module):
    def __init__(self, d_model: int, dropout: float = 0.1):
        super().__init__()

        # 规范化层
        self.norm = LayerNorm(d_model=d_model)

        # Dropout
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, sublayer_fn) -> torch.Tensor:
        # 绝对不能写成 x = self.norm(x), 这样会覆盖原始的 x, 残差连接还要用到原始的 x
        x_norm = self.norm(x)  # 规范化层
        sublayer_output = sublayer_fn(x_norm)  # MHSA、FNN 等
        output = x + self.dropout(sublayer_output)  # 残差连接

        # 一行搞定
        # return x + self.dropout(sublayer_fn(self.norm(x)) )

        return output


# 编码器层
class EncoderLayer(nn.Module):
    def __init__(self, d_model: int, self_attn: nn.Module, ffn: nn.Module, dropout: float = 0.1):
        """
        :param self_attn: 多头自注意力机制(MHSA)
        :param ffn: 前馈全连接(FFN)
        """
        super().__init__()

        self.d_model = d_model
        self.self_attn = self_attn
        self.ffn = ffn
        self.dropout = dropout

        # 第一个子层 —— 多头自注意力(MHSA Sub-layer)
        self.sublayer_attn = SublayerConnection(d_model=self.d_model, dropout=self.dropout)

        # 第二个子层 —— 前馈网络(FFN Sub-layer)
        self.sublayer_ffn = SublayerConnection(d_model=self.d_model, dropout=self.dropout)

    def forward(self, x, mask=None):
        """
        :param mask: 这里先不管 mask,后续会对 mask 进行详细的改进
        :param x: x.shape = (B, T, D)
        """
        # 第一个子层
        x1 = self.sublayer_attn(x, lambda x_norm: self.self_attn(x_norm, x_norm, x_norm, mask))

        # 第二个子层
        # 如果 self.ffn 本身就是一个接受单个张量输入并返回张量的模块(如 nn.Sequential 或自定义 FFN),那么直接传 self.ffn 即可。
        # 使用 lambda x_norm: self.ffn(x_norm) 是冗余的函数包装,增加调用开销,且无任何收益。
        x2 = self.sublayer_ffn(x1, self.ffn)

        return x2


class Encoder(nn.Module):
    def __init__(self, encoder_layer: nn.Module, num_layers: int):
        """
        :param encoder_layer: 已构建好的编码器层
        :param num_layers: 编码器的层数
        """
        super().__init__()

        # num_layer 个编码器层
        self.layers = nn.ModuleList([copy.deepcopy(encoder_layer) for _ in range(num_layers)])

        # 最终输出需通过规范化层
        self.norm = LayerNorm(d_model=encoder_layer.d_model)

    def forward(self, x, mask=None):
        # 经过多个编码器层
        for layer in self.layers:
            x = layer(x, mask)

        # 最终输出经过规范化层
        output = self.norm(x)

        return output


# 解码器层
class DecoderLayer(nn.Module):
    def __init__(
            self,
            d_model: int,
            self_attn: nn.Module,  # 掩码多头自注意力(用于第一个子层)
            cross_attn: nn.Module,  # 掩码多头交叉注意力(用于第二个子层)
            ffn: nn.Module,  # 前馈全连接(用于第三个子层)
            dropout: float = 0.1
    ):
        super().__init__()

        self.d_model = d_model
        self.dropout = dropout
        self.self_attn = self_attn
        self.cross_attn = cross_attn
        self.ffn = ffn

        # 第一个子层: Masked Multi-Head Self-Attention(掩码多头自注意力)
        self.sublayer_self_attn = SublayerConnection(d_model=self.d_model, dropout=self.dropout)

        # 第二个子层: Multi-Head Cross-Attention(编码器-解码器注意力)
        self.sublayer_cross_attn = SublayerConnection(d_model=self.d_model, dropout=self.dropout)

        # 第三个子层: Position-wise Feed-Forward Network(逐位置前馈网络)
        self.sublayer_ffn = SublayerConnection(d_model=self.d_model, dropout=self.dropout)

    def forward(
            self,
            x: torch.Tensor,       # 完整的已生成序列
            memory: torch.Tensor,  # 编码器最终的输出
            target_mask: torch.Tensor = None,  # 第一个子层的掩码
            memory_mask: torch.Tensor = None  # 第二个子层的掩码
    ):
        # 第一个子层
        x1 = self.sublayer_self_attn(
            x,
            lambda x_norm: self.self_attn(x_norm, x_norm, x_norm, target_mask)
        )

        # 第二个子层
        x2 = self.sublayer_cross_attn(
            x1,
            lambda x_norm: self.cross_attn(x_norm, memory, memory, memory_mask)
        )

        # 第三个子层
        x3 = self.sublayer_ffn(
            x2,
            self.ffn  # 只有一个参数,可以简写
        )

        return x3


class Decoder(nn.Module):
    def __init__(self, decoder_layer: nn.Module, num_layers: int):
        super().__init__()

        # N个解码器层
        self.layers = nn.ModuleList([
            copy.deepcopy(decoder_layer) for _ in range(num_layers)
        ])

        # 注意:Pre-LN 架构中
        # 有的地方说在经过堆叠后,需要加 LayerNorm,有的地方说不需要加
        # 所以具体加不加,看具体情况,如果遇到了,就去问领导
        # 或者查看要复现的论文原文(比如 T5、BERT、GPT)
        # 或者查看官方开源代码
        self.norm = LayerNorm(d_model=decoder_layer.d_model)

    def forward(self, x, memory, target_mask: torch.Tensor = None, memory_mask: torch.Tensor = None):
        for layer in self.layers:
            x = layer(x, memory, target_mask, memory_mask)

        # 注意:Pre-LN 架构中
        # 有的地方说在经过堆叠后,需要加 LayerNorm,有的地方说不需要加
        # 所以具体加不加,看具体情况,如果遇到了,就去问领导
        # 或者查看要复现的论文原文(比如 T5、BERT、GPT)
        # 或者查看官方开源代码
        x = self.norm(x)

        return x


# 输出部分
class Generator(nn.Module):
    def __init__(self, d_model: int, vocabulary_size: int):
        super().__init__()

        self.d_model = d_model
        self.vocabulary_size = vocabulary_size

        # 【 projection  n.投影;预测 】
        # 线性投影层:将 d_model 维度映射到 vocabulary_size
        # 不加 bias 是 Transformer 的常见做法(与原始论文一致)
        self.proj = nn.Linear(in_features=self.d_model, out_features=self.vocabulary_size, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        logits = self.proj(x)

        return logits


class Transformer(nn.Module):
    def __init__(
            self,
            encoder: nn.Module,            # 编码器(对象)
            decoder: nn.Module,            # 解码器(对象)
            encoder_embed: nn.Module,      # 编码器的词嵌入(对象)
            decoder_embed: nn.Module,      # 解码器的词嵌入(对象)
            generator: nn.Module           # 解码器的输出部分(对象)
    ):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.encoder_embed = encoder_embed
        self.decoder_embed = decoder_embed
        self.generator = generator

    def forward(self, encoder_input, decoder_input, encoder_mask, decoder_mask):
        # encoder_input, decoder_input 都是原始输入,还没有经过词嵌入层

        # 将原始输入送入编码器输入部分: wordEmbedding + positionEmbedding
        encoder_input_embed = self.encoder_embed(encoder_input)

        # 把 encoder_input_embed 送入编码器
        encoder_output = self.encoder(encoder_input_embed, encoder_mask)

        # 将原始输入送变解码器输入部分: wordEmbedding + positionEmbedding
        decoder_input_embed = self.decoder_embed(decoder_input)

        # 把 decoder_input_embed 送入解码器
        decoder_output = self.decoder(decoder_input_embed, encoder_output, decoder_mask, encoder_mask)

        # 把解码器的输出送给解码器的输出部分
        logits = self.generator(decoder_output)

        return logits



if __name__ == '__main__':
    torch.manual_seed(66)
    encoder_vocabulary_size = 500   # 500 词库
    decoder_vocabulary_size = 1000  # 1000 词库
    d_model = 512  # 词向量维度
    max_len = 1000  # 预测序列最大长度为 1000
    num_heads = 8  # 多头注意力的头数为 8
    num_layers = 6  # 编码器 & 解码器 堆叠层数

    # 编码器: 生成 [0, 500) 范围内的随机整数
    x = torch.randint(low=0, high=encoder_vocabulary_size, size=(2, 4))  # 原始输入

    # 第一个子层————多头自注意力层(MHSA Sub-layer)
    encoder_my_attn = MultiHeadAttn(d_model=d_model, num_heads=num_heads)

    # 第二个子层————前馈网络层(FFN Sub-layer)
    encoder_my_ffn = PositionWiseFeedForwardNetwork(d_model=d_model)

    # 构建编码器层
    my_encoder_layer = EncoderLayer(d_model=d_model, self_attn=encoder_my_attn, ffn=encoder_my_ffn)

    # 构建编码器
    my_encoder = Encoder(encoder_layer=my_encoder_layer, num_layers=num_layers)

    # 解码器: 生成 [0, 500) 范围内的随机整数,这个是 Teacher Forcing,即真实值
    y = torch.randint(low=0, high=decoder_vocabulary_size, size=(2, 4))  # 原始输入

    # 解码器:第一个子层
    decoder_self_attn = MultiHeadAttn(d_model=d_model, num_heads=num_heads)

    # 解码器: 第二个子层
    decoder_cross_attn = MultiHeadAttn(d_model=d_model, num_heads=num_heads)

    # 解码器: 第三个子层
    decoder_ffn = PositionWiseFeedForwardNetwork(d_model=d_model)

    # 构建解码器层
    my_decoder_layer = DecoderLayer(
        d_model=d_model,
        self_attn=decoder_self_attn,
        cross_attn=decoder_cross_attn,
        ffn=decoder_ffn
    )

    # 构建解码器
    my_decoder = Decoder(decoder_layer=my_decoder_layer, num_layers=num_layers)

    # 解码器的输出部分
    generator = Generator(d_model=d_model, vocabulary_size=decoder_vocabulary_size)

    # 编码器 & 解码器 有重复的模块(比如 多头注意力,词嵌入层)都可以使用 copy.deepcopy
    # 注意:位置编码可以共享(因为都是位置信息),但词嵌入不能共享,编码器 和 解码器 的词表大小不一样!
    position_embed = PositionalEncoding(d_model=d_model, max_len=max_len)

    # 创建 Transformer
    transformer = Transformer(
        encoder=my_encoder,
        decoder=my_decoder,
        encoder_embed=nn.Sequential(
            Embeddings(vocabulary_size=encoder_vocabulary_size, d_model=d_model),
            position_embed
        ),
        decoder_embed=nn.Sequential(
            Embeddings(vocabulary_size=decoder_vocabulary_size, d_model=d_model),
            position_embed
        ),
        generator=generator
    )

    # 训练时使用 train,推理时使用 eval
    transformer.train()

    logits = transformer(
        encoder_input=x,
        decoder_input=y,
        encoder_mask=None,    # 作为测试,不管那么多
        decoder_mask=None     # 作为测试,不管那么多
    )

    print(f'logits.shape={logits.shape}')       # torch.Size([2, 4, 1000])
    print(transformer)
    # Transformer(
    #   (encoder): Encoder(
    #     (layers): ModuleList(
    #       (0-5): 6 x EncoderLayer(
    #         (self_attn): MultiHeadAttn(
    #           (W_q): Linear(in_features=512, out_features=512, bias=False)
    #           (W_k): Linear(in_features=512, out_features=512, bias=False)
    #           (W_v): Linear(in_features=512, out_features=512, bias=False)
    #           (dropout): Dropout(p=0.1, inplace=False)
    #           (W_o): Linear(in_features=512, out_features=512, bias=False)
    #         )
    #         (ffn): PositionWiseFeedForwardNetwork(
    #           (linear1): Linear(in_features=512, out_features=2048, bias=True)
    #           (activation): ReLU()
    #           (dropout): Dropout(p=0.1, inplace=False)
    #           (linear2): Linear(in_features=2048, out_features=512, bias=True)
    #         )
    #         (sublayer_attn): SublayerConnection(
    #           (norm): LayerNorm()
    #           (dropout): Dropout(p=0.1, inplace=False)
    #         )
    #         (sublayer_ffn): SublayerConnection(
    #           (norm): LayerNorm()
    #           (dropout): Dropout(p=0.1, inplace=False)
    #         )
    #       )
    #     )
    #     (norm): LayerNorm()
    #   )
    #   (decoder): Decoder(
    #     (layers): ModuleList(
    #       (0-5): 6 x DecoderLayer(
    #         (self_attn): MultiHeadAttn(
    #           (W_q): Linear(in_features=512, out_features=512, bias=False)
    #           (W_k): Linear(in_features=512, out_features=512, bias=False)
    #           (W_v): Linear(in_features=512, out_features=512, bias=False)
    #           (dropout): Dropout(p=0.1, inplace=False)
    #           (W_o): Linear(in_features=512, out_features=512, bias=False)
    #         )
    #         (cross_attn): MultiHeadAttn(
    #           (W_q): Linear(in_features=512, out_features=512, bias=False)
    #           (W_k): Linear(in_features=512, out_features=512, bias=False)
    #           (W_v): Linear(in_features=512, out_features=512, bias=False)
    #           (dropout): Dropout(p=0.1, inplace=False)
    #           (W_o): Linear(in_features=512, out_features=512, bias=False)
    #         )
    #         (ffn): PositionWiseFeedForwardNetwork(
    #           (linear1): Linear(in_features=512, out_features=2048, bias=True)
    #           (activation): ReLU()
    #           (dropout): Dropout(p=0.1, inplace=False)
    #           (linear2): Linear(in_features=2048, out_features=512, bias=True)
    #         )
    #         (sublayer_self_attn): SublayerConnection(
    #           (norm): LayerNorm()
    #           (dropout): Dropout(p=0.1, inplace=False)
    #         )
    #         (sublayer_cross_attn): SublayerConnection(
    #           (norm): LayerNorm()
    #           (dropout): Dropout(p=0.1, inplace=False)
    #         )
    #         (sublayer_ffn): SublayerConnection(
    #           (norm): LayerNorm()
    #           (dropout): Dropout(p=0.1, inplace=False)
    #         )
    #       )
    #     )
    #     (norm): LayerNorm()
    #   )
    #   (encoder_embed): Sequential(
    #     (0): Embeddings(
    #       (embed): Embedding(500, 512)
    #     )
    #     (1): PositionalEncoding()
    #   )
    #   (decoder_embed): Sequential(
    #     (0): Embeddings(
    #       (embed): Embedding(1000, 512)
    #     )
    #     (1): PositionalEncoding()
    #   )
    #   (generator): Generator(
    #     (proj): Linear(in_features=512, out_features=1000, bias=False)
    #   )
    # )

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐