目录

一、Transformer

1、整体结构

2、编码器

2.1、自注意力子层

(1)生成Q,K,V向量

(2)计算位置相关性

(3)计算注意力权重

(4)加权汇总生成输出

(5)总结

(6)多头注意力计算过程

2.2、前馈神经网络子层

2.3、残差连接与层归一化

(1)残差连接(Residual Connection)

(2)层归一化(Layer Normalization)

2.4、位置编码

3、解码器

3.1、Masked自注意力子层

3.2、编码器--解码器注意力子层

3.3、前馈神经网络子层

4、模型训练和推理机制

4.1、模型训练

4.2、模型推理

5、API使用

1、Transformer构造参数

2、Transformer.forward

3、Transformer.encoder

4、Test

6、Transformer中英翻译案例

model

train

predict

一、Transformer

此前的Seq2Seq模型通过Attention机制取得了一定提升,但由于整体结构仍依赖RNN,依然存在

计算效率低、难以建模长距离依赖等结构性限制。Transformer完全摒弃了RNN结构,转而使用注

意力机制直接建模序列中各位置之间的关系。

1、整体结构

与基于RNN的Seq2Seq模型一样,Transformer的解码器采用自回归方式生成目标序列。不同之处

在于,每一步的输入是此前已生成的全部词,模型会输出一个与输入长度相同的序列,但我们只取

最后一个位置的结果作为当前预测。这个过程不断重复,直到生成结束标记<eos>。

此外,Transformer的编码器和解码器模块分别由多个结构相同的层堆叠而成。通过层层堆叠,模

型能够逐步提取更深层次的语义特征,从而增强对复杂语言现象的建模能力。标准的Transformer

模型通常包含6个编码器层和6个解码器层。

2、编码器

每个Encoder Layer都包含两个子层(sublayer)自注意力子层(Self-Attention Sublayer)和前馈

神经网络子层(Feed-Forward Sublayer)。

2.1、自注意力子层

在序列内部建立各位置之间的依赖关系,使模型能够为每个位置生成融合全局信息的表示。

(1)生成Q,K,V向量

总结:Q发起匹配,K与Q匹配,V加权求和

(2)计算位置相关性

评分函数采用向量点积形式。由于在高维空间中,点积的数值可能过大,会影响softmax的稳定

性,因此在实际计算中对结果进行了缩放。最终的评分函数为:

其中𝑑𝑘是key向量的维度,用于缩放点积的幅度。这个分数越大,表示第i个位置越应该关注第j个

位置的信息。

(3)计算注意力权重

(4)加权汇总生成输出

(5)总结

整个自注意力机制的完整计算公式如下:

(6)多头注意力计算过程

要准确理解语义复杂的句子,Transformer引入了多头注意力机制(Multi-Head Attention)。其核

心思想是通过多组独立的Query、Key、Value投影,让不同注意力头分别专注于不同的语义关系,

最后将各头的输出拼接融合。

分别计算多头注意力输出

合并多头注意力

2.2、前馈神经网络子层

前馈神经网络(Feed-Forward Network,简称FFN),一个标准的FFN子层包含两个线性变换和一

个非线性激活函数,中间通常使用ReLU激活。

2.3、残差连接与层归一化

在Transformer的每个编码器层中,每个子层,包括自注意力子层和前馈神经网络子层,其输出都

要经过残差连接(Residual Connection)和层归一化(Layer Normalization)处理。这两者是深层

神经网络中常用的结构,用于缓解模型训练中的梯度消失、收敛困难等问题。 

(1)残差连接(Residual Connection)

将子层的输入直接与其输出相加,形成一条跨越子层的“捷径”,其数学形式为:

(2)层归一化(Layer Normalization)

主要作用是规范输入序列中每个token的特征分布(某个token的表示可能在不同维度上有较大数值

差异),提升模型训练的稳定性。该操作会将每个token的向量调整为均值为0、方差为1的规范分

布。

2.4、位置编码

为了解决 Transformer 无法捕捉语序的问题,该模型引入了位置编码(Positional Encoding)机

制,通过为每个词添加位置向量,使其在获取词义的同时也能感知位置信息,从而理解语序。

为解决绝对位置编码数值倾斜及归一化导致的位置不一致问题,Transformer采用基于正弦和余弦

函数的固定位置编码,为每个位置生成唯一且与句子长度无关的向量,从而保证模型能稳定捕捉语

序信息。

3、解码器

        每个Decoder Layer都包含三个子层,分别是Masked自注意力子层、编码器-解码器注意力子

层(Encoder-Decoder Attention)和前馈神经网络子层(Feed-Forward Network),每个子层后

也都配有残差连接与层归一化(Layer Normalization),结构设计与编码器保持一致,确保训练的

稳定性和效率。

        此外,解码器在输入端同样需要加入位置编码(Positional Encoding),用于提供序列中的位

置信息,其计算方式与编码器中相同。

3.1、Masked自注意力子层

用于建模当前位置与前文词之间的依赖关系。为了在训练时模拟逐词生成的过程,引入遮盖机制

(Mask),限制每个位置只能关注它前面的词。

Mask是一个下三角矩阵,上三角设置为负无穷的原因是让上三角部分经过softmax之后,权重几乎

为0。

3.2、编码器--解码器注意力子层

在解码器的交叉注意力中,Query来自解码器当前输入,Key和Value来自编码器输出。通过计算

Query与所有Key的相似度,得到源序列各位置的权重,再对Value加权求和,从而为当前生成词提

取相关的上下文信息。

3.3、前馈神经网络子层

与编码器中结构完全一致,对每个位置的表示进行非线性变换,增强模型的表达能力。

4、模型训练和推理机制

4.1、模型训练

训练时,Transformer将目标序列整体输入解码器,并在每个位置同时进行预测。为防止模型“看到”

后面的词,破坏因果顺序,解码器在自注意力机子层中引入了遮盖机制(Mask),限制每个位置

只能关注它前面的词。

4.2、模型推理

推理时,每一步都要重新输入整个已生成序列,模型需要基于全量前文重新计算注意力分布,决定

下一个词的输出。整个过程必须顺序执行,无法并行。推理阶段,模型每一步都要重新输入当前已

生成的全部词,通过自注意力机制建模上下文关系,预测下一个词。

5、API使用

PyTorch提供了完整的Transformer官方实现,封装了编码器-解码器结构,适用于机器翻译、文本生成等序列建模任务。核心模块包括:

  • nn.Transformer:顶层接口,封装完整编码器-解码器架构,支持自定义层数、注意力头数、隐藏维度等参数。

  • nn.TransformerEncoder / Decoder:分别由多个编码器/解码器层堆叠而成,用于序列编码和目标序列生成。

  • nn.TransformerEncoderLayer / DecoderLayer:实现单层结构,编码器层包含多头自注意力和前馈子层;解码器层额外增加编码器-解码器注意力。各子层均配有残差连接和层归一化。

1、Transformer构造参数

2、Transformer.forward

nn.Transformer封装了完整的前向传播逻辑,其forward()方法接收源语言序列(编码器输入)和目

标语言序列(解码器输入),返回解码器的预测结果。

3、Transformer.encoder

nn.Transformer通过encoder属性(nn.TransformerEncoder实例)对源序列进行编码,提取上下文

相关的语义表示。

4、Test

import torch.nn as nn
import torch

model = nn.Transformer(d_model = 64,nhead = 8,num_encoder_layers = 3,num_decoder_layers = 3,
                       dim_feedforward = 256,batch_first=True)
print(model)

src = torch.randn(32,10,64)
tgt = torch.randn(32,24,64)
output1 = model(src,tgt)
print(output1.shape )

memory = model.encoder(src)
print(memory.shape)
output2 = model.decoder(tgt,memory)
print(output2.shape)

Transformer(
  (encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-2): 3 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
        )
        (linear1): Linear(in_features=64, out_features=256, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=256, out_features=64, bias=True)
        (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
    (norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  )
  (decoder): TransformerDecoder(
    (layers): ModuleList(
      (0-2): 3 x TransformerDecoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
        )
        (multihead_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
        )
        (linear1): Linear(in_features=64, out_features=256, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=256, out_features=64, bias=True)
        (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm3): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
        (dropout3): Dropout(p=0.1, inplace=False)
      )
    )
    (norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  )
)
torch.Size([32, 24, 64])
torch.Size([32, 10, 64])
torch.Size([32, 24, 64])

6、Transformer中英翻译案例

PyTorch无内置位置编码,而Transformer不具备位置感知能力,故需手动实现位置编码,与嵌入层输出相加后输入模型。还需实现以下模块:

  • 源语言和目标语言的词嵌入层(nn.Embedding)

  • 输出层(nn.Linear),将模型输出映射至目标词表大小

model

import torch
import torch.nn as nn
from config import *
import math

# # 自定义位置编码层
# class PositionEncoding(nn.Module):
#     def __init__(self,max_len,d_model):
#         super().__init__()
#         # 定义编码矩阵
#         pe = torch.zeros(size = (max_len,d_model),dtype=torch.float)
#         # 遍历每一行
#         for pos in range(max_len):
#             # 遍历当前位置向量的每个特征,步长为2
#             for _2i in range(0,d_model,2):
#                 # 按公式计算向量里的这两个特征
#                 pe[pos,_2i] = math.sin(pos/(10000 ** (_2i/d_model)))
#                 pe[pos,_2i+1] = math.cos(pos/(10000 ** (_2i/d_model)))
#         # 注册缓冲
#         self.register_buffer('pe', pe)
#
#     def forward(self, x):
#         seq_len = x.size(1) # 提取当前序列长度
#         # 在位置编码矩阵中截取L个向量
#         part_pe = self.pe[0:seq_len]
#         return x + part_pe

    # 自定义位置编码层
class PositionEncoding_Pro(nn.Module):
        def __init__(self, max_len, d_model):
            super().__init__()
            # 定义编码矩阵
            pe = torch.zeros(size=(max_len, d_model))
            pos = torch.arange(0,max_len).unsqueeze(1)
            _2i = torch.arange(0,d_model,2)

            div_term = torch.pow(10000,(_2i/d_model))

            pe[:,0::2] = torch.sin(pos/div_term)
            pe[:,1::2] = torch.cos(pos/div_term)

            self.register_buffer('pe', pe)

        def forward(self, x):
            seq_len = x.size(1)  # 提取当前序列长度
            # 在位置编码矩阵中截取L个向量
            part_pe = self.pe[0:seq_len]
            return x + part_pe


class TranslationModel(nn.Module):
    def __init__(self,cn_vocab_size,en_vocab_size,cn_padding_idx,en_padding_idx):
        super().__init__()
        self.cn_embedding = nn.Embedding(cn_vocab_size, embedding_dim=DIM_MODEL, padding_idx=cn_padding_idx)
        self.en_embedding = nn.Embedding(en_vocab_size, embedding_dim=DIM_MODEL, padding_idx=en_padding_idx)
        # 位置编码
        self.position_encoding = PositionEncoding_Pro(SEQ_LEN, DIM_MODEL)
        self.transformer = nn.Transformer(d_model=DIM_MODEL,nhead = NUM_HEADS,
                                          num_encoder_layers=NUM_ENCODER_LAYERS,num_decoder_layers=NUM_DECODER_LAYERS,
                                          batch_first=True)
        self.linear = nn.Linear(in_features=DIM_MODEL,out_features=en_vocab_size)


    def forward(self,src,tgt,src_pad_mask,tgt_mask):
        # 编码
        memory = self.encode(src,src_pad_mask)
        # 解码
        output = self.decode(tgt,memory,tgt_mask = tgt_mask,memory_pad_mask = src_pad_mask)
        return output

    def encode(self,src,src_pad_mask):
        embed = self.cn_embedding(src)
        input = self.position_encoding(embed)
        memory = self.transformer.encoder(src = input,src_key_padding_mask = src_pad_mask)
        return memory

    def decode(self,tgt,memory,tgt_mask=None,memory_pad_mask=None):
        embed = self.en_embedding(tgt)
        input = self.position_encoding(embed)
        output = self.transformer.decoder(tgt=input, memory = memory,tgt_mask=tgt_mask,memory_key_padding_mask=memory_pad_mask)
        output = self.linear(output)
        return output


if __name__ == "__main__":
    model = TranslationModel(1000,1024,0,0)
    print(model)

train

import torch
from torch import nn, optim
from config import *
from dataset import get_dataloader
from model import TranslationModel
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter # 日志写入器
import time # 时间库
from tokenizer import ChineseTokenizer,EnglishTokenizer

# 定义训练引擎函数,训练一个epoch,返回平均损失
def train_one_epoch(model, train_loader, loss,optimizer,device):
    model.train()

    total_loss = 0
    for inputs, targets in tqdm(train_loader,desc='训练:'):
        inputs, targets = inputs.to(device), targets.to(device) # 形状(N,L)
        # 准备参数
        decoder_inputs = targets[:,:-1]
        decoder_targets = targets[:,1:]

        src_pad_mask = (inputs == model.cn_embedding.padding_idx)
        tgt_mask = model.transformer.generate_square_subsequent_mask(decoder_inputs.shape[1])

        # 解码器前向传播得到解码输出
        decoder_outputs = model(src = inputs,tgt = decoder_inputs,src_pad_mask = src_pad_mask,tgt_mask = tgt_mask)

        loss_value = loss(decoder_outputs.transpose(1,2), decoder_targets)
        loss_value.backward()
        optimizer.step()
        optimizer.zero_grad()

        total_loss += loss_value.item()
    return total_loss / len(train_loader)

def train():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    train_loader = get_dataloader(train = True)

    cn_tokenizer = ChineseTokenizer.from_vocab(MODEL_DIR/CN_VOCAB_FILE)
    en_tokenizer = EnglishTokenizer.from_vocab(MODEL_DIR/EN_VOCAB_FILE)

    model = TranslationModel(cn_tokenizer.vocab_size,en_tokenizer.vocab_size,cn_tokenizer.pad_id,en_tokenizer.pad_id).to(device)

    # with open(MODEL_DIR/VOCAB_FILE,'r',encoding='utf-8') as f:
    #     vocab_list = [token.strip() for token in f.readlines()]
    #
    # model = InputMethodModel(vocab_size = len(vocab_list)).to(device)

    loss = nn.CrossEntropyLoss(ignore_index=en_tokenizer.pad_id)
    optimizer = optim.Adam(model.parameters(),lr = LEARNING_RATE)

    writer = SummaryWriter(log_dir=LOG_DIR / time.strftime("%Y-%m-%d_%H-%M-%S"))

    min_loss = float('inf')
    for epoch in range(EPOCHS):
        print('='*10,f'EPOCH:{epoch+1}','='*10)
        this_loss = train_one_epoch(model, train_loader, loss,optimizer,device)
        print("本轮训练损失:",this_loss)

        writer.add_scalar('loss',this_loss,epoch+1)

        if this_loss < min_loss:
            min_loss = this_loss
            torch.save(model.state_dict(),MODEL_DIR/BEST_MODEL)
            print('模型保存成功!')

    writer.close()

if __name__ == '__main__':
    train()

predict

import torch
from config import *
from model import TranslationModel
from tokenizer import ChineseTokenizer,EnglishTokenizer

def predict_batch(model,inputs,tokenizer,device):
    model.eval()
    with torch.no_grad():
        # 定义当前batchsize
        batch_size = inputs.shape[0]

        src_pad_mask = (inputs == model.cn_embedding.padding_idx)
        memory = model.encode(inputs,src_pad_mask)

        # 构建第一个时间步的输入,长度为N的向量,内容全部为<sos>id
        decoder_input = torch.full(size = (batch_size,1),fill_value = tokenizer.start_id).to(device)

        # 生成id列表
        generated_ids = []
        # 定义一个长度为N的tensor,保存每个数据样本是否已生成结束标志<eos>
        is_finished = torch.full(size = [batch_size],fill_value = False).to(device)
        # 循环迭代自回归生成
        for i in range(SEQ_LEN):
            tgt_mask = model.transformer.generate_square_subsequent_mask(decoder_input.shape[1])
            decoder_output = model.decode(decoder_input, memory,tgt_mask = tgt_mask, memory_pad_mask = src_pad_mask)
            # 词选择策略,贪心解码
            next_token_ids = torch.argmax(decoder_output[:,-1,:], dim = -1,keepdim = True)
            # 保存预测id到生成列表中
            generated_ids.append(next_token_ids)
            # 更新输入
            decoder_input = torch.cat((decoder_input,next_token_ids),dim = -1)
            # 判断是否生成结束标志<eos>,如果一批全部生成<eos>则退出循环
            is_finished |= (next_token_ids.squeeze(1) == tokenizer.end_id)
            if is_finished.all():
                break

    # 处理生成结果
    # 基于生成列表 generated_ids:[tensor(N,1),tensor(N,1),...]
    # 将列表转成(N,L)张量
    generated_tensor = torch.cat(generated_ids,dim= 1)
    # 转换为二维列表
    generated_list = generated_tensor.tolist()
    # 去掉每个元素(句子的id列表)中eos之后的所有内容
    for i,sentence_ids in enumerate(generated_list):
        if tokenizer.end_id in sentence_ids:
            eos_pos = sentence_ids.index(tokenizer.end_id)
            generated_list[i] = sentence_ids[:eos_pos]

    return generated_list # 转换成列表返回

# def predict(text,model,id2word,word2id,k,device):
#     tokens = jieba.cut(text)
#     ids = [word2id.get(token, word2id.get(UNK_TOKEN)) for token in tokens]
def predict(text, model, cn_tokenizer, en_tokenizer,device):
    ids = cn_tokenizer.encode(text)
    input = torch.tensor([ids], dtype=torch.long).to(device)
    result = predict_batch(model,input,en_tokenizer,device)

    return en_tokenizer.decode(result[0])

def run_predict():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    cn_tokenizer = ChineseTokenizer.from_vocab(MODEL_DIR / CN_VOCAB_FILE)
    en_tokenizer = EnglishTokenizer.from_vocab(MODEL_DIR / EN_VOCAB_FILE)
    # with open(MODEL_DIR/VOCAB_FILE,'r',encoding='utf-8') as f:
    #     vocab_list = [token.strip() for token in f.readlines()]
    # id2word = {id:word for id,word in enumerate(vocab_list)}
    # word2id = {word:id for id,word in enumerate(vocab_list)}
    print("词表加载成功!")


    model = TranslationModel(cn_tokenizer.vocab_size,en_tokenizer.vocab_size,cn_tokenizer.pad_id,en_tokenizer.pad_id).to(device)
    # model = InputMethodModel(vocab_size = len(id2word)).to(device)
    model.load_state_dict(torch.load(MODEL_DIR/BEST_MODEL))
    print("模型加载成功!")

    print('欢迎使用中英翻译模型!输入q或者quit退出...')

    while True: # 核心,一个死循环
        user_input = input('中文 > ')
        # 判断如果是q或quit直接退出
        if user_input.strip() in['q','quit']:
            print('欢迎下次再来!')
            break
        # 判断如果是空白,提示信息后继续循环
        if user_input.strip()  == '':
            print('请输入有效内容!')
            continue
        # 预测译文
        result = predict(user_input, model, cn_tokenizer,en_tokenizer, device)
        print('英文:',result)


if __name__ == '__main__':
    # text = "我们公司"
    # top5_tokens = predict(text)
    # print(top5_tokens)
    run_predict()

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐