前言:我们今天从两个问题开始,学习Bert模型!

Bert模型的架构以及每一部分的作用?

宏观上BERT分三个主要模块.

  • Embedding模块
    • Token Embedding  +  Segment Embedding  + Position Embedding 
  • Transformer模块
  • 预微调模块

Bert模型两大预训练任务,并谈一谈你的理解?

  • 任务一: Mask LM 带mask的语言模型训练
  • 任务二: Next Sentence Prediction 下一句话预测任务

BERT模型组成

Embedding层

  • Token Embeddings 是词嵌入张量, CLS开始-SEP结束, 可以用于之后的分类任务.
  • Segment Embeddings 是句子分段嵌入张量, 是为了服务后续的两个句子为输入的预训练任务.
  • Position Embeddings 是位置编码张量, 此处注意和传统的Transformer不同, 不是三角函数计算的固定位置编码, 而是通过学习得出来的,可不断学习更新的Embedding层.
  • 整个Embedding模块的输出张量就是这3个张量的直接加和结果.
  • Token Embeddings + Segment Embeddings + Position Embeddings

双向Transformer模块

  • BERT中只使用了经典Transformer架构中的Encoder部分, 完全舍弃了Decoder部分. 而两大预训练任务也集中体现在训练Transformer模块中.
  • 双向的解释:
    • encoder部分没有causal mask(因果掩码),padding mask影响不大,multi head attention 可两两做计算,对比GPT中decoder 的 causal mask ,GPT属于单向模块
    • 采用MASK任务训练模型,可根据上下文预测MASK
  • BERT 正是通过双向 Transformer Encoder + MLM 预训练任务,实现了对上下文的深度理解。

预微调模块

四种典型任务的微调方式:

(a) 句子对分类任务 Sentence Pair Classification Tasks

"""
# 输入:[CLS] Sentence1 [SEP] Sentence2 [SEP]
# 输出:使用 [CLS] token 的表示进行分类
# 任务:MNLI(推理)、QQP(问题对等)
"""

迁移学习中文句子关系demo

# -*-coding:utf-8-*-
# 导入工具包
import torch
import random
import torch.nn as nn
from datasets import load_dataset
from transformers import BertTokenizer, BertModel
# from transformers import AdamW
from torch.optim import AdamW
# import torch.optim as optim
# optim.AdamW
from torch.utils.data import DataLoader, Dataset
import time
from tqdm import tqdm

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = 'cpu'
print(device)
# 加载分词器
bert_tokenizer = BertTokenizer.from_pretrained("/model/bert-base-chinese")
# print(bert_tokenizer.get_vocab())
# print(bert_tokenizer.mask_token)
# print(bert_tokenizer.mask_token_id)
print(bert_tokenizer.vocab_size)
# 加载model
bert_model = BertModel.from_pretrained("/model/bert-base-chinese")
# print(my_model)
# 如果用gpu,需要把预训练模型也要放到gpu上
bert_model = bert_model.to(device)



# 自定义Dataset对象
class NspDataset(Dataset):
    def __init__(self, data_path):
        super().__init__()
        # 加载数据
        dataset = load_dataset('csv', data_files=data_path, split='train')
        # 只获取样本长度大于44的样本
        self.dataset = dataset.filter(lambda x: len(x["text"]) > 44)

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, item):
        # 因为是要做NSP任务:给你两句话,判断第二句话是否是第一句话的真实的下一句,因此我们要构造样本对
        #(Seq1, Seq2)--》标签可以为0那就是没有关系,标签可以为1那就是有关系
        label = 1
        sequence = self.dataset[item]["text"]
        # print(f'sequence--》{sequence}')
        seq1 = sequence[:22]
        seq2 = sequence[22:44]
        # 还要有一半的概率是负样本
        if random.randint(0, 1) == 0:
            # 重新选择一个索引
            j = random.randint(0, len(self.dataset)-1)
            seq2 = self.dataset[j]["text"][22:44]
            label = 0

        return seq1, seq2, label


def collate_fn(data):
    # data--》[(seq1, seq2, label),...]
    # print(data)
    # 取出每个样本的句子对
    sequences = [value[:2] for value in data]
    # 取出每个样本的标签
    labels = [value[-1] for value in data]
    # print(f'sequences--》{sequences}')
    # print(f'labels--》{labels}')
    # 对上述原始的句子对进行编码
    inputs = bert_tokenizer(sequences, padding='max_length', truncation=True,
                                              max_length=50, return_tensors='pt')

    # print(f'inputs--》{inputs}')

    input_ids = inputs["input_ids"]
    token_type_ids = inputs["token_type_ids"]
    attention_mask = inputs["attention_mask"]
    labels_y = torch.tensor(labels, dtype=torch.long)

    return input_ids, token_type_ids, attention_mask, labels_y
def test_dataloader():
    # 实例化dataset对象
    train_dataset = NspDataset('./data/train.csv')
    #  实例化dataloader对象
    train_dataloader = DataLoader(train_dataset, batch_size=8, collate_fn=collate_fn, shuffle=True, drop_last=True)
    return train_dataloader

# todo:3.定义模型

class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        # 因为是微调,所以我们承接的结果是bert预训练模型的输出结果,为768特征,所以输出层输入特征为768,输出特征为2,(二分类)
        self.out = nn.Linear(768, 2)

    def forward(self, input_ids, token_type_ids, attention_mask):
        # 将上述三个输入参数送入bert预训练模型,但是注意,预训练模型在这里参数不进行更新,因此需要:with torch.no_grad():
        with torch.no_grad():
            bert_output = bert_model(input_ids=input_ids,
                                     token_type_ids=token_type_ids,
                                     attention_mask=attention_mask)
        # last_hidden_state-->[4, 200, 768]
        # pooler_output-=->[4, 768]
        # print(f'bert_output1--》{bert_output["last_hidden_state"].shape}')
        # print(f'bert_output2--》{bert_output["pooler_output"].shape}')
        # bert_output["pooler_output"]代表每个样本的CLS--token对应的隐藏层输出结果,代表整个句子的语意
        # 将bert编码之后的结果pooler_output送入输出层
        result = self.out(bert_output["pooler_output"])
        return result

# todo:4.训练模型

def model2train():
    # 第一步读文件获取数据:
    train_dataset = NspDataset(data_path='./data/train.csv')
    # print(f'train_dataset---》{train_dataset}')
    # 第二步:将上述的dataset进行再次封装
    train_dataloader = DataLoader(dataset=train_dataset, batch_size=8,
                                  collate_fn=collate_fn, shuffle=True,
                                  drop_last=True)
    # 第三步:实例化模型
    my_model = MyModel().to(device)
    # 第四步:实例化优化器对象
    my_adamw = AdamW(my_model.parameters(), lr=5e-4)
    # 第五步:实例化损失函数对象
    my_cross = nn.CrossEntropyLoss()
    # 第六步:强调预训练模型的参数不参与更新
    for param in bert_model.parameters():
        param.requires_grad_(False)

    # 定义训练的轮次
    my_model.train()
    epochs = 1
    # 开始训练
    for epoch in range(epochs):
        # 开始时间
        start_time = int(time.time())
        # 开始内部数据的迭代
        for idx, (input_ids, token_type_ids, attention_mask, labels_y) in enumerate(tqdm(train_dataloader),start=1):
            # 将数据送入模型得到预测的结果
            input_ids = input_ids.to(device)
            token_type_ids = token_type_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels_y = labels_y.to(device)
            output = my_model(input_ids, token_type_ids, attention_mask)
            # 计算损失
            my_loss = my_cross(output, labels_y)
            # print(f'my_loss-》{my_loss}')
            # 梯度清零
            my_adamw.zero_grad()
            # 反向传播
            my_loss.backward()
            # 梯度更新
            my_adamw.step()

            # 每隔5步打印训练日志
            if idx % 20 == 0:
                # 取出一个批次样本中模型预测的结果
                predicts = torch.argmax(output, dim=-1)
                # 计算平均准确率
                avg_acc = (predicts == labels_y).sum().item() / len(labels_y)
                print('轮次:%d 迭代数:%d 损失:%.6f 准确率%.3f 时间%d' \
                      %(epoch, idx, my_loss.item(), avg_acc, (int)(time.time())-start_time))

        # 每轮都保存模型
        torch.save(my_model.state_dict(), './save_model/ai_nsp_%d.bin' % (epoch+1))

# todo:5.模型测试
def model2test():
    # 第一步读文件获取数据测试集:
    test_dataset = NspDataset(data_path='./data/test.csv')
    # print(f'test_dataset---》{test_dataset}')
    # 第二步:将上述的dataset进行再次封装
    test_dataloader = DataLoader(dataset=test_dataset, batch_size=8,
                                  collate_fn=collate_fn, shuffle=True,
                                  drop_last=True)
    # 第三步:加载训练好的模型
    my_model = MyModel().to(device)
    my_model.load_state_dict(torch.load('./save_model/ai_nsp_1.bin'))

    # 第四步:定义测试的超参数
    total = 0 # 计算已经迭代样本
    acc_num = 0 # 计算已经预测正确的样本的个数
    # 注意,把模型设置为eval()
    my_model.eval()
    # 第五步:开始测试
    for idx, (input_ids, token_type_ids, attention_mask, labels_y) in enumerate(tqdm(test_dataloader), start=1):
        # print(f'input_ids--》{input_ids}')
        input_ids = input_ids.to(device)
        token_type_ids = token_type_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels_y = labels_y.to(device)
        with torch.no_grad():
            output = my_model(input_ids, token_type_ids, attention_mask)

        # 计算预测正确的样本个数
        predicts = torch.argmax(output, dim=-1)
        acc_num = acc_num + (predicts == labels_y).sum().item()
        total = total + len(labels_y)
        # 每个5步,打印一下平均准确率,并且取出一个批次的第一个样本,进行结果的展示
        if idx % 5 == 0:
            print(f'平均准确率:{acc_num/total}', end='  ')
            print(f'取出样本:{bert_tokenizer.decode(input_ids[0], skip_special_tokens=True)}', end='  ')
            print(f'预测值:{predicts[0]}, 真实值:{labels_y[0]}')
            print('*'*80)

if __name__ == '__main__':
    # model2train()
    model2test()

(b) 单句子分类任务 Single Sentence Classification Tasks

"""
# 输入:[CLS] Sentence [SEP]
# 输出:使用 [CLS] token 的表示进行分类
# 任务:SST-2(情感分类)、CoLA(语法判断)
"""

 迁移学习中文分类demo

样本图片见上图酒店评论

# -*-coding:utf-8-*-
# 导入工具包
import torch
import torch.nn as nn
from datasets import load_dataset
from transformers import BertTokenizer, BertModel
# from transformers import AdamW
from torch.optim import AdamW
from torch.utils.data import DataLoader
import time
from tqdm import tqdm

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = 'mps'
print(device)
# 加载分词器
bert_tokenizer = BertTokenizer.from_pretrained(r"/model/bert-base-chinese")
# 加载model
bert_model = BertModel.from_pretrained(r"/model/bert-base-chinese")
# print(my_model)
# 如果用gpu,需要把预训练模型也要放到gpu上
bert_model = bert_model.to(device)
print(bert_model)

#todo:1.读取数据

def read_data():
    # 1.读取训练数据集
    train_dataset = load_dataset('csv', data_files='./data/train.csv', split="train")
    # print(f'train_dataset--》{train_dataset}')
    # print(f'train_dataset的样本长度--》{len(train_dataset)}')
    # print(f'train_dataset根据索引取出一个样本--》{train_dataset[0]}')
    # print(f'train_dataset根据索引切片取出多个样本--》{train_dataset[0:3]}')
    # 2.读取测试数据集
    test_dataset = load_dataset('csv', data_files='./data/test.csv', split='train')
    # print(f'test_dataset--》{test_dataset}')
    # print(f'test_dataset的样本长度--》{len(test_dataset)}')
    # print(f'test_dataset根据索引取出一个样本--》{test_dataset[0]}')
    # print(f'test_dataset根据索引切片取出多个样本--》{test_dataset[0:3]}')

    # 3.读取验证数据集
    valid_dataset = load_dataset('csv', data_files='./data/validation.csv', split='train')
    # print(f'valid_dataset--》{valid_dataset}')
    # print(f'valid_dataset的样本长度--》{len(valid_dataset)}')
    # print(f'valid_dataset根据索引取出一个样本--》{valid_dataset[0]}')
    # print(f'valid_dataset根据索引切片取出多个样本--》{valid_dataset[0:3]}')

    return train_dataset, test_dataset, valid_dataset


def collate_fn(data):
    '''
    自定义函数,目的是对dataset中的数据进行处理
    :param data:
    :return:
    '''
    # print(f'自定义函数的参数data数据展示--》{len(data)}')
    # 获取一个批次样本中的所有的句子
    sentences = [value["text"] for value in data]
    # print(f'sentences数据展示--》{sentences}')
    # print(f'sentences[0]数据展示--》{len(sentences[0])}')
    # print(f'sentences[1]数据展示--》{len(sentences[1])}')
    # print(f'sentences[2]数据展示--》{len(sentences[2])}')
    # print(f'sentences[3]数据展示--》{len(sentences[3])}')
    # 获取一个批次样本中的所有的标签
    labels = [value["label"] for value in data]
    # print(f'labels数据展示--》{labels}')
    # 对一个批次的原始句子进行张量的转换,一定要对齐长度
    inputs = bert_tokenizer(sentences,
                                            padding='max_length',
                                            truncation=True,
                                            max_length=200,
                                            return_tensors='pt',
                                            )
    # print(f'inputs-->{inputs}')
    input_ids = inputs["input_ids"]
    token_type_ids = inputs["token_type_ids"]
    attention_mask = inputs["attention_mask"]
    labels_y = torch.tensor(labels, dtype=torch.long)

    return input_ids, token_type_ids, attention_mask, labels_y

# todo:2.获取dataloader

def get_dataloader():
    # 获取dataset
    train_dataset, _, _ = read_data()

    # 对上述的train_dataset进行dataloader的封装
    train_dataloader = DataLoader(dataset=train_dataset,
                                  batch_size=4,
                                  collate_fn=collate_fn,
                                  drop_last=True,
                                  shuffle=True)

    # 一定要迭代train_dataloader,才能查验collate_fn
    # for input_ids, token_type_ids, attention_mask, labels_y in train_dataloader:
    #     print(f'input_ids---》{input_ids.shape}')
    #     print(f'token_type_ids---》{token_type_ids.shape}')
    #     print(f'attention_mask---》{attention_mask.shape}')
    #     print(f'labels_y---》{labels_y.shape}')
    #     print('这是测试')
    #     break
    return train_dataloader

# todo:3.定义模型

class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        # 因为是微调,所以我们承接的结果是bert预训练模型的输出结果,为768特征,所以输出层输入特征为768,输出特征为2,(二分类)
        self.out = nn.Linear(768, 2)

    def forward(self, input_ids, token_type_ids, attention_mask):
        # 将上述三个输入参数送入bert预训练模型,但是注意,预训练模型在这里参数不进行更新,因此需要:with torch.no_grad():
        with torch.no_grad():
            bert_output = bert_model(input_ids=input_ids,
                                     token_type_ids=token_type_ids,
                                     attention_mask=attention_mask)
        # last_hidden_state-->[4, 200, 768]
        # pooler_output-=->[4, 768]
        # print(f'bert_output1--》{bert_output["last_hidden_state"].shape}')
        # print(f'bert_output2--》{bert_output["pooler_output"].shape}')
        # bert_output["pooler_output"]代表每个样本的CLS--token对应的隐藏层输出结果,代表整个句子的语意
        # 将bert编码之后的结果pooler_output送入输出层
        result = self.out(bert_output["pooler_output"])
        return result


# todo:4.训练模型

def model2train():
    # 第一步读文件获取数据:
    train_dataset = load_dataset('csv', data_files='./data/train.csv', split='train')
    # print(f'train_dataset---》{train_dataset}')
    # 第二步:将上述的dataset进行再次封装
    train_dataloader = DataLoader(dataset=train_dataset, batch_size=8,
                                  collate_fn=collate_fn, shuffle=True,
                                  drop_last=True)
    # 第三步:实例化模型
    my_model = MyModel().to(device)
    # 第四步:实例化优化器对象
    my_adamw = AdamW(my_model.parameters(), lr=5e-4)
    # 第五步:实例化损失函数对象
    my_cross = nn.CrossEntropyLoss()
    # 第六步:强调预训练模型的参数不参与更新
    for param in bert_model.parameters():
        param.requires_grad_(False)

    # 定义训练的轮次
    epochs = 1
    my_model.train()
    # 开始训练
    for epoch in range(epochs):
        # 开始时间
        start_time = int(time.time())
        # 开始内部数据的迭代
        for idx, (input_ids, token_type_ids, attention_mask, labels_y) in enumerate(tqdm(train_dataloader),start=1):
            # 将数据送入模型得到预测的结果
            input_ids = input_ids.to(device)
            token_type_ids = token_type_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels_y = labels_y.to(device)
            output = my_model(input_ids, token_type_ids, attention_mask)
            # 计算损失
            my_loss = my_cross(output, labels_y)
            # print(f'my_loss-》{my_loss}')
            # 梯度清零
            my_adamw.zero_grad()
            # 反向传播
            my_loss.backward()
            # 梯度更新
            my_adamw.step()

            # 每隔5步打印训练日志
            if idx % 5 == 0:
                # 取出一个批次样本中模型预测的结果
                predicts = torch.argmax(output, dim=-1)
                # 计算平均准确率
                avg_acc = (predicts == labels_y).sum().item() / len(labels_y)
                print('轮次:%d 迭代数:%d 损失:%.6f 准确率%.3f 时间%d' \
                      %(epoch, idx, my_loss.item(), avg_acc, (int)(time.time())-start_time))

        # 每轮都保存模型
        torch.save(my_model.state_dict(), './save_model/ai_classify_%d.bin' % (epoch+1))

# todo:5.模型测试
def model2test():
    # 第一步读文件获取数据测试集:
    test_dataset = load_dataset('csv', data_files='./data/test.csv', split='train')
    # print(f'test_dataset---》{test_dataset}')
    # 第二步:将上述的dataset进行再次封装
    test_dataloader = DataLoader(dataset=test_dataset, batch_size=8,
                                  collate_fn=collate_fn, shuffle=True,
                                  drop_last=True)
    # 第三步:加载训练好的模型
    my_model = MyModel().to(device)
    my_model.load_state_dict(torch.load('./save_model/ai_classify_1.bin'))

    # 第四步:定义测试的超参数
    total = 0 # 计算已经迭代样本
    acc_num = 0 # 计算已经预测正确的样本的个数
    # 注意,把模型设置为eval()
    my_model.eval()
    # 第五步:开始测试
    for idx, (input_ids, token_type_ids, attention_mask, labels_y) in enumerate(tqdm(test_dataloader), start=1):
        # print(f'input_ids--》{input_ids}')
        input_ids = input_ids.to(device)
        token_type_ids = token_type_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels_y = labels_y.to(device)
        with torch.no_grad():
            output = my_model(input_ids, token_type_ids, attention_mask)

        # 计算预测正确的样本个数
        predicts = torch.argmax(output, dim=-1)
        acc_num = acc_num + (predicts == labels_y).sum().item()
        total = total + len(labels_y)
        # 每个5步,打印一下平均准确率,并且取出一个批次的第一个样本,进行结果的展示
        if idx % 5 == 0:
            print(f'平均准确率:{acc_num/total}', end='  ')
            print(f'取出样本:{bert_tokenizer.decode(input_ids[0], skip_special_tokens=True)}', end='  ')
            print(f'预测值:{predicts[0]}, 真实值:{labels_y[0]}')
            print('*'*80)



if __name__ == '__main__':
    # model2train()
    # model2test()
    ...

(c) 问答任务(QA)Question Answering Tasks

"""
# 输入:[CLS] Question [SEP] Paragraph [SEP]
# 输出:预测答案在段落中的起始和结束位置
# 任务:SQuAD(阅读理解)
"""

(d) 序列标注任务(NER) Single Sentence Tagging Tasks

"""
# 输入:[CLS] Tok1 Tok2 Tok3 ... [SEP]
# 输出:为每个 token 预测标签(如 B-PER, O 等)
# 任务:命名实体识别
"""
"""
       预训练 BERT(通用语言理解)
                     ↓
    ┌────────────────┼────────────────┐
    ↓                ↓                ↓
句子对分类      单句分类          QA任务        NER任务
(加分类头)      (加分类头)      (加起止头)     (加序列头)
    ↓                ↓                ↓             ↓
  微调            微调             微调          微调
"""

迁移学习中文填空demo

样本图片见上图酒店评论

# -*-coding:utf-8-*-
# 导入工具包
import torch
import torch.nn as nn
from datasets import load_dataset
from transformers import BertTokenizer, BertModel
# from transformers import AdamW
from torch.optim import AdamW
from torch.utils.data import DataLoader
import time
from tqdm import tqdm

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = 'mps'
print(device)
# 加载分词器
bert_tokenizer = BertTokenizer.from_pretrained("/model/bert-base-chinese")
# print(bert_tokenizer.get_vocab())
# print(bert_tokenizer.mask_token)
# print(bert_tokenizer.mask_token_id)
print(bert_tokenizer.vocab_size)
# 加载model
bert_model = BertModel.from_pretrained("/model/bert-base-chinese")
# print(my_model)
# 如果用gpu,需要把预训练模型也要放到gpu上
bert_model = bert_model.to(device)


def collate_fn(data):
    # data是从dataset里面获取了一个批次的样本,格式为列表,列表中的每个元素是个字典,包含:{"label","text"}
    # print(data)
    sequences = [value["text"] for value in data]

    # 进行tokenizer编码
    inputs = bert_tokenizer(sequences, padding="max_length",
                                              truncation=True, max_length=32, return_tensors='pt')

    # print(f'inputs--》{inputs}')
    # input_ids-->shape-->[8, 32]
    input_ids = inputs["input_ids"]
    token_type_ids = inputs["token_type_ids"]
    attention_mask = inputs["attention_mask"]
    # print(f'input_ids[:,16]-->{input_ids[:, 16].shape}')
    # 取出第16个单词.clone()必须,相当于深copy
    labels = input_ids[:, 16].clone().long()
    # 把原始输入的每个样本的第16个索引位置的值替换为[MASK]--》这里对应的也是它的索引
    # input_ids[:, 16] = bert_tokenizer.get_vocab()[bert_tokenizer.mask_token]
    input_ids[:, 16] = bert_tokenizer.mask_token_id
    # print(f'input_ids--》{input_ids}')
    # print(f'labels--》{labels}')
    # labels = torch.tensor(labels, dtype=torch.long)
    return input_ids, token_type_ids, attention_mask, labels

# 定义测试函数:分析dataloader

def dm_test_dataloader():
    # 1.加载数据集
    train_dataset = load_dataset('csv', data_files='./data/train.csv', split='train')
    # print(f'train_dataset--》{train_dataset}')
    # print(f'train_dataset--》{train_dataset[1]}')

    # 2. 对数据进行处理,只获取text大于32的文本
    new_train_dataset = train_dataset.filter(lambda x: len(x["text"]) > 32)
    # print(f'new_train_dataset--》{new_train_dataset}')
    # print(f'new_train_dataset--》{len(new_train_dataset[1]["text"])}')
    # 3.将上述的数据进行dataloader的封装
    train_dataloader = DataLoader(dataset=new_train_dataset, batch_size=8,
                                  shuffle=True, collate_fn=collate_fn, drop_last=True)
    # # 4.遍历dataloader
    # for input_ids, token_type_ids, attention_mask, labels in train_dataloader:
    #     print('这是测试')
    #     print(f'input_ids--》{input_ids.shape}')
    #     print(f'token_type_ids--》{token_type_ids.shape}')
    #     print(f'attention_mask--》{attention_mask.shape}')
    #     print(f'labels--》{labels.shape}')
    #     break
    return train_dataloader

# 定义模型
class  MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        # bert_tokenizer.vocab_size = 21128
        self.out = nn.Linear(768, bert_tokenizer.vocab_size)

    def forward(self, input_ids, token_type_ids, attention_mask):
        with torch.no_grad():
            bert_output = bert_model(input_ids=input_ids,
                                     token_type_ids=token_type_ids,
                                     attention_mask=attention_mask)

        # print(f'bert_output--》{bert_output["last_hidden_state"].shape}')
        # bert_output["last_hidden_state"].shape->[8, 32, 768]
        # 只取出第16个位置对应的张量送入输出层得到预测的结果:output-->[8, 21128]
        output = self.out(bert_output["last_hidden_state"][:, 16])
        return output

# todo:4.训练模型

def model2train():
    # 第一步读文件获取数据:
    train_dataset = load_dataset('csv', data_files='./data/train.csv', split='train')
    # print(f'train_dataset---》{train_dataset}')
    # 需要获取样本长度大于32的样本
    new_train_dataset = train_dataset.filter(lambda x: len(x["text"])>32)
    # 第二步:将上述的dataset进行再次封装
    train_dataloader = DataLoader(dataset=new_train_dataset, batch_size=8,
                                  collate_fn=collate_fn, shuffle=True,
                                  drop_last=True)
    # 第三步:实例化模型
    my_model = MyModel().to(device)
    # 第四步:实例化优化器对象
    my_adamw = AdamW(my_model.parameters(), lr=5e-4)
    # 第五步:实例化损失函数对象
    my_cross = nn.CrossEntropyLoss()
    # 第六步:强调预训练模型的参数不参与更新
    for param in bert_model.parameters():
        param.requires_grad_(False)

    # 定义模型为训练模式
    my_model.train()
    # 定义训练的轮次
    epochs = 3
    # 开始训练
    for epoch in range(epochs):
        # 开始时间
        start_time = int(time.time())
        # 开始内部数据的迭代
        for idx, (input_ids, token_type_ids, attention_mask, labels_y) in enumerate(tqdm(train_dataloader),start=1):
            # 将数据送入模型得到预测的结果
            input_ids = input_ids.to(device)
            token_type_ids = token_type_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels_y = labels_y.to(device)
            output = my_model(input_ids, token_type_ids, attention_mask)
            # 计算损失
            my_loss = my_cross(output, labels_y)
            # print(f'my_loss-》{my_loss}')
            # 梯度清零
            my_adamw.zero_grad()
            # 反向传播
            my_loss.backward()
            # 梯度更新
            my_adamw.step()

            # 每隔5步打印训练日志
            if idx % 20 == 0:
                # 取出一个批次样本中模型预测的结果
                predicts = torch.argmax(output, dim=-1)
                # 计算平均准确率
                avg_acc = (predicts == labels_y).sum().item() / len(labels_y)
                print('轮次:%d 迭代数:%d 损失:%.6f 准确率%.3f 时间%d' \
                      %(epoch, idx, my_loss.item(), avg_acc, (int)(time.time())-start_time))

        # 每轮都保存模型
        torch.save(my_model.state_dict(), './save_model/ai_fill_mask_%d.bin' % (epoch+1))


# todo:5.模型测试
def model2test():
    # 第一步读文件获取数据测试集:
    test_dataset = load_dataset('csv', data_files='./data/test.csv', split='train')
    # print(f'test_dataset---》{test_dataset}')
    new_test_dataset  = test_dataset.filter(lambda x: len(x["text"])>32)
    # 第二步:将上述的dataset进行再次封装
    test_dataloader = DataLoader(dataset=new_test_dataset, batch_size=8,
                                  collate_fn=collate_fn, shuffle=True,
                                  drop_last=True)
    # 第三步:加载训练好的模型
    my_model = MyModel().to(device)
    my_model.load_state_dict(torch.load('./save_model/ai_fill_mask_3.bin'))

    # 第四步:定义测试的超参数
    total = 0 # 计算已经迭代样本
    acc_num = 0 # 计算已经预测正确的样本的个数
    # 注意,把模型设置为eval()
    my_model.eval()
    # 第五步:开始测试
    for idx, (input_ids, token_type_ids, attention_mask, labels_y) in enumerate(tqdm(test_dataloader), start=1):
        # print(f'input_ids--》{input_ids}')
        input_ids = input_ids.to(device)
        token_type_ids = token_type_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels_y = labels_y.to(device)
        with torch.no_grad():
            output = my_model(input_ids, token_type_ids, attention_mask)

        # 计算预测正确的样本个数
        predicts = torch.argmax(output, dim=-1)
        acc_num = acc_num + (predicts == labels_y).sum().item()
        total = total + len(labels_y)
        # 每个5步,打印一下平均准确率,并且取出一个批次的第一个样本,进行结果的展示
        print(f'')
        if idx % 5 == 0:
            print(f'平均准确率:{acc_num/total}', )
            print(f'取出一个样本:{bert_tokenizer.decode(input_ids[0])}',)
            print(f'预测值:{bert_tokenizer.decode(predicts[0])}, 真实值:{bert_tokenizer.decode(labels_y[0])}')
            print('*'*80)


if __name__ == '__main__':
    # model2train()
    model2test()

可选的超参数建议:

Batch size: 16, 32
Learning rate (Adam): 5e-5, 3e-5, 2e-5
Epochs: 3, 4

BERT模型两大预训练任务

Masked LM

  • 在原始训练文本中, 随机的抽取15%的token作为参与MASK任务的对象
  • 在这些被选中的token中, 数据生成器并不是把它们全部变成[MASK]
    • 在80%的概率下, 用[MASK]标记替换该token
    • 在10%的概率下, 用一个随机的单词替换token
    • 在10%的概率下, 保持该token不变
  • BERT的MLM任务为什么采用811的策略?
    • 1. 采用811可以促进mask对自身的学习
    • 2. 插入10% 的随机token,可以提高模型学习的鲁棒性,相当于加入噪声

Next Sentence Prediction

  • 句子对分类任务 Sentence Pair Classification Tasks
  • 输入:[CLS] Sentence1 [SEP] Sentence2 [SEP]
  • 输出:使用 [CLS] token 的表示进行分类

BERT优缺点

  • 优点
    • 预训练+微调,效果好
    • 基于transformer支持并行运算
    • encoder架构实现双向理解
  • 缺点
    • 模型参数大,臃肿
    • MLM只有15%的token是[MASK],收敛慢
    • [MASK]只在训练时出现,训练推理信息有偏差

BERT处理长文本的方法

  • BERT预训练模型接受的最大sequence 长度是 512
  • 长度超过512怎么处理呢?
    • 1.head only 掐头 ,从头开始掐,只保存510个token,留俩给cls和sep
    • 2.tail only 去尾 ,从尾开始掐,只保存510个token,留俩给cls和sep
    • 3.head + tail 掐头 去尾 比如head去128token,tail取382token,留俩给cls和sep
    • 4.分段再融合 600转换为2*300,分段转词向量再融合
    • 5.摘要再处理,将长文本提取成低于512长度的摘要

BERT系列模型

1. ALBERT (A Lite BERT)

核心优化:

  • 词嵌入因式分解:768维→128维映射,参数量从2300万降至48万

    • 30000*768 = 30000*128+128*768

  • 参数共享:所有Encoder层共享参数,参数量降至1/12

  • SOP替代NSP:预测句子顺序(正序 vs 倒序),增强语义理解

  • 去掉Dropout:大batch下不过拟合,效果反升

  • MLM优化:长句子训练 + N-gram预测

特点:轻量级,参数量小,训练快,精度损失小


2. RoBERTa (Robustly Optimized BERT) 稳健优化BERT  大力出奇迹

核心优化:

  • 更大数据:16GB → 160GB

  • 更大batch:256 → 8000

  • 更长训练:更多步数

  • 去掉NSP:证明NSP有害,改用连续句子

  • 动态Mask:每次训练动态生成mask,而非预处理固定

  • Byte-level BPE:词表3万→5万

特点:暴力优化,数据+算力堆砌,效果超越BERT


3. MacBERT (中文优化)

核心优化:

  • 全词Mask + N-gram:1-4字比例40%/30%/20%/10%

  • 相似词替换:用近义词替代[MASK](80%),减少训练推理不一致

  • SOP替代NSP:借鉴ALBERT

特点:专为中文设计,阅读理解任务表现优异


4. SpanBERT

核心优化:

  • Span Masking:连续片段遮掩(平均长度3.8),而非单字

  • SBO任务:用Span边界词向量预测内部词,增强边界表示

  • 去掉NSP:单句长文本训练更好

特点:擅长抽取式问答、实体识别等Span级任务


快速对比表

模型 核心改进 特点
ALBERT 参数共享+因式分解 轻量、快速
RoBERTa 更多数据+更大batch 暴力优化
MacBERT 全词Mask+近义词替换 中文优化
SpanBERT Span Mask+SBO 擅长Span任务

ELMo 模型

特点

上下文相关的动态词向量:根据上下文动态调整词向量,解决多义词问题

ELMo首次实现上下文感知的动态词向量,用双向LSTM解决多义词问题,但特征提取能力弱于后续的Transformer-based模型(BERT)。


GPT 模型

特点

单向语言模型:只用上文预测当前词,擅长生成任务(NLG)

  • 基础架构:Transformer Decoder(仅保留Masked Attention + FFN)

  • Decoder Block:去掉encoder-decoder attention层,12层堆叠

  • 注意力机制因果掩码(look-ahead mask),防止看到未来信息


最终结构

GPT 的 Decoder Block = 2个子层(各带残差连接和层归一化):

  • 子层1:Masked Multi-Head Self-Attention → Add & Norm

  • 子层2:Feed Forward Network → Add & Norm

12个这样的Block堆叠形成GPT!✅


与BERT对比

对比项 GPT BERT
架构 Decoder Encoder
注意力 单向(只看上文) 双向(看上下文)
擅长任务 生成(NLG) 理解(NLU)
预训练 自回归语言模型 MLM + NSP

优缺点

优点 缺点
✅ 擅长文本生成 ❌ 无法利用下文信息
✅ 自回归方式天然适合生成 ❌ NLU任务弱于BERT
✅ GPT-2/3扩大规模后效果惊人 ❌ 单向信息限制理解能力

GPT是单向Transformer Decoder模型,通过自回归方式预训练,擅长自然语言生成(NLG),与擅长理解的BERT形成互补。

BERT、GPT、ELMo 对比总结

核心差异

对比维度 ELMo GPT BERT
特征提取器 双向LSTM Transformer Decoder Transformer Encoder
语言模型 伪双向(左右拼接) 单向(只看上文) 真双向(看上下文)
融合方式 向量拼接 单向自回归 一体化双向注意力
擅长任务 通用(解决多义词) NLG(生成) NLU(理解)

详细对比

1. 特征提取能力

  • ELMo:LSTM → 能力弱于Transformer

  • GPT:Transformer Decoder → 能力强,但单向

  • BERT:Transformer Encoder → 能力最强,双向

2. 双向性

  • ELMo:左LSTM + 右LSTM → 特征拼接,不是真正融合

  • GPT:Masked Attention → 只看上文

  • BERT:全注意力 → 上下文都看

3. 架构来源

  • GPT:Decoder(删掉Encoder-Decoder Attention)

  • BERT:Encoder(只用Padding Mask)

  • ELMo:独立LSTM架构


优缺点

模型 优点 缺点
ELMo ✅ 解决多义词
✅ 上下文动态词向量
❌ LSTM能力弱
❌ 拼接融合差
GPT ✅ Transformer强大
✅ 擅长生成任务
❌ 单向,缺下文信息
BERT ✅ 双向Transformer
✅ MLM+NSP多任务
✅ 理解能力强
❌ 模型庞大
❌ 不适合生成

总结

  • ELMo:LSTM + 伪双向,解决多义词,但能力有限

  • GPT:Decoder + 单向,擅长生成(NLG)

  • BERT:Encoder + 真双向,擅长理解(NLU)

能力排序:BERT > GPT > ELMo(理解任务)
生成任务:GPT > BERT > ELMo

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐