目录

一、案例描述

二、数据

三、实现代码步骤

1.导包

2.设置变量(运行设备、文件数据路径)

3.数据清洗

4.构建词汇表

5.自定义数据集

6.构建数据加载器

7.无注意机制的编码器

8.有注意机制的解码器

9.模型训练

9.1 单次训练主体代码

9.2 训练的方法

10.模型预测

11.main函数


一、案例描述

整体从 流程图例

下图具体说明:可以参考上一篇
https://blog.csdn.net/i_k_o_x_s/article/details/160842333?spm=1001.2014.3001.5502


模型流程说明:参考早些写的一篇
https://blog.csdn.net/i_k_o_x_s/article/details/160285644?spm=1001.2014.3001.5501

从内到外:

注意力计算 -> 计算出专属信息包 -> (编码器+解码器+专属信息包)神经网络 -> 模型训练流程

模型架构:Seq2Seq 带注意力机制(Attention)

二、数据

数据格式

三、实现代码步骤

1.导包

from torch.utils.data import Dataset,DataLoader
import torch.nn as nn
import torch
import re
from tqdm import tqdm
import random
import matplotlib.pyplot as plt

2.设置变量(运行设备、文件数据路径)

# 2 - 定义变量
device = torch.device('cuda' if torch.cuda.is_available() else
                      'mps' if torch.backends.mps.is_available() else
                      'cpu')

data_path = "eng-fra-v2.txt"    # 数据文件路径
SOS_TOKEN = 0                   # 句子开始标识
EOS_TOKEN = 1                   # 句子结束标识
MAX_LENGTH = 10                 # 翻译结果中法语句子长度的上限,也就是法语词的个数

3.数据清洗

# 3 - 数据清洗
def normalizeString(line):
    # 全部转小写,去空格
    line = line.lower().strip()
    # 在指定的标点符号前面增加空格:正则表达式的分组序号从1开始数。
    line = re.sub(r"([.!?])", r" \1", line)
    # 去除除了26个小写和.!? 以外的符号,全部替换成空格
    line = re.sub(r"[^a-zA-Z.!?]+", r" ", line)
    return line

4.构建词汇表

# 4 - 构建词汇表
def getData():
    # 4.1 读取文件
    with open(data_path,encoding="utf-8",mode='r') as f:
        eng_fra_list = f.readlines()

        # 4.2 划分英语 和 法语
        # 【【英语句子1,法语句子1】,【英语句子2】,【法语句子2】.....】
        eng_fra_pairs = []
        for eng_fra_line in eng_fra_list:
            tmp_list = []
            for line in eng_fra_line.split("\t"):
                tmp_list.append(line)

            eng_fra_pairs.append(tmp_list)

    # 4.3 分别得到英语词汇表和法语词汇表
    #     词汇表中的词的个数 默认先给2个
    english_word2index = {"SOS":SOS_TOKEN,"EOS":EOS_TOKEN}
    english_word_cnt = 2 

    fra_word2index = {"SOS":SOS_TOKEN,"EOS":EOS_TOKEN}
    fra_word_cnt = 2

    for eng_fra_list in eng_fra_pairs:
        # 英语句子处理: 分词 -> 去重 -> 添加到词汇表
        for eng_word in eng_fra_list[0].sqlit(" "):
            if eng_word not in english_word2index:
                english_word2index[eng_word] = english_word_cnt
                english_word_cnt += 1


    for eng_fra_list in eng_fra_pairs:
        # 法语句子处理: 分词 -> 去重 -> 添加到词汇表
        for fra_word in eng_fra_list[1].sqlit(" "):
            if fra_word not in fra_word2index:
                fra_word2index[fra_word] = fra_word_cnt
                fra_word_cnt += 1

    # 4.4 词汇表 key 和 value 调换
    fra_index2word = {index:word for word,index in fra_word2index.items()}

    # 4.5 返回值
    return (
        english_word2index,     #英语 dic:  key 是word,value是 index
        english_word_cnt,       #英语词的长度
        fra_word2index,         #法语 dic:  key 是word,value是 index
        fra_word_cnt,           #法语词的长度
        fra_index2word,         #dic:  key 是index,value是 word
        eng_fra_pairs           #英语/法语翻译表 数组 [[英语,法语],[英语, 法语],.....]
    )

5.自定义数据集

# 5 - 自定义数据集
"""
    实现自定义的DataSet的流程如下:
    1. 定义一个类,继承Dataset
    2. 三个魔法方法
        __init__  初始化属性值
        __len__   获得样本条数
        __getitem__ 根据具体的索引,获得对应的样本数据
"""
class MyPairDataset(Dataset):
    def __init__(self,eng_fra_pairs):
        self.eng_fra_pair = eng_fra_pairs       # 翻译表 数组 [[英语,法语],[英语, 法语],.....]
        self.sample_cnt = len(eng_fra_pairs)    # 样本条数

    def __len__(self):
        return self.sample_cnt

    def __getitem__(self, index):
        """
        根据 传入的index 从 eng_fra_pairs 中获取 特征x 和 标签y
        :param index: 索引值
        :return: 特征x 和 标签y
        """
        # 1- 防止索引为负数;防止索引越界
        index = min(max(index,0), self.sample_cnt - 1)

        # 2- 获得英语句子(特征数据)和法语句子(目标值)
        # 2.1- 获得句子。[英语句子1, 法语句子1]
        eng_fra_pair = self.eng_fra_pairs[index]
        
        
        # 句子 -> 分词 -> 词 -> 词索引  
        # [英句index,法句index] -> 英句子 -> 英词数组 -> 英词对应的索引数组
        x = [english_word2index[word] for word in eng_fra_pair[0].split(" ")]
        # 普通特征 x -> 张量
        x_tensor = torch.tensor(x, dtype=torch.long, device=device)

        """
                # 为什么这里只添加了 EOS_TOKEN,没有添加SOS_TOKEN?
                # 实际都需要有,SOS_TOKEN会在模型训练的时候再添加进去
        """
        y = [english_word2index[word] for word in eng_fra_pair[1].split(" ")]
        y.append(EOS_TOKEN) #告诉解码器翻译工作结束
        y_tensor = torch.tensor(y, dtype=torch.long, device=device)

        return x_tensor,y_tensor

6.构建数据加载器

# 6 - 构建数据加载器
def get_dataloader():
    # 1 - 创建自定义数据集 实例对象
    dataset = MyPairDataset(eng_fra_pairs)

    #2 shuffle 打散数据
    # 注意: 这里没有做句子长度规范,因此送入到编码器、解码器中句子的长度不等长,所以batch_siez只能是1
    # 否则会报错:RuntimeError: stack expects each tensor to be equal size, but got [5] at entry 0 and [4] at entry 1
    dataloader = DataLoader(dataset=dataset,batch_size=1,shuffle=True)
    # dataloader = DataLoader(dataset=dataset,batch_size=2,shuffle=True)
    return dataloader

7.无注意机制的编码器

根据词汇表统计的英文词个数  => 带入词嵌入层 => 生成词向量 => 通过GRU网络处理 => 输出

# 7 - 无注意力机制 编码器
class Encoder(nn.Module):
    def __init__(self,eng_vocab_size,input_size,hidden_size):
        # 1. 初始化父类
        super().__init__()

        # 2. 设置属性值
        self.eng_vocab_size = eng_vocab_size    #词汇表中词的个数
        self.input_size = input_size            #词向量维度
        self.hidden_size = hidden_size          #隐藏状态向量维度
        self.num_layers = 1                     #隐藏层层数

        # 3. 搭建网络结构
        # 3.1 词嵌入层
        self.ebd = nn.Embedding(
            num_embeddings=self.eng_vocab_size,
            embedding_dim=self.input_size
        )

        # 3.2 循环网络
        self.gru = nn.GRU(
            embedding_dim=self.input_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True
        )

    def forward(self,input,hidden):
        """
        Encoder端的前向传播:负责对英语句子进行语义理解
        :param input: 本次输入数据,张量形状:【batch_size,seq_len】
        :param hidden: 上一个时间步的隐藏状态 张量形状:【num_layer,batch_size,hidden_size】
        :return:
        """
        # 1. 调用词嵌入层:将词索引变成词向量
        # embed 张量形状:【batch_size,seq_len,input_size】
        embed = self.edbd(input)

        # 2. 调用GRU
        output,hidden = self.gru(embed,hidden)

        return output,hidden


    def init_hidden(self):
        """
        对初始隐藏状态初始化 一般全0初始化
        张量形状[num_layers,batch_size,hidden_size]
        :return: 初始隐藏状态
        """
        # 为什么是1,1  因为隐藏层层数为1,句子没有设定上限,前面数据加载器batch_size=1
        return torch.zeros(size=(1,1,self.hidden_size)).to(device)

8.有注意机制的解码器

① 有注意力机制 就要想到 Q、K、V ,专属信息包C的加权求和计算

② 6个步骤,具体上一篇有写到

https://blog.csdn.net/i_k_o_x_s/article/details/160842333?spm=1001.2014.3001.5502

③ 注意:

和之前Q、K、V 取值案例不同

这里相当于Q和K变换了下。

为什么?

                     K来自【解码器上一个时间步隐藏状态】、

                     Q来自【解码器上一个时间步预测结果】、

                     V来自【编码器的输出状态】

与之前的又不一样?
    <1>  这是注意力的另外一种实现方式。该方式可以使用在文本翻译业务场景中
    <2>  Q、K的这种来源与前面【注意力计算规则】中不一样。但是这种形式可以计算得到注意力。这种机制的核心思想是:在生成当前词时,模型不仅要看“我上一步想到什么”(解码器状态),还要看“我现在要处理什么输入”(当前输入词),然后综合这两者去决定“我该关注编码器的哪个部分”。
   <3> 这种设计在Seq2Seq机器翻译模型中非常经典,能同时考虑解码器状态和当前输入词的语义。

代码:
 

# 8 - 有注意力机制 解码器
class AttenDecoder(nn.Module):
    def __init__(self,fra_vocab_size,hidden_size):
        # 1. 初始化匪类
        super().__init__()

        # 2. 设置属性值
        self.fra_vocab_size = fra_vocab_size        #法语词汇编大小
        """
            同时用来表示 词向量维度、隐藏状态向量维度 256
            self.input_size = hidden_size
        """
        self.hidden_size = hidden_size

        # 3. 搭建网络层
        # 3.1 词嵌入层
        self.embedding = nn.Embedding(
            num_embeddings=fra_vocab_size,
            embedding_dim=hidden_size
        )

        # 3.2 Dropout 随机失活层
        self.dropout = nn.Dropout(p=0.2)

        # 3.3 计算Q和K相似性的线性层
        self.attn = nn.Linear(
            in_features=self.hidden_size+self.hidden_size,  # Q和K拼接 256+256
            out_features=MAX_LENGTH                         # 相似性的个数 MAX_LENGTH 10
        )

        # 3.4 调整Q和C拼接后的张量形状的线性层
        self.attn_combine = nn.Linear(
            in_features=self.hidden_size+self.hidden_size,
            out_features=hidden_size
        )

        # 3.5 GRU循环网络层
        self.rgu = nn.GRU(
            embedding_dim=self.hidden_size,
            hidden_size= hidden_size,
            num_layers=1,
            batch_first=True
        )

        # 3.6 输出层
        """
            fra_vocab_size 词汇表大小
        """
        self.out = nn.Linear(in_features=self.hidden_size,out_features=self.fra_vocab_size)

        # 4 激活函数
        self.log_softmax = nn.LogSoftmax(dim=-1)

    def forward(self,input,prev_hidden, encoder_outputs):
        """
        前向传播
        :param input:  就是 Q 解码器本次输入的法语词,张量形状【batch_size,seq_len】  【【一个法语词的词索引】】
        :param prev_hidden: 解码器上一个时间步的隐藏状态,也就是K 张量状态【num_layers, batch_size,hidden_size】
        :param encoder_outputs: V 编码器,所有时间步隐藏状态拼接后的经过线性处理后的结果,【batch_size,MAX_LENGTH,hidden_size】
        :return:
        """

        # 1 - 调用词嵌入层
        Q = self.dropout(self.embedding(input))

        # 2 - 计算注意力机制
        """
            Q:embedding
            K:prev_hidden
            V:encoder_outputs
        """
        # 2.1 Q和K拼接
        # ① 形状调整形状 【num_layers, batch_size,hidden_size】 -> 【batch_size, num_layers,hidden_size】
        K = prev_hidden.transpose(dim0=0,dim1=1)

        qk_cat = torch.cat((Q,K),dim=-1)

        # 2.2 Q和K通过线性层计算相似性
        score = self.attn(qk_cat)

        # 2.3 将相似性转成权重矩阵
        attention_weights = torch.softmax(score,dim=-1)

        # 2.4 获得专属信息包
        C = torch.bmm(encoder_outputs,attention_weights)

        # 2.5 Q和C 拼接
        qc_cat = torch.cat((Q,C),dim=-1)

        # 2.6 调整张量状态
        qc_cat_liner = self.attn_combine(qc_cat)
        qc_cat_output = torch.relu(qc_cat_liner)

        # 3 - 调用GRU
        output,hidden = self.gru(
            qc_cat_output,          #输出值
            prev_hidden             #h0 上个时间步的隐藏状态
        )

        # 4 - 调用输出层
        # 线性相似度,权重
        final_output = self.log_softmax(self.out(output))

        return (
            final_output,       #输出
            hidden,             #hn最后隐藏状态
            attention_weights   #权重
        )


    def init_hidden(self):
        # 隐藏状态张量形状:[num_layers,batch_size,hidden_size]
        # num_layers:目前手动指定为1层
        # batch_size:因为我们没有对英语句子进行长度规范的处理,因此只能设置为1
        return torch.zeros(size=(1,1,self.hidden_size)).to(device=device)

9.模型训练

数据加载 ->       模型训练 -> 梯度优化器 -> 损失函数 -> 计算损失值 -> 梯度清零 -> 反向传播 ->

更新参数(权重+偏置)

9.1 单次训练主体代码
# 9 - 模型训练
# 9.1 - 单挑样本数据的训练
def train_iter(
        x_train,        # x_train:完整的一条英语句子,张量形状[batch_size, seq_len]
        y_train,        # y_train:完整的一条法语句子,张量形状[batch_size, seq_len]
        my_encoder,     # my_encoder:编码器对象
        my_decoder,     # my_decoder:解码器对象
        encoder_adam,   # encoder_adam:编码器优化器
        decoder_adam,   # decoder_adam:解码器优化器
        loss            # 损失函数对象
):
    """
    单条样本的训练过程,返回一个损失值
    """

    # 1 - 调用编码器
    encoder_hidden = my_encoder.init_hidden()
    encoder_outputs, encoder_hidden = my_encoder(x_train,encoder_hidden)

    # 2 - 解码器

    # 2.1 初始化 V  [batch_size, seq_len, 256]
    V = torch.zeros(size=(1,MAX_LENGTH,256),device=device)

    # 2.2 获取本次训练 词的个数  [batch_size,seq_len]
    eng_seq_len = x_train.shape[1]
    fra_seq_len = y_train.shape[1]

    # 2.3 比较 eng_seq_len 和 设定的 MAX_LENGTH 取小的
    seq_len_min = min(eng_seq_len,MAX_LENGTH)

    # 2.4 V的值 来源于 编码器的输出值  encoder_output
    V[:, seq_len_min, :] = encoder_outputs[:, seq_len_min, :]

    # 2.5 K的值 => 来源于 编码器的最后一个时间步的隐藏状态
    K = encoder_hidden

    # 2.6 Q的值
    # 存储法语翻译结果:而且第一个词表示的是翻译工作的开始,也就是要存放SOS_TOKEN
    Q = torch.tensor([[SOS_TOKEN]], device=device)

    # 2.7 损失值
    loss_value = 0.0

    # ========== 教师机制 ==========#
    """
        教师机制,称之为teacher_forcing
        1- 只能使用在模型训练阶段,不能使用在模型预测阶段
        2- 即使是使用在模型训练阶段,也不要每次都告诉它真实值是啥。为了提高模型的泛化能力
    """
    teacher_forcing_flag = True if random.random()<0.5 else False
    # ============================= #

    # 3 - 编码器  评估/预测
    if teacher_forcing_flag:
        # 使用教师机制
        for i in range(fra_seq_len):
            # 3.1 前向传播
            decoder_output, decoder_hidden, attn_weights = my_decoder(
                input=Q,
                prev_hidden=K,
                encoder_outputs=V
            )

            # 3.2 计算损失值
            # 预测值
            # 改变形状 三维 -> 二维 [1,1,4345] -> [1, 4345]  这里fra_word_cnt = 4345 法语词个数
            decoder_output = decoder_output.reshape(1,-1)

            # 真实值
            # y_train 形状 [batch_size, seq_len],并且batch_size = 1
            # y_train[0]:取第一条句子;
            # y_train[0][i]:取第一条句子中的第i个词
            # 单个词: 标量 -> 1维 张量 【1】
            y_true = y_train[0][i].reshape(1)

            loss_value += loss(decoder_output, y_true)

            # 3.3 告诉下一个时间步【真实】的法语词内容 【1,1】
            Q = y_train[0][i].reshape(1,-1)

    else:
        for i in range(fra_seq_len):
            # 3.1 前向传播
            decoder_output, decoder_hidden, attn_weights = my_decoder(
                input=Q,
                prev_hidden=K,
                encoder_outputs=V
            )

            # 3.2- 计算损失值
            # 预测值
            pred_output = decoder_output.reshape(1, -1)
            # 真实值
            y_true = y_train[0][i].reshape(1)
            loss_value += loss(pred_output, y_true)

            # 3.3
            # 4345 里面找到概率值 最大的索引 [法语词索引]
            pred_word_index = torch.argmax(pred_output, dim=-1)

            # 3.4 - 如果遇到了EOS_TOKEN的结束标识,那么就结束翻译工作
            # 把张量里的值 取出来
            if pred_word_index.item()==EOS_TOKEN:
                break

            # 3.5 告诉下个时间步【预测】的法语词内容
            # 生成 [batch_size, seq_len]
            Q = pred_word_index.reshape(1,-1)

    # 4 - 梯度清零
    encoder_adam.zero_grad()
    decoder_adam.zero_grad()

    # 5 - 反向传播
    loss_value.sum().backward()

    # 6 - 更新参数
    encoder_adam.step()
    decoder_adam.step()

    # 7 - 返回损失值
    return loss_value.item()/fra_seq_len
9.2 训练的方法
# 9.2 训练的方法
def train():
    # 1 - 加载数据
    dataloader = get_dataloader()

    # 2 - 创建模型实例对象
    my_encoder = Encoder(eng_vocab_size=english_word_cnt,input_size=256,hidden_size=256).to(device)
    my_decoder = AttenDecoder(fra_vocab_size=fra_word_cnt,hidden_size=256).to(device)

    # 3 - 创建优化器实例对象
    encoder_adam = torch.optim.Adam(my_encoder.parameters(),lr=0.001)
    decoder_adam = torch.optim.Adam(my_decoder.parameters(),lr=0.001)

    # 4 - 创建损失函数实例对象
    """
        因为 Encoder 和 AttenDecoder 输出用了 log_softmax
        所以这里用NLLLoss   (Logsoftmax + NLLLoss 组合使用)
        
        否则这里 用 nn.CrossEntropyLoss(), 编码器和解码器端 就不用写 softmax
    """
    loss = nn.NLLLoss(reduction='sum')

    # 5 - 设置模式,允许神经元随机失活 Dropout
    my_encoder.train()
    my_decoder.train()

    # 6 - 训练
    epochs = 1

    avg_loss_list = []  # 用来记录平均损失值

    """
    enumerate:给循环加序号
    tqdm:给循环加进度条
    合起来:循环数据 + 显示进度 + 记录批次
    """
    for epoch in range(epochs):
        total_loss_value = 0.0  # 总损失值

        for i,(x_train, y_train) in enumerate(tqdm(dataloader),start=1):
            # 6.1 - 单条数据训练,得到损失值
            loss_value = train_iter(x_train, y_train, my_encoder, my_decoder, encoder_adam, decoder_adam, loss)

            # 6.2 - 更新损失值
            total_loss_value += loss_value

            # 6.3 - 每间隔100个批次记录一次平均损失信息
            if i % 100 == 0:
                # 计算平均损失值
                avg_loss_value = total_loss_value/100
                avg_loss_list.append(avg_loss_value)
                print(f"第{epoch+1}轮次,平均损失值{avg_loss_value}")

    # 7 - 保存训练好的模型
    torch.save(my_encoder.state_dict(),'my_encoder.pkl')
    torch.save(my_decoder.state_dict(),'my_decoder.pkl')

    # 8- 绘制损失变化曲线
    plt.plot(avg_loss_list)
    plt.show()

10.模型预测

def use_seq2seq_evaluate():
    # 1- 手动准备未知数据
    my_samplepairs = [
        ['i m impressed with your french .', 'je suis impressionne par votre francais .'],
        ['i m more than a friend .', 'je suis plus qu une amie .'],
        ['she is beautiful like her mother .', 'elle est belle comme sa mere .']
    ]

    # 2- 加载训练好的模型
    my_encoder = Encoder(eng_vocab_size=english_word_cnt,input_size=256,hidden_size=256).to(device=device)
    my_encoder.load_state_dict(torch.load("my_encoder.pkl"))

    my_decoder = AttenDecoder(fra_vocab_size=franch_word_cnt,hidden_size=256).to(device=device)
    my_decoder.load_state_dict(torch.load("my_decoder.pkl"))

    # 3- 设置模式
    my_encoder.eval()
    my_decoder.eval()

    # 4- 预测
    for i,pair in enumerate(my_samplepairs):
        x = pair[0] # 英语句子
        y = pair[1] # 英语句子

        # 4.1- 英语句子转成张量
        x_list = [english_word2index[word] for word in x.split(" ")]
        # 所有的张量数据必须放在同一个硬件设备上,要么同时在GPU,要么同时在CPU
        x_tensor = torch.tensor(x_list, dtype=torch.long, device=device).reshape(1,-1)

        # 4.2- 对单条句子进行评估
        franch_word_list = seq2seq_evaluate(x_tensor,my_encoder,my_decoder)

        # 4.3- 输出结果
        print(f"英语句子 {x},真实法语句子 {y},翻译法语句子 {' '.join(franch_word_list)}")

11.main函数

if __name__ == '__main__':
    # 1- 数据清洗测试
    # content = " I LOve h@@@eima! "
    # normalizeString(content)

    # 2- 读取文件
    # getdata()

    # print(english_word_cnt)
    # print(franch_word_cnt)
    # print(eng_fra_pairs[:5])
    # print(english_word2index)
    # print(franch_word2index)

    # 3- 测试Dataloader
    # dataloader = get_dataloader()
    # for x,y in dataloader:
    #     print(x)
    #     print(y)
    #
    #     break

 
    # 4- 模型训练
    train()

    # 5- 模型预测
    use_seq2seq_evaluate()

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐