「Transformer核心必读」从零手写所有组件,完整搭建Transformer模型
1、编码器 & 解码器 的词嵌入层通常是独立的
《英译法案例》里的编码器和解码器,它俩的词嵌入也是单独的
编码器和解码器的词嵌入层设计
- 通常情况下:各自使用独立的词嵌入层
在原始Transformer论文(《Attention Is All You Need》)和大多数标准实现中,编码器和解码器使用各自的词嵌入层,原因如下:
| 组件 | 词嵌入层 | 说明 |
|---|---|---|
| Encoder | 独立的 Embedding 层 | 处理源序列(如翻译中的源语言) |
| Decoder | 独立的 Embedding 层 | 处理目标序列(如翻译中的目标语言) |
主要原因:
- 词汇表可能不同:在机器翻译任务中,源语言和目标语言的词汇表通常不同
- 语义角色不同:编码器处理输入理解,解码器处理输出生成,语义表示空间可能有差异
- 特殊情况:可以共享词嵌入层
在以下场景中,编码器和解码器可以共享同一个词嵌入层:
- ✅ 源语言和目标语言相同的任务(如文本摘要、对话生成、语言建模)
- ✅ 词汇表完全一致的情况下
- ✅ 为了减少参数量时的优化选择
⚠️ 注意:共享嵌入层时,编码器和解码器的词汇表索引必须对齐一致。
- 重要的权重共享技巧:输出嵌入绑定
这是两个不同层面的权重共享概念,需区分清楚:
| 共享类型 | 共享对象 | 常见程度 |
|---|---|---|
| 类型A | Encoder嵌入 ↔ Decoder嵌入 | 较少见(需词汇表一致) |
| 类型B | Decoder输入嵌入 ↔ Decoder输出投影 | 非常常见(推荐做法) |
类型B(权重绑定/Weight Tying) 的具体做法:
解码器的输入嵌入层权重 = 解码器的输出投影层权重
好处:
- 减少参数量(约节省1/3嵌入相关参数)
- 提高训练稳定性
- 改善生成质量(尤其在小数据集上)
- 完整的数据流示意
┌─────────────────────────────────────────────────────────────────────┐
│ Encoder 路径 │
├─────────────────────────────────────────────────────────────────────┤
│ 输入x → [Encoder Embedding] → x_embed → [位置编码] → x_position │
│ → Encoder → 上下文表示 │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ Decoder 路径 │
├─────────────────────────────────────────────────────────────────────┤
│ 输入y → [Decoder Embedding] → y_embed → [位置编码] → y_position │
│ → Decoder → 隐藏状态 │
│ → [输出投影] → 预测 │
│ ↑ │ │
│ └──── 权重绑定(常见优化)─────┘ │
└─────────────────────────────────────────────────────────────────────┘
⚠️ 注意:权重绑定是 Decoder 内部的输入嵌入与输出投影之间,
不是 Encoder 和 Decoder 之间的共享
- 总结对比表
| 问题 | 答案 | 备注 |
|---|---|---|
| 编码器和解码器是否共享词嵌入层? | 通常不共享,各自独立 | 机器翻译任务标准做法 |
| 什么情况下可以共享? | 源目标语言相同、词汇表一致时 | 如摘要、对话等任务 |
| 有什么权重共享技巧? | Decoder输入嵌入 ↔ Decoder输出投影 | 权重绑定,强烈推荐 |
| 位置编码如何处理? | 编码器和解码器各自独立添加位置编码 | 使用相同的编码函数 |
- 实践建议
| 任务类型 | 嵌入层建议 | 权重绑定建议 |
|---|---|---|
| 机器翻译(不同语言) | Encoder/Decoder独立 | Decoder内部绑定 ✅ |
| 文本摘要 | 可共享或独立 | Decoder内部绑定 ✅ |
| 对话生成 | 可共享或独立 | Decoder内部绑定 ✅ |
| 语言模型(Decoder-only) | 单嵌入层 | 输入输出绑定 ✅ |
2、编码器 & 解码器 的位置编码通常是共享的
通常情况下,位置编码(Positional Encoding)是共享的(即使用同一套参数/公式),不需要像词嵌入层(Token Embedding)那样独立。
✅ 核心结论
-
词嵌入层 (Token Embedding):必须独立。
- 原因:编码器处理的是源语言(如中文),解码器处理的是目标语言(如英文)。它们的词汇表(Vocabulary)完全不同,索引含义也不同,所以必须用两个独立的
nn.Embedding层。
- 原因:编码器处理的是源语言(如中文),解码器处理的是目标语言(如英文)。它们的词汇表(Vocabulary)完全不同,索引含义也不同,所以必须用两个独立的
-
位置编码层 (Positional Encoding):通常共享(甚至不需要定义为
nn.Module的参数)。- 原因:位置编码通常是固定的数学公式(正弦/余弦函数)或可学习的向量,它只与“位置索引”和“维度 d m o d e l d_{model} dmodel”有关,而与“具体的语言”或“具体的词”无关。
- 无论是中文的“第 1 个字”还是英文的“第 1 个词”,它们在序列中的相对/绝对位置意义是一样的,因此可以使用完全相同的位置编码向量 P E 0 , P E 1 , … PE_0, PE_1, \dots PE0,PE1,…。
💡 为什么可以共享?
- 基于固定公式(Original Transformer)
在原始论文 Attention Is All You Need 中,位置编码是通过正弦和余弦函数计算出来的:
P E ( p o s , 2 i ) = sin ( p o s / 10000 2 i / d m o d e l ) PE_{(pos, 2i)} = \sin(pos / 10000^{2i/d_{model}}) PE(pos,2i)=sin(pos/100002i/dmodel)
P E ( p o s , 2 i + 1 ) = cos ( p o s / 10000 2 i / d m o d e l ) PE_{(pos, 2i+1)} = \cos(pos / 10000^{2i/d_{model}}) PE(pos,2i+1)=cos(pos/100002i/dmodel)
- 这个公式没有任何可学习参数。
- 只要编码器和解码器的 d m o d e l d_{model} dmodel 相同,它们生成的位置编码矩阵就是完全一样的。
- 实现方式:通常写成一个通用的工具函数或一个单独的
PositionalEncoding类, encoder 和 decoder 都调用它。
- 基于可学习向量(Learned Positional Embedding)
有些模型(如 BERT, GPT)将位置编码作为可学习的参数 nn.Embedding(max_len, d_model)。
- 即使在这种情况下,通常也是共享的。
- 原因:位置的概念(第 1 位、第 2 位…)在源语言和目标语言中是通用的。让模型去学习两套不同的“位置概念”(例如:编码器认为“位置 1”代表句首,解码器认为“位置 1”代表句中)通常没有好处,反而增加参数量且可能导致过拟合。
- 例外情况:除非你有非常特殊的理由相信源语言和目标语言的“位置结构”有本质不同(极少见),否则不要分开定义。
💻 代码实现示例
在标准的 Transformer 实现中,你会看到这样的结构:
class Transformer(nn.Module):
def __init__(self, src_vocab_size, tgt_vocab_size, d_model, ...):
super().__init__()
# 1. 词嵌入层:必须独立 (因为 vocab 不同)
self.src_embed = nn.Embedding(src_vocab_size, d_model)
self.tgt_embed = nn.Embedding(tgt_vocab_size, d_model)
# 2. 位置编码:通常共享 (只有一个实例)
# 无论是固定公式还是可学习参数,都只用一份
self.pos_encoder = PositionalEncoding(d_model, max_len=5000)
# 3. 编码器和解码器
self.encoder = Encoder(...)
self.decoder = Decoder(...)
# 初始化权重
self._init_weights()
def forward(self, src, tgt, src_mask, tgt_mask):
# --- 编码器部分 ---
# 源语言 Embedding + 共享的位置编码
src_emb = self.src_embed(src) * math.sqrt(self.d_model)
src_emb = self.pos_encoder(src_emb) # 直接加,start_idx=0
enc_output = self.encoder(src_emb, src_mask)
# --- 解码器部分 ---
# 目标语言 Embedding + 共享的位置编码
tgt_emb = self.tgt_embed(tgt) * math.sqrt(self.d_model)
# 训练时 start_idx=0; 推理时在 Decoder 内部动态计算
tgt_emb = self.pos_encoder(tgt_emb)
dec_output = self.decoder(tgt_emb, enc_output, tgt_mask, src_mask)
return dec_output
📊 总结对比表
| 组件 | 编码器 (Encoder) | 解码器 (Decoder) | 是否共享? | 原因 |
|---|---|---|---|---|
| 词嵌入 (Token Embedding) | Embedding(Vocab_src, D) |
Embedding(Vocab_tgt, D) |
❌ 不共享 | 词汇表不同,语言不同,语义空间不同。 |
| 位置编码 (Pos Encoding) | PosEnc(D) |
PosEnc(D) |
✅ 共享 | 位置是通用的数学概念(第 1 个、第 2 个…),与语言无关。 |
| 维度 ( d m o d e l d_{model} dmodel) | D D D | D D D | ✅ 必须一致 | 为了让解码器的 Cross-Attention 能接收编码器的输出。 |
⚠️ 特殊情况提示
如果你使用的是 BERT 式 的架构(Encoder-only),通常会有:
Token EmbeddingPosition Embedding(可学习,共享)Segment Embedding(用于区分句子 A 和句子 B,这是特有的)
但在标准的 Seq2Seq Transformer (Encoder-Decoder) 中,位置编码绝对是共享的。你只需要实例化一次 PositionalEncoding,然后分别在 Encoder 和 Decoder 的输入上调用它即可。
3、nn.Sequential:模型挨个执行
PyTorch nn.Sequential 完整指南
一、它是什么?
nn.Sequential 是 PyTorch torch.nn 模块中的一个容器类(Container Class)。
import torch.nn as nn
# 它本身是一个 nn.Module 的子类
print(issubclass(nn.Sequential, nn.Module)) # True
本质理解:
- 它是一个有序的模块容器
- 内部按添加顺序存储多个神经网络层
- 继承自
nn.Module,因此可以像普通层一样使用
二、它是干嘛的?
核心功能: 将多个神经网络层按顺序串联,数据依次通过每一层。
输入数据
↓
[ 第1层 ] → 输出1
↓
[ 第2层 ] → 输出2
↓
[ 第3层 ] → 输出3
↓
...
↓
[ 第N层 ] → 最终输出
关键特性:
- 自动执行前向传播(无需手动写
forward()) - 前一层输出 = 后一层输入
- 所有子模块的参数自动注册到父模块
三、它有什么用?
| 用途 | 说明 |
|---|---|
| 📦 快速搭建网络 | 几行代码构建完整模型 |
| 🔧 代码简洁 | 无需定义类和 forward 方法 |
| 📋 结构清晰 | 网络层次一目了然 |
| 🎯 易于调试 | 可按索引访问任意层 |
| 💾 方便保存 | 整体保存/加载模型参数 |
典型应用场景:
- 多层感知机(MLP)
- 卷积神经网络(CNN)的卷积块
- 自编码器(Encoder/Decoder)
- 任何纯顺序结构的网络
四、函数签名
构造方法
torch.nn.Sequential(*args)
参数说明
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
*args |
nn.Module |
否 | 无 | 可变数量的子模块,按顺序传入 |
返回值
| 返回 | 类型 | 说明 |
|---|---|---|
sequential |
nn.Sequential |
有序容器对象 |
五、参数详解与构造方式
方式1:直接传入模块(最常用)
model = nn.Sequential(
nn.Linear(784, 128), # 第0层
nn.ReLU(), # 第1层
nn.Linear(128, 10) # 第2层
)
方式2:使用 OrderedDict(可命名)
from collections import OrderedDict
model = nn.Sequential(OrderedDict([
('fc1', nn.Linear(784, 128)),
('relu1', nn.ReLU()),
('fc2', nn.Linear(128, 10))
]))
方式3:空构造 + 动态添加
model = nn.Sequential()
model.add_module('conv1', nn.Conv2d(3, 64, 3))
model.add_module('bn1', nn.BatchNorm2d(64))
model.add_module('relu1', nn.ReLU())
方式4:混合使用
# 可以先创建部分,再添加
model = nn.Sequential(nn.Linear(784, 128))
model.add_module('relu', nn.ReLU())
model.add_module('fc2', nn.Linear(128, 10))
六、常用操作
6.1 前向传播
output = model(input) # 自动按顺序执行所有层
6.2 访问子模块
# 按索引访问(方式1构造)
layer0 = model[0]
layer1 = model[1]
# 按名称访问(方式2构造)
fc1 = model.fc1
relu1 = model.relu1
# 切片访问(返回新的 Sequential)
sub_model = model[0:2] # 获取前两层
6.3 添加/插入模块
# 追加到末尾
model.add_module('new_layer', nn.Linear(10, 5))
# 插入到指定位置(PyTorch 1.10+)
model.insert(1, nn.Dropout(0.5)) # 在索引1处插入
6.4 删除模块
# 删除指定索引(✅ 正确)
del model[1]
# 删除指定名称(✅ 正确方式)
delattr(model, 'relu1') # 或 del model._modules['relu1']
6.5 遍历模块
# 遍历所有直接子模块(推荐)
for i, module in enumerate(model):
print(f"Layer {i}: {module}")
# 或
for name, module in model.named_children():
print(f"{name}: {module}")
# 遍历所有嵌套模块(包括子模块的子模块)
for name, module in model.named_modules():
print(f"{name}: {module}")
# 遍历参数
for name, param in model.named_parameters():
print(f"{name}: {param.shape}")
⚠️ 注意:
named_children()只遍历直接子模块,named_modules()会递归遍历所有嵌套模块。对于Sequential,通常使用named_children()更合适。
6.6 获取模型信息
print(model) # 打印模型结构
print(model[0].weight.shape) # 查看权重形状
print(list(model.parameters())) # 所有参数列表
print(list(model.children())) # 所有直接子模块
6.7 保存与加载
# 保存整个模型(不推荐,依赖类定义路径)
torch.save(model, 'model.pth')
# 保存参数(✅ 推荐)
torch.save(model.state_dict(), 'model_params.pth')
# 加载参数(注意设备映射)
model.load_state_dict(torch.load('model_params.pth', map_location=device))
model.eval() # 设为评估模式
6.8 设备迁移
# 移动到GPU
model = model.cuda()
# 或
model = model.to('cuda:0')
# 移动到CPU
model = model.to('cpu')
# 同时迁移模型和数据(推荐)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
6.9 训练/评估模式切换
model.train() # 训练模式(启用Dropout、BatchNorm训练统计)
model.eval() # 评估模式(关闭Dropout、使用BatchNorm运行统计)
6.10 权重初始化
def init_weights(m):
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight)
model.apply(init_weights) # 递归应用到所有子模块
七、完整示例
import torch
import torch.nn as nn
from collections import OrderedDict
# 1. 创建模型
model = nn.Sequential(OrderedDict([
('fc1', nn.Linear(784, 256)),
('bn1', nn.BatchNorm1d(256)),
('relu1', nn.ReLU()),
('drop1', nn.Dropout(0.3)),
('fc2', nn.Linear(256, 128)),
('relu2', nn.ReLU()),
('fc3', nn.Linear(128, 10))
]))
# 2. 移动到设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
# 3. 初始化权重
model.apply(lambda m: nn.init.xavier_uniform_(m.weight) if isinstance(m, nn.Linear) else None)
# 4. 查看结构
print(model)
# 5. 前向传播
x = torch.randn(32, 784).to(device) # batch_size=32
output = model(x)
print(output.shape) # torch.Size([32, 10])
# 6. 访问特定层
print(model.fc1.weight.shape) # torch.Size([256, 784])
# 7. 切换模式
model.train()
# ... 训练代码 ...
model.eval()
# ... 评估代码 ...
八、注意事项 ⚠️
| 注意点 | 说明 |
|---|---|
| 🔸 维度匹配 | 确保相邻层的输入输出维度一致 |
| 🔸 顺序重要 | 模块执行顺序=添加顺序 |
| 🔸 无分支 | 无法实现多输入/多输出/跳跃连接 |
| 🔸 命名唯一 | 使用OrderedDict时,名称不能重复 |
| 🔸 模式切换 | 训练/评估前记得调用 train()/eval() |
| 🔸 设备一致 | 模型和数据必须在同一设备上 |
| 🔸 加载映射 | 加载参数时使用 map_location 避免设备错误 |
常见错误示例
# ❌ 错误:维度不匹配
model = nn.Sequential(
nn.Linear(784, 128),
nn.Linear(64, 10) # 输入应该是128!
)
# ✅ 正确
model = nn.Sequential(
nn.Linear(784, 128),
nn.Linear(128, 10)
)
# ❌ 错误:忘记激活函数
model = nn.Sequential(
nn.Linear(784, 128),
nn.Linear(128, 10) # 没有非线性
)
# ✅ 正确
model = nn.Sequential(
nn.Linear(784, 128),
nn.ReLU(),
nn.Linear(128, 10)
)
# ❌ 错误:设备不一致
model = model.cuda()
x = torch.randn(32, 784) # 在CPU上
output = model(x) # 报错!
# ✅ 正确
x = x.to(device)
output = model(x)
九、局限性说明
nn.Sequential 不适合以下场景:
| 场景 | 原因 | 替代方案 |
|---|---|---|
| 残差连接(ResNet) | 需要 output = conv(x) + x |
自定义 nn.Module |
| 多分支结构(Inception) | 需要并行处理 | 自定义 nn.Module |
| 条件执行 | 需要动态控制流程 | 自定义 nn.Module |
| 多输入/多输出 | 需要多个张量进出 | 自定义 nn.Module |
十、快速参考表
| 操作 | 代码 |
|---|---|
| 创建 | nn.Sequential(layer1, layer2, ...) |
| 前向传播 | output = model(input) |
| 按索引访问 | model[0] |
| 按名称访问 | model.layer_name |
| 切片访问 | model[0:2] |
| 添加模块 | model.add_module('name', layer) |
| 插入模块 | model.insert(idx, layer) |
| 删除模块(索引) | del model[idx] |
| 删除模块(名称) | delattr(model, 'name') |
| 遍历子模块 | for m in model: 或 model.named_children() |
| 遍历参数 | model.named_parameters() |
| 保存参数 | torch.save(model.state_dict(), path) |
| 加载参数 | model.load_state_dict(torch.load(path, map_location=device)) |
| 设备迁移 | model.to('cuda') |
| 训练模式 | model.train() |
| 评估模式 | model.eval() |
| 权重初始化 | model.apply(init_fn) |
十一、与相关容器对比
| 特性 | Sequential | ModuleList | ModuleDict |
|---|---|---|---|
| 自动 forward | ✅ 是 | ❌ 否 | ❌ 否 |
| 按索引访问 | ✅ 是 | ✅ 是 | ❌ 否 |
| 按名称访问 | ⚠️ 需OrderedDict | ❌ 否 | ✅ 是 |
| 适用场景 | 顺序网络 | 动态层数 | 命名模块管理 |
4、transformer - 完整代码
# coding: utf-8
import torch
import torch.nn as nn
import math
import torch.nn.functional as F
import copy
# 注意是 Embeddings, 不是 Embedding, 和 nn.Embedding 区分
# 仅仅是 词嵌入, 没有 位置编码
class Embeddings(nn.Module):
def __init__(self, vocabulary_size: int, d_model: int):
super().__init__()
self.vocabulary_size = vocabulary_size # 词表大小
self.d_model = d_model # 词向量维度
self.embed = nn.Embedding(num_embeddings=vocabulary_size, embedding_dim=d_model)
def forward(self, x):
# x.shape = (N, T)
# nn.Embedding 要求输入的 indices(即 x)必须是整数类型(如 torch.long 或 torch.int)
# math.sqrt 放缩的作用
x = self.embed(x) * math.sqrt(self.d_model)
return x
# 位置编码 【 Positional adj.位置的 】
class PositionalEncoding(nn.Module):
def __init__(self, d_model: int, max_len: int = 1024):
super().__init__()
assert d_model % 2 == 0, 'd_model 必须为偶数'
self.d_model = d_model
self.max_len = max_len
# max_len # 最大 token 数量
# d_model # 词向量维度
# 创建大表格
pe = torch.zeros(size=(max_len, d_model))
# 创建位置索引
position = torch.arange(start=0, end=max_len, step=1, dtype=torch.float)
print('position.shape =', position.shape) # torch.Size([1024])
print(f'position: {position}')
# tensor([0.0000e+00, 1.0000e+00, 2.0000e+00, ..., 1.0210e+03, 1.0220e+03, 1.0230e+03])
# 目的:
# pos dim_0 dim_1 dim_2 dim_3 dim_4 ... dim_d_model
# 0 0*w0 0*w1 0*w2 0*w3 0*w4 ... 0*wd_model
# 1 1*w0 1*w1 1*w2 1*w3 1*w4 ... 1*wd_model
# 2 2*w0 2*w1 2*w2 2*w3 2*w4 ... 2*wd_model
# 3 3*w0 3*w1 3*w2 3*w3 3*w4 ... 3*wd_model
# 4 4*w0 4*w1 4*w2 4*w3 4*w4 ... 4*wd_model
# ...
# max_len m*w0 m*w1 m*w2 m*w3 m*w4 ... m*wd_model
# position 是 (max_len, ) 即 [0, 1, 2, ..., max_len]
# 每行都需要 d_model 哥 0, 要变成 (max_len, d_model) 个0
# 如果变成 (1, max_len), 那即使广播,也只能变成:
# [ [0, 1, 2, ..., max_len],
# [0, 1, 2, ..., max_len],
# ...
# [0, 1, 2, ..., max_len] ]
# 所以只能变成 (max_len, 1) -> 广播后:
# [ [0], [ [0, 0, 0, ..., 0],
# [1], [0, 0, 0, ..., 0],
# [2], [0, 0, 0, ..., 0],
# [3], [0, 0, 0, ..., 0],
# ..., ...,
# [max_len] ] [0, 0, 0, ..., 0] ]
position = position.unsqueeze(dim=1)
# 计算 w = 10000 ^ (- 2i/d) = exp( 2i * (-ln(10000) / d) )
div_term = torch.exp(
torch.arange(0, d_model, 2, dtype=torch.float) *
(-math.log(10000) / d_model)
)
print('div_term.shape =', div_term.shape) # torch.Size([34])
# div_term 需要变成 每行都是 [w0, w1, w2, w3, ..., w_d_model]
# 即:
# [ [w0, w1, w2, w3, ..., w_d_model],
# [w0, w1, w2, w3, ..., w_d_model],
# ...,
# [w0, w1, w2, w3, ..., w_d_model] ]
# 所以 只能变成 (1, d_model)
# 但是 position 已经变了, (max_len, 1) 与 (d_model, ) 运算
# 可以广播, (max_len, 1) -> (max_len, d_model)
# (1, d_model) -> (max_len, d_model)
# 所以 div_term 可以不进行升维,但也可以显示升维
# product 相当于算出来了 相位, 即 sin(x) 中的 x
product = position * div_term
print('product.shape =', product.shape) # torch.Size([1024, 34])
# 万事俱备,只差 sin、cos
# sin:
# 取所有行, 每行从0开始,取到末尾,步长为 2
pe[:, 0:: 2] = torch.sin(product)
# cos:
# 取所有行, 每行从1开始,取到末尾,步长为 2
pe[:, 1:: 2] = torch.cos(product)
print('pe.shape =', pe.shape) # torch.Size([1024, 68])
print(pe) # 只要 max_len 和 d_model 确定了,那 pe 就确定了
# tensor([[ 0.0000e+00, 1.0000e+00, 0.0000e+00, ..., 1.0000e+00, 0.0000e+00, 1.0000e+00],
# [ 8.4147e-01, 5.4030e-01, 6.9087e-01, ..., 1.0000e+00, 1.3111e-04, 1.0000e+00],
# [ 9.0930e-01, -4.1615e-01, 9.9897e-01, ..., 1.0000e+00, 2.6223e-04, 1.0000e+00],
# ...,
# [ 9.9236e-01, -1.2340e-01, -1.1004e-02, ..., -6.7493e-01, 2.6875e-01, 9.6321e-01],
# [ 4.3234e-01, -9.0171e-01, 6.8216e-01, ..., -6.7505e-01, 2.6888e-01, 9.6317e-01],
# [-5.2517e-01, -8.5100e-01, 9.9852e-01, ..., -6.7518e-01, 2.6900e-01, 9.6314e-01]])
# 注册为 buffer
self.register_buffer(name='position_encoding', tensor=pe, persistent=True)
def forward(self, x) -> torch.Tensor:
T = x.size(dim=1) # 获取 T
print(f'T = {T}') # 3
# 为 True 则继续执行
assert T <= self.max_len, '序列长度超过最大序列长度, 位置编码失败!'
print(f'x.shape = {x.shape}') # torch.Size([2, 3, 68])
print(f'self.position_encoding.shape = {self.position_encoding.shape}') # torch.Size([1024, 68])
# 输入 x 可能只有 T=3 个 token(如 (2, 3, 68))。
# 所以不能直接加整个 pe,而应该只取前 T 行!
# x = x + self.position_encoding # 错误!
# 这里有讲究: position_encoding[: T] vs position_encoding[: T, : ]
# 在 《`pe[: T]` vs `pe[: T, : ]`》 里有详情
x = x + self.position_encoding[: T] # 这里有广播机制
return x
# 多头注意力
class MultiHeadAttn(nn.Module):
def __init__(
self,
d_model: int,
num_heads: int,
d_k: int = None, # query/key 的每个 head 维度
d_v: int = None, # value 的每个 head 维度
dropout: float = 0.1
):
super().__init__()
# 保证词嵌入维度 能整除 头数
# False 则报错,True 则正常运行
# 正确约束是:当 d_k 未指定时,d_model 必须能被 num_heads 整除(因为 d_k = d_model // num_heads)
# 一旦指定了 d_k,就没有任何整除要求!
# d_v 同理
if d_k is None or d_v is None:
assert d_model % num_heads == 0, f'词嵌入维度不能整除头数'
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_k if d_k is not None else d_model // num_heads
# 通常 d_v = d_k,但可以 d_v 不等于 d_k
self.d_v = d_v if d_v is not None else d_model // num_heads
print(f'd_k = {self.d_k}') # 64
print(f'd_v = {self.d_v}') # 64
# 把多个小矩阵拼接成大矩阵
self.W_q = nn.Linear(in_features=self.d_model, out_features=self.num_heads * self.d_k, bias=False)
self.W_k = nn.Linear(in_features=self.d_model, out_features=self.num_heads * self.d_k, bias=False)
self.W_v = nn.Linear(in_features=self.d_model, out_features=self.num_heads * self.d_v, bias=False)
# 在Softmax之后,在加权融合V之前
self.dropout = nn.Dropout(p=dropout)
# 加权融合V 得到的是 (token, h * d_v),所以这里的输入是 h * d_v
# 输出的线性投影, 拼接后的 (h * d_v) 需要映射回 d_model
self.W_o = nn.Linear(in_features=self.num_heads * self.d_v, out_features=self.d_model, bias=False)
def forward(
self,
query_input: torch.Tensor,
key_input: torch.Tensor,
value_input: torch.Tensor,
mask: torch.Tensor = None
):
B, T, D = query_input.shape # 需要保证 batch_first = True
# 为什么形参名字叫 query_input, key_input, value_input,在《关键:分清 X 到底是什么》有详情
# 线性变换, 矩阵是由多个heads小矩阵合并而成
Q = self.W_q(query_input) # (B, T, h * d_k)
K = self.W_k(key_input) # (B, T, h * d_k)
V = self.W_v(value_input) # (B, T, h * d_v)
# 拆分为多头 Q: (B, T, h * d_k) -> (B, T, h, d_k) -> (B, h, T, d_k)
# 为什么写 -1,详情在《拆分多头时:`Q.reshape(B, -1, self.num_heads, self.d_k)`》
Q_heads = Q.reshape(B, -1, self.num_heads, self.d_k).transpose(1, 2) # (B, h, T, d_k)
K_heads = K.reshape(B, -1, self.num_heads, self.d_k).transpose(1, 2) # (B, h, T, d_k)
V_heads = V.reshape(B, -1, self.num_heads, self.d_v).transpose(1, 2) # (B, h, T, d_v)
# 计算 缩放点击注意力(不是除以 h*d_k!) (B, h, T, T)
score = torch.matmul(Q_heads, K_heads.transpose(-1, -2)) / math.sqrt(self.d_k)
if mask is not None:
# masked_fill 返回新张量,不是 in_place 操作
score = score.masked_fill(mask == 0, -torch.inf)
# Softmax计算分布
attn_weight = F.softmax(score, dim=-1) # (B, h, T, T)
# 使用 dropout
attn_weight = self.dropout(attn_weight)
# 加权聚合 value 得到 上下文
context = torch.matmul(attn_weight, V_heads) # (B, h, T, d_v)
# 把所有 head 合并
# (B, h, T, d_v) --> (B, T, h, d_v) --> (B, T, h * d_v)
context = context.transpose(1, 2).reshape(B, -1, self.num_heads * self.d_v)
# 线性投影
output = self.W_o(context)
return output
# 前馈全连接层
class PositionWiseFeedForwardNetwork(nn.Module):
def __init__(
self,
d_model: int,
d_ff: int = None,
dropout: float = 0.1,
activation='relu'
):
super().__init__()
self.d_model = d_model
self.d_ff = d_ff if d_ff is not None else 4 * d_model # 默认 4 倍
self.linear1 = nn.Linear(in_features=d_model, out_features=self.d_ff)
if activation == 'relu':
self.activation = nn.ReLU()
else:
raise ValueError('还需要添加更多激活函数')
raise ValueError(f"Unsupported activation: '{activation}'. Currently only 'relu' is supported.")
self.dropout = nn.Dropout(p=dropout)
self.linear2 = nn.Linear(in_features=self.d_ff, out_features=self.d_model)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# 返回值预计为 张量(但Python解释器可不管什么类型,即使不返回张量也不会报错)
"""
:param x: x.shape = (B, T, d_model)
:return: (B, T, d_model)
"""
x = self.linear1(x)
x = self.activation(x)
x = self.dropout(x)
x = self.linear2(x)
# 可一行搞定
# return self.linear2(self.dropout(self.activation(self.linear1(x))))
return x
# 规范化层
class LayerNorm(nn.Module):
def __init__(self, d_model: int, eps=1e-5):
super().__init__()
self.d_model = d_model
self.eps = eps
# 仿射变换 γ,用于缩放(初始为1,表示不缩放)
self.gamma = nn.Parameter(torch.ones(size=(self.d_model,)))
# 仿射变换 β,用于偏移(初始为0,表示不偏移)
self.beta = nn.Parameter(torch.zeros(size=(self.d_model,)))
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
支持任意形状,只要最后一维是 d_model
例如: (B, d_model) 或 (B, T, d_model) 或 (B, T1, T2, d_model)
"""
# 计算均值
mean = x.mean(dim=-1, keepdim=True)
# 计算有偏方差
var = x.var(dim=-1, keepdim=True, unbiased=False)
# 标准化
x_norm = (x - mean) / (torch.sqrt(var + self.eps))
# 仿射变换
output = self.gamma * x_norm + self.beta
return output
# 子层连接结构
class SublayerConnection(nn.Module):
def __init__(self, d_model: int, dropout: float = 0.1):
super().__init__()
# 规范化层
self.norm = LayerNorm(d_model=d_model)
# Dropout
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, sublayer_fn) -> torch.Tensor:
# 绝对不能写成 x = self.norm(x), 这样会覆盖原始的 x, 残差连接还要用到原始的 x
x_norm = self.norm(x) # 规范化层
sublayer_output = sublayer_fn(x_norm) # MHSA、FNN 等
output = x + self.dropout(sublayer_output) # 残差连接
# 一行搞定
# return x + self.dropout(sublayer_fn(self.norm(x)) )
return output
# 编码器层
class EncoderLayer(nn.Module):
def __init__(self, d_model: int, self_attn: nn.Module, ffn: nn.Module, dropout: float = 0.1):
"""
:param self_attn: 多头自注意力机制(MHSA)
:param ffn: 前馈全连接(FFN)
"""
super().__init__()
self.d_model = d_model
self.self_attn = self_attn
self.ffn = ffn
self.dropout = dropout
# 第一个子层 —— 多头自注意力(MHSA Sub-layer)
self.sublayer_attn = SublayerConnection(d_model=self.d_model, dropout=self.dropout)
# 第二个子层 —— 前馈网络(FFN Sub-layer)
self.sublayer_ffn = SublayerConnection(d_model=self.d_model, dropout=self.dropout)
def forward(self, x, mask=None):
"""
:param mask: 这里先不管 mask,后续会对 mask 进行详细的改进
:param x: x.shape = (B, T, D)
"""
# 第一个子层
x1 = self.sublayer_attn(x, lambda x_norm: self.self_attn(x_norm, x_norm, x_norm, mask))
# 第二个子层
# 如果 self.ffn 本身就是一个接受单个张量输入并返回张量的模块(如 nn.Sequential 或自定义 FFN),那么直接传 self.ffn 即可。
# 使用 lambda x_norm: self.ffn(x_norm) 是冗余的函数包装,增加调用开销,且无任何收益。
x2 = self.sublayer_ffn(x1, self.ffn)
return x2
class Encoder(nn.Module):
def __init__(self, encoder_layer: nn.Module, num_layers: int):
"""
:param encoder_layer: 已构建好的编码器层
:param num_layers: 编码器的层数
"""
super().__init__()
# num_layer 个编码器层
self.layers = nn.ModuleList([copy.deepcopy(encoder_layer) for _ in range(num_layers)])
# 最终输出需通过规范化层
self.norm = LayerNorm(d_model=encoder_layer.d_model)
def forward(self, x, mask=None):
# 经过多个编码器层
for layer in self.layers:
x = layer(x, mask)
# 最终输出经过规范化层
output = self.norm(x)
return output
# 解码器层
class DecoderLayer(nn.Module):
def __init__(
self,
d_model: int,
self_attn: nn.Module, # 掩码多头自注意力(用于第一个子层)
cross_attn: nn.Module, # 掩码多头交叉注意力(用于第二个子层)
ffn: nn.Module, # 前馈全连接(用于第三个子层)
dropout: float = 0.1
):
super().__init__()
self.d_model = d_model
self.dropout = dropout
self.self_attn = self_attn
self.cross_attn = cross_attn
self.ffn = ffn
# 第一个子层: Masked Multi-Head Self-Attention(掩码多头自注意力)
self.sublayer_self_attn = SublayerConnection(d_model=self.d_model, dropout=self.dropout)
# 第二个子层: Multi-Head Cross-Attention(编码器-解码器注意力)
self.sublayer_cross_attn = SublayerConnection(d_model=self.d_model, dropout=self.dropout)
# 第三个子层: Position-wise Feed-Forward Network(逐位置前馈网络)
self.sublayer_ffn = SublayerConnection(d_model=self.d_model, dropout=self.dropout)
def forward(
self,
x: torch.Tensor, # 完整的已生成序列
memory: torch.Tensor, # 编码器最终的输出
target_mask: torch.Tensor = None, # 第一个子层的掩码
memory_mask: torch.Tensor = None # 第二个子层的掩码
):
# 第一个子层
x1 = self.sublayer_self_attn(
x,
lambda x_norm: self.self_attn(x_norm, x_norm, x_norm, target_mask)
)
# 第二个子层
x2 = self.sublayer_cross_attn(
x1,
lambda x_norm: self.cross_attn(x_norm, memory, memory, memory_mask)
)
# 第三个子层
x3 = self.sublayer_ffn(
x2,
self.ffn # 只有一个参数,可以简写
)
return x3
class Decoder(nn.Module):
def __init__(self, decoder_layer: nn.Module, num_layers: int):
super().__init__()
# N个解码器层
self.layers = nn.ModuleList([
copy.deepcopy(decoder_layer) for _ in range(num_layers)
])
# 注意:Pre-LN 架构中
# 有的地方说在经过堆叠后,需要加 LayerNorm,有的地方说不需要加
# 所以具体加不加,看具体情况,如果遇到了,就去问领导
# 或者查看要复现的论文原文(比如 T5、BERT、GPT)
# 或者查看官方开源代码
self.norm = LayerNorm(d_model=decoder_layer.d_model)
def forward(self, x, memory, target_mask: torch.Tensor = None, memory_mask: torch.Tensor = None):
for layer in self.layers:
x = layer(x, memory, target_mask, memory_mask)
# 注意:Pre-LN 架构中
# 有的地方说在经过堆叠后,需要加 LayerNorm,有的地方说不需要加
# 所以具体加不加,看具体情况,如果遇到了,就去问领导
# 或者查看要复现的论文原文(比如 T5、BERT、GPT)
# 或者查看官方开源代码
x = self.norm(x)
return x
# 输出部分
class Generator(nn.Module):
def __init__(self, d_model: int, vocabulary_size: int):
super().__init__()
self.d_model = d_model
self.vocabulary_size = vocabulary_size
# 【 projection n.投影;预测 】
# 线性投影层:将 d_model 维度映射到 vocabulary_size
# 不加 bias 是 Transformer 的常见做法(与原始论文一致)
self.proj = nn.Linear(in_features=self.d_model, out_features=self.vocabulary_size, bias=False)
def forward(self, x: torch.Tensor) -> torch.Tensor:
logits = self.proj(x)
return logits
class Transformer(nn.Module):
def __init__(
self,
encoder: nn.Module, # 编码器(对象)
decoder: nn.Module, # 解码器(对象)
encoder_embed: nn.Module, # 编码器的词嵌入(对象)
decoder_embed: nn.Module, # 解码器的词嵌入(对象)
generator: nn.Module # 解码器的输出部分(对象)
):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.encoder_embed = encoder_embed
self.decoder_embed = decoder_embed
self.generator = generator
def forward(self, encoder_input, decoder_input, encoder_mask, decoder_mask):
# encoder_input, decoder_input 都是原始输入,还没有经过词嵌入层
# 将原始输入送入编码器输入部分: wordEmbedding + positionEmbedding
encoder_input_embed = self.encoder_embed(encoder_input)
# 把 encoder_input_embed 送入编码器
encoder_output = self.encoder(encoder_input_embed, encoder_mask)
# 将原始输入送变解码器输入部分: wordEmbedding + positionEmbedding
decoder_input_embed = self.decoder_embed(decoder_input)
# 把 decoder_input_embed 送入解码器
decoder_output = self.decoder(decoder_input_embed, encoder_output, decoder_mask, encoder_mask)
# 把解码器的输出送给解码器的输出部分
logits = self.generator(decoder_output)
return logits
if __name__ == '__main__':
torch.manual_seed(66)
encoder_vocabulary_size = 500 # 500 词库
decoder_vocabulary_size = 1000 # 1000 词库
d_model = 512 # 词向量维度
max_len = 1000 # 预测序列最大长度为 1000
num_heads = 8 # 多头注意力的头数为 8
num_layers = 6 # 编码器 & 解码器 堆叠层数
# 编码器: 生成 [0, 500) 范围内的随机整数
x = torch.randint(low=0, high=encoder_vocabulary_size, size=(2, 4)) # 原始输入
# 第一个子层————多头自注意力层(MHSA Sub-layer)
encoder_my_attn = MultiHeadAttn(d_model=d_model, num_heads=num_heads)
# 第二个子层————前馈网络层(FFN Sub-layer)
encoder_my_ffn = PositionWiseFeedForwardNetwork(d_model=d_model)
# 构建编码器层
my_encoder_layer = EncoderLayer(d_model=d_model, self_attn=encoder_my_attn, ffn=encoder_my_ffn)
# 构建编码器
my_encoder = Encoder(encoder_layer=my_encoder_layer, num_layers=num_layers)
# 解码器: 生成 [0, 500) 范围内的随机整数,这个是 Teacher Forcing,即真实值
y = torch.randint(low=0, high=decoder_vocabulary_size, size=(2, 4)) # 原始输入
# 解码器:第一个子层
decoder_self_attn = MultiHeadAttn(d_model=d_model, num_heads=num_heads)
# 解码器: 第二个子层
decoder_cross_attn = MultiHeadAttn(d_model=d_model, num_heads=num_heads)
# 解码器: 第三个子层
decoder_ffn = PositionWiseFeedForwardNetwork(d_model=d_model)
# 构建解码器层
my_decoder_layer = DecoderLayer(
d_model=d_model,
self_attn=decoder_self_attn,
cross_attn=decoder_cross_attn,
ffn=decoder_ffn
)
# 构建解码器
my_decoder = Decoder(decoder_layer=my_decoder_layer, num_layers=num_layers)
# 解码器的输出部分
generator = Generator(d_model=d_model, vocabulary_size=decoder_vocabulary_size)
# 编码器 & 解码器 有重复的模块(比如 多头注意力,词嵌入层)都可以使用 copy.deepcopy
# 注意:位置编码可以共享(因为都是位置信息),但词嵌入不能共享,编码器 和 解码器 的词表大小不一样!
position_embed = PositionalEncoding(d_model=d_model, max_len=max_len)
# 创建 Transformer
transformer = Transformer(
encoder=my_encoder,
decoder=my_decoder,
encoder_embed=nn.Sequential(
Embeddings(vocabulary_size=encoder_vocabulary_size, d_model=d_model),
position_embed
),
decoder_embed=nn.Sequential(
Embeddings(vocabulary_size=decoder_vocabulary_size, d_model=d_model),
position_embed
),
generator=generator
)
# 训练时使用 train,推理时使用 eval
transformer.train()
logits = transformer(
encoder_input=x,
decoder_input=y,
encoder_mask=None, # 作为测试,不管那么多
decoder_mask=None # 作为测试,不管那么多
)
print(f'logits.shape={logits.shape}') # torch.Size([2, 4, 1000])
print(transformer)
# Transformer(
# (encoder): Encoder(
# (layers): ModuleList(
# (0-5): 6 x EncoderLayer(
# (self_attn): MultiHeadAttn(
# (W_q): Linear(in_features=512, out_features=512, bias=False)
# (W_k): Linear(in_features=512, out_features=512, bias=False)
# (W_v): Linear(in_features=512, out_features=512, bias=False)
# (dropout): Dropout(p=0.1, inplace=False)
# (W_o): Linear(in_features=512, out_features=512, bias=False)
# )
# (ffn): PositionWiseFeedForwardNetwork(
# (linear1): Linear(in_features=512, out_features=2048, bias=True)
# (activation): ReLU()
# (dropout): Dropout(p=0.1, inplace=False)
# (linear2): Linear(in_features=2048, out_features=512, bias=True)
# )
# (sublayer_attn): SublayerConnection(
# (norm): LayerNorm()
# (dropout): Dropout(p=0.1, inplace=False)
# )
# (sublayer_ffn): SublayerConnection(
# (norm): LayerNorm()
# (dropout): Dropout(p=0.1, inplace=False)
# )
# )
# )
# (norm): LayerNorm()
# )
# (decoder): Decoder(
# (layers): ModuleList(
# (0-5): 6 x DecoderLayer(
# (self_attn): MultiHeadAttn(
# (W_q): Linear(in_features=512, out_features=512, bias=False)
# (W_k): Linear(in_features=512, out_features=512, bias=False)
# (W_v): Linear(in_features=512, out_features=512, bias=False)
# (dropout): Dropout(p=0.1, inplace=False)
# (W_o): Linear(in_features=512, out_features=512, bias=False)
# )
# (cross_attn): MultiHeadAttn(
# (W_q): Linear(in_features=512, out_features=512, bias=False)
# (W_k): Linear(in_features=512, out_features=512, bias=False)
# (W_v): Linear(in_features=512, out_features=512, bias=False)
# (dropout): Dropout(p=0.1, inplace=False)
# (W_o): Linear(in_features=512, out_features=512, bias=False)
# )
# (ffn): PositionWiseFeedForwardNetwork(
# (linear1): Linear(in_features=512, out_features=2048, bias=True)
# (activation): ReLU()
# (dropout): Dropout(p=0.1, inplace=False)
# (linear2): Linear(in_features=2048, out_features=512, bias=True)
# )
# (sublayer_self_attn): SublayerConnection(
# (norm): LayerNorm()
# (dropout): Dropout(p=0.1, inplace=False)
# )
# (sublayer_cross_attn): SublayerConnection(
# (norm): LayerNorm()
# (dropout): Dropout(p=0.1, inplace=False)
# )
# (sublayer_ffn): SublayerConnection(
# (norm): LayerNorm()
# (dropout): Dropout(p=0.1, inplace=False)
# )
# )
# )
# (norm): LayerNorm()
# )
# (encoder_embed): Sequential(
# (0): Embeddings(
# (embed): Embedding(500, 512)
# )
# (1): PositionalEncoding()
# )
# (decoder_embed): Sequential(
# (0): Embeddings(
# (embed): Embedding(1000, 512)
# )
# (1): PositionalEncoding()
# )
# (generator): Generator(
# (proj): Linear(in_features=512, out_features=1000, bias=False)
# )
# )
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)