一、序列模型

1.1 为什么需要“序列模型”?

以前我们做图像分类(比如识别猫和狗),第一张图是猫,第二张图是狗,这两张图之间是没有关系的。这叫“独立同分布”。
但在现实中,很多数据是有顺序、有前后依赖的:

  • 文本:“狗咬人”和“人咬狗”字一样,顺序不同,意思完全相反。
  • 股票:今天的股价一定和昨天的股价有关联。
  • 电影评分:随着时间推移,大家的审美会变(教材里举了锚定效应、季节性的例子)。

这种**“时间顺序极其重要”的数据,就叫序列数据**。处理这类数据的模型,就是序列模型

1.2 统计工具

../_images/ftse100.png

假设 x t x_t xt 代表第 t t t 天的股票价格。
教材中说,交易员想预测第 t t t 天的价格 x t x_t xt,他手上的筹码是过去所有的价格: x t − 1 , x t − 2 , … , x 1 x_{t-1}, x_{t-2}, \dots, x_1 xt1,xt2,,x1
在概率学上,这写成:

x t ∼ P ( x t ∣ x t − 1 , … , x 1 ) x_t \sim P(x_t \mid x_{t-1}, \ldots, x_1) xtP(xtxt1,,x1)
:在已知第 1 天到第 t − 1 t-1 t1 天价格的前提下,第 t t t 天价格的概率分布。

这里有一个致命的问题:
如果你要预测第3天,你需要输入前2天的数据;如果你要预测第100天,你需要输入前99天的数据。
输入的数据量是不断膨胀的。 但是,我们平时用的神经网络(比如全连接层 MLP),它的输入端个数必须是固定的(比如固定输入4个特征)。输入长度一直在变,网络没法训练。

为了解开这个死结,科学家提出了两种策略:自回归模型和隐变量自回归模型

1.3 自回归模型(Autoregressive Models)

既然输入过去所有的日子太长了,那我就只看最近的几天不就行了?
假设我规定,预测今天,我只需要看过去的 τ \tau τ(比如 τ = 4 \tau=4 τ=4)天。
以前是看从 x 1 x_1 x1 x t − 1 x_{t-1} xt1,现在我只看 x t − 4 , x t − 3 , x t − 2 , x t − 1 x_{t-4}, x_{t-3}, x_{t-2}, x_{t-1} xt4,xt3,xt2,xt1

  • 好处:不管现在是第几天,我的输入永远只有 4 个数字,神经网络立刻就能训练了
  • 为什么叫“自回归”?:因为它是用自己过去的数据,来预测自己未来的数据(自己对自己做回归预测)。

1.4 隐变量自回归模型(Latent Autoregressive Models)

有些人觉得,策略一直接把太久远的数据扔掉,太可惜了。万一100天前发生了金融危机,对今天还有影响呢?
于是有了隐变量自回归模型:我不保留所有原始数据,但我在心里保留一个对过去的“总结”,记作 h t h_t ht(这就是隐变量,Hidden state)。

它的工作流程是这样的:

在这里插入图片描述

  1. 昨天结束时,我脑子里有一个昨天的总结 h t − 1 h_{t-1} ht1
  2. 今天股市收盘,我看到了今天的真实价格 x t − 1 x_{t-1} xt1
  3. 我把“昨天的总结 h t − 1 h_{t-1} ht1”加上“今天的价格 x t − 1 x_{t-1} xt1”融合在一起,在脑子里更新成**“今天的最新总结 h t h_t ht”**。(公式: h t = g ( h t − 1 , x t − 1 ) h_t = g(h_{t-1}, x_{t-1}) ht=g(ht1,xt1)
  4. 明天我要预测股价 x ^ t \hat{x}_t x^t 时,我只需要看我脑子里的最新总结 h t h_t ht 就行了。(公式: x ^ t = P ( x t ∣ h t ) \hat{x}_t = P(x_t \mid h_{t}) x^t=P(xtht)
  • :这就是的**RNN(循环神经网络)**的核心灵魂

1.5 马尔可夫模型 与 公式

马尔可夫模型其实就是自回归模型的一个极端情况。
如果你觉得预测明天,只需要看今天( τ = 1 \tau=1 τ=1),前天和大前天都不重要,这就叫一阶马尔可夫模型

在这里插入图片描述

当假设仅是离散值时,这样的模型特别棒, 因为在这种情况下,使用动态规划可以沿着马尔可夫链精确地计算结果。 例如,我们可以高效地计算:

在只能看到“今天”的前提下,我怎么跨步去预测“后天”?

假设: x t − 1 x_{t-1} xt1 是周一, x t x_t xt 是周二, x t + 1 x_{t+1} xt+1 是周三。
我们已知周一的天气,想预测周三的天气,即求: P ( x t + 1 ∣ x t − 1 ) P(x_{t+1} \mid x_{t-1}) P(xt+1xt1)
P ( x t + 1 ∣ x t − 1 ) = ∑ x t P ( x t + 1 , x t , x t − 1 ) P ( x t − 1 ) = ∑ x t P ( x t + 1 ∣ x t , x t − 1 ) P ( x t , x t − 1 ) P ( x t − 1 ) = ∑ x t P ( x t + 1 ∣ x t ) P ( x t ∣ x t − 1 ) \begin{aligned} P(x_{t+1} \mid x_{t-1}) &= \frac{\sum_{x_t} P(x_{t+1}, x_t, x_{t-1})}{P(x_{t-1})} \quad \text{} \\ \\ &= \frac{\sum_{x_t} P(x_{t+1} \mid x_t, x_{t-1}) P(x_t, x_{t-1})}{P(x_{t-1})} \quad \text{} \\ \\ &= \sum_{x_t} P(x_{t+1} \mid x_t) P(x_t \mid x_{t-1}) \quad \text{} \end{aligned} P(xt+1xt1)=P(xt1)xtP(xt+1,xt,xt1)=P(xt1)xtP(xt+1xt,xt1)P(xt,xt1)=xtP(xt+1xt)P(xtxt1)

1.6 代码练习

目标:生成一个带噪声的正弦波序列,然后用多层感知机(MLP)通过“自回归”的方式去预测它。

1. 生成序列数据

我们先生成一个长度为1000的时间序列数据,公式为: x = sin ⁡ ( 0.01 × t ) + 噪声 x = \sin(0.01 \times t) + \text{噪声} x=sin(0.01×t)+噪声

import torch
from torch import nn
from d2l import torch as d2l

T = 1000  # 总共产生1000个时间步的数据点
time = d2l.arange(1, T + 1, dtype=d2l.float32) # 生成时间轴:1, 2, ..., 1000
# 生成x: 正弦函数加上一个均值为0,标准差为0.2的正态分布噪声
x = d2l.sin(0.01 * time) + d2l.normal(0, 0.2, (T,)) 

# 画图展示生成的数据
d2l.plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 3))

加入噪声是为了模拟真实世界数据的不确定性。如果没有噪声,模型很容易完美拟合。

2. 构造特征-标签对(最关键的数据处理步)

我们采用自回归模型,设时间窗口 τ = 4 \tau = 4 τ=4。意味着我们要用前4个时刻的数据,预测第5个时刻的数据。

  • 特征 X X X [ x t − 4 , x t − 3 , x t − 2 , x t − 1 ] [x_{t-4}, x_{t-3}, x_{t-2}, x_{t-1}] [xt4,xt3,xt2,xt1]
  • 标签 Y Y Y x t x_t xt
tau = 4 # 嵌入维度(滑动窗口大小)
# 初始化特征矩阵,形状为 (996, 4)。因为前4个点没有足够的历史数据,所以样本数是 T - tau
features = torch.zeros((T - tau, tau)) 

# 核心逻辑:滑动窗口构造特征
for i in range(tau):
    # 第 i 列的数据,是原序列从 i 开始,到 T - tau + i 结束的切片
    # 比如 i=0 时,取 x[0:996];i=1 时,取 x[1:997]...
    # 这样 features 的每一行就自动变成了连续的4个数据点
    features[:, i] = x[i: T - tau + i]

# 标签矩阵,就是从第5个点开始的所有点,形状变为 (996, 1)
labels = x[tau:].reshape((-1, 1)) 

batch_size, n_train = 16, 600
# 我们只拿前 600 个样本做训练。剩下的留着测试模型的“外推”能力(预测未来)
train_iter = d2l.load_array((features[:n_train], labels[:n_train]),
                            batch_size, is_train=True)

举个例子:假设原数据是 [1, 2, 3, 4, 5, 6, 7] τ = 4 \tau=4 τ=4
构造出的特征矩阵第一行是 [1, 2, 3, 4],标签是 5
第二行是 [2, 3, 4, 5],标签是 6。以此类推。

3. 定义模型与损失函数

用一个非常简单的两层全连接神经网络(MLP)。

# 初始化网络权重的函数 (Xavier初始化)
def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.xavier_uniform_(m.weight)

# 一个简单的多层感知机
def get_net():
    net = nn.Sequential(
        nn.Linear(4, 10), # 输入维度4(过去4个时间步),隐藏层10个神经元
        nn.ReLU(),        # 激活函数
        nn.Linear(10, 1)  # 输出维度1(预测下一个时间步的值)
    )
    net.apply(init_weights) # 应用初始化
    return net

# 平方损失(MSE)。reduction='none' 表示返回每个样本的损失值,不求平均
loss = nn.MSELoss(reduction='none') 

4. 训练模型

标准的 PyTorch 训练循环。

def train(net, train_iter, loss, epochs, lr):
    # 使用 Adam 优化器
    trainer = torch.optim.Adam(net.parameters(), lr)
    for epoch in range(epochs):
        for X, y in train_iter:
            trainer.zero_grad() # 梯度清零
            l = loss(net(X), y) # 前向传播计算损失
            l.sum().backward()  # 反向传播计算梯度(这里sum是因为前面loss是none,需要标量才能backward)
            trainer.step()      # 更新参数
        # 打印每个 epoch 的损失
        print(f'epoch {epoch + 1}, '
              f'loss: {d2l.evaluate_loss(net, train_iter, loss):f}')

net = get_net()
train(net, train_iter, loss, 5, 0.01) # 训练5个epoch,学习率0.01
epoch 1, loss: 0.059632
epoch 2, loss: 0.051356
epoch 3, loss: 0.050105
epoch 4, loss: 0.049160
epoch 5, loss: 0.048303

5. 单步预测(One-step Prediction)

模型训练好了,我们来看看它在整个数据集上的表现。
单步预测的意思是:我们永远使用真实的观测数据作为输入来预测下一步。

# 把所有的特征(包括训练集和测试集的历史真实数据)放进网络
onestep_preds = net(features)

# 画图对比
d2l.plot([time, time[tau:]], 
         [x.detach().numpy(), onestep_preds.detach().numpy()], 'time',
         'x', legend=['data', '1-step preds'], xlim=[1, 1000], 
         figsize=(6, 3))

预测和蓝线真实几乎重合,效果很好。
原因:因为即使在600步之后(没训练过的数据),我们预测 x 605 x_{605} x605 时,依然输入的是真实 x 601 , x 602 , x 603 , x 604 x_{601}, x_{602}, x_{603}, x_{604} x601,x602,x603,x604。这属于一种简单的“模式匹配”。

在这里插入图片描述

6. 多步预测 / 外推(Multi-step Prediction)

在真实世界预测未来时(比如炒股),到了第604天之后,你并没有第605天的真实数据。如果你想预测第606天,你只能拿你自己预测的第605天的数据作为输入填进去。这就是多步预测。

multistep_preds = torch.zeros(T)
# 前 600+4 个数据,我们用真实的(因为我们假装已经观测到了这么多)
multistep_preds[: n_train + tau] = x[: n_train + tau]

# 从第 604 步开始,往后预测到 1000 步
for i in range(n_train + tau, T):
    # multistep_preds[i - tau: i] 里面包含了模型刚刚预测出来的值,而不是真实观测值!
    X_input = multistep_preds[i - tau: i].reshape((1, -1))
    multistep_preds[i] = net(X_input)

# 画图查看效果
d2l.plot([time, time[tau:], time[n_train + tau:]],
         [x.detach().numpy(), onestep_preds.detach().numpy(),
          multistep_preds[n_train + tau:].detach().numpy()], 'time',
         'x', legend=['data', '1-step preds', 'multistep preds'],
         xlim=[1, 1000], figsize=(6, 3))

在这里插入图片描述

现象:绿线(多步预测)在刚开始几步还行,随后迅速衰减成一条直线。
核心原因:误差累积(Error Accumulation)

  • 第一步预测有一点误差 ϵ \epsilon ϵ
  • 第二步用带有误差的预测值作为输入,导致产生更大的误差。
  • 如此循环,误差呈指数级放大,最终模型完全丢失了信号,只能预测出一个常数(类似于数据集的均值)。这说明普通的 MLP 这种自回归方式做长远期预测是非常无力的。

一些练习

  1. 如果没有噪音,需要多少个过去的观测结果?提示:把 sin ⁡ \sin sin 写成微分方程。
    • :如果没有噪音, τ = 2 \tau=2 τ=2 就足够了。因为正弦波可以用二阶常微分方程 x ′ ′ ( t ) + ω 2 x ( t ) = 0 x''(t) + \omega^2 x(t) = 0 x′′(t)+ω2x(t)=0 描述。在离散时间下,可以用 x t , x t − 1 x_t, x_{t-1} xt,xt1 完美推出 x t + 1 x_{t+1} xt+1
  2. 投资者看过去的回报决定买哪个股票,策略会出什么问题?
    • :股票市场是极其复杂的“非平稳”系统(存在基本面变化、政策变化、突发新闻),且由于人类的博弈,过去的规律一旦被发现,马上就会失效。这违背了模型假设中的“序列动力学不改变(平稳性)”。
  3. 时间向前推进的因果模型如何适用于文本?
    • :非常适用。我们阅读和说话都是从左到右单向推进的。现在的 GPT(Generative Pre-trained Transformer)等大语言模型,本质上就是极其庞大的“自回归模型”:通过给定前面的所有词(上文),来预测下一个词。

二、文本预处理

如果你要教计算机认字,你不能直接把一本书塞给它,因为神经网络只认识数字(张量),不认识汉字或英文字母
因此,文本预处理的核心目的就是:把人类看的字符串,变成计算机能计算的数字序列。

整个过程分为标准的 四个步骤

  1. 加载数据:把文件读到内存里,做些简单清洗(去标点、转小写)。
  2. 词元化(Tokenization):把长句子切成一个个小词汇(词元)。
  3. 构建词表(Vocabulary):给每个词分配一个独一无二的“数字身份证”。
  4. 转换:对照词表,把所有文本变成一长串数字索引。

读取数据集

使用了著名科幻小说《时间机器》(The Time Machine)的英文原版。

import collections
from d2l import torch as d2l
import re

d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
                                '090b5e7e70c295757f55df93cb0a180b9691891a')

def read_time_machine():  #@save
    """将时间机器数据集加载到文本行的列表中"""
    with open(d2l.download('time_machine'), 'r') as f:
        lines = f.readlines()
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()
print(f'# 文本总行数: {len(lines)}')
print(lines[0])
print(lines[10])

深度解读核心清洗逻辑:

  • re.sub('[^A-Za-z]+', ' ', line):这是正则表达式。意思是把所有“不是(^)字母(A-Za-z)的字符”,全部替换成“空格( )”。这样所有的标点符号、数字都被去掉了。
  • .strip():去掉每一行开头和结尾的多余空格或换行符。
  • .lower():把所有大写字母转成小写。为什么这么做? 因为在计算机看来,Thethe 是两个完全不同的词,转成小写可以极大减少词表的数量。

词元化 / 分词

一句话变成了一个干净的字符串后,我们需要把它“切碎”。切碎的基本单位叫做词元(Token)
在英文中,通常有两种切法:

  1. Word(词级别)"the time machine" -> ['the', 'time', 'machine']
  2. Char(字符级别)"the" -> ['t', 'h', 'e']
def tokenize(lines, token='word'):  
    """将文本行拆分为单词或字符词元"""
    if token == 'word':
        # line.split() 默认按照空格把句子切成单词列表
        return [line.split() for line in lines]
    elif token == 'char':
        # list(line) 会把一个字符串直接拆解成单个字母的列表
        return [list(line) for line in lines]
    else:
        print('错误:未知词元类型:' + token)

# 测试运行
tokens = tokenize(lines) # 默认按单词切

运行后,原本的一整行字符串,变成了一个包含多个单词的列表(List of Lists)。

构建词表

现在我们有一堆单词(比如 ['the', 'time', 'machine', ...]),我们要给它们编号:'the'->1, 'time'->2
这个“单词-数字”对照本,就叫词表(Vocab)

词表有几个关键原则:

  1. 按出现频率排序:出现次数越多的词,编号越靠前。
  2. 处理生僻词:如果一个词只出现过1次(比如某些冷僻地名),把它加入词表会浪费内存。我们可以设置 min_freq(最小频率),出现次数低于这个值的词,统一当作 未知词元 <unk>(Unknown)。
  3. 保留词元:除了 <unk>,有时还需要 <pad>(句子不够长时用来补齐)、<bos>(句子开头)、<eos>(句子结束)。

count_corpus 统计词频函数

def count_corpus(tokens):  
    """统计词元的频率"""
    # 如果传进来的是二维列表(列表的列表),我们要把它“展平”成一维列表
    if len(tokens) == 0 or isinstance(tokens[0], list):
        # [token for line in tokens for token in line] 就是展平操作
        tokens = [token for line in tokens for token in line]
        
    # collections.Counter 会自动统计列表中每个元素出现的次数,返回一个字典一样的东西
    # 比如:{'the': 200, 'time': 50, ...}
    return collections.Counter(tokens)

Vocab 词表类 (核心设计)

class Vocab:  
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        # 初始化处理
        if tokens is None: tokens = []
        if reserved_tokens is None: reserved_tokens = []
            
        # 1. 统计频率,并按照频率从高到低排序
        counter = count_corpus(tokens)
        self._token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True)
        
        # 2. 初始化两个核心字典(双向映射)
        # idx_to_token: 列表,通过索引找单词 (比如 idx_to_token[0] -> '<unk>')
        self.idx_to_token = ['<unk>'] + reserved_tokens
        # token_to_idx: 字典,通过单词找索引 (比如 token_to_idx['<unk>'] -> 0)
        self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}
        
        # 3. 把高频词加入词表
        for token, freq in self._token_freqs:
            if freq < min_freq:  # 如果这个词出现的次数小于 min_freq,就不加了(后面的频率更低,直接break)
                break
            if token not in self.token_to_idx:
                # 把新词加进列表尾部
                self.idx_to_token.append(token)
                # 记录这个新词的索引(当前列表长度-1)
                self.token_to_idx[token] = len(self.idx_to_token) - 1

    # 【魔法方法】当使用 vocab['the'] 时会自动调用这个方法
    def __getitem__(self, tokens):
        # 如果传入的不是列表,说明查的是单个词
        if not isinstance(tokens, (list, tuple)):
            # 去字典里查,查不到就返回 self.unk (即 0)
            return self.token_to_idx.get(tokens, self.unk)
        # 如果传入的是列表,就用递归把每个词都查出来
        return [self.__getitem__(token) for token in tokens]

    # 给数字返回对应的单词
    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]

整合所有功能

现在,我们要把前面三步串起来,写一个最终函数,以后只要调用这个函数,就能直接拿到可以直接用于训练的数字序列了。

def load_corpus_time_machine(max_tokens=-1):  
    """返回时光机器数据集的词元索引列表和词表"""
    # 1. 读取清洗好的文本
    lines = read_time_machine()
    
    # 2. 按【字符】切分成词元
    tokens = tokenize(lines, 'char')
    
    # 3. 构建词表
    vocab = Vocab(tokens)
    
    # 4. 把所有的字符全部转化为数字索引,并且【展平】成一个极其长的一维列表
    # 为什么展平?因为我们要用序列模型处理,不需要保留一句话一句话的结构,只要时间线连贯即可
    corpus = [vocab[token] for line in tokens for token in line]
    
    # 5. 如果设置了最大截断,就只取前面一部分数据
    if max_tokens > 0:
        corpus = corpus[:max_tokens]
        
    return corpus, vocab # 返回数字序列 和 词表对象

一些练习

  1. 尝试找到另外三种常用的词元化文本的方法。

    • 答:
      1. BPE (Byte Pair Encoding)WordPiece:这是目前大模型(如 GPT、BERT)最流行的方法叫作子词(Subword)词元化。它把 “playing” 切成 “play” 和 “##ing”。既避免了词表无限大,又保留了单词的意义。
      2. N-gram 分词:不仅仅切单个词,还把相邻的词组在一起(如 2-gram:“the time”, “time machine”),用于捕捉局部上下文。
      3. 中文的分词:jieba 分词。因为中文没有空格,必须依靠算法(基于词典或统计)把句子切成词语。
  2. 将文本词元为单词和更改Vocab实例的 min_freq 参数。这对词表大小有何影响?

    • 答:
      • 如果按“单词”切分,词表大小会激增(从 28 个字符暴增到几千个单词)。
      • 如果增大 min_freq(比如设为 5),那么出现次数小于 5 次的生僻词全都会被归为 <unk>(索引为0),词表的大小会显著减小。这在工程中非常常见,可以有效防止模型过拟合那些极其少见的词,也能节约显存。

三、语言模型

  • 什么是语言模型?为什么要用深度学习?
  • 如何把一本几万字的书,切成神经网络能吃进去的“小方块”?

3.1 语言模型理论与自然语言统计

1. 什么是语言模型?

在这里插入图片描述

简单来说,语言模型就是用来计算“一句话到底有多像人话”的概率模型。
比如:

  • P ( "狗咬人" ) = 0.01 P(\text{"狗咬人"}) = 0.01 P("狗咬人")=0.01
  • P ( "人咬狗" ) = 0.000001 P(\text{"人咬狗"}) = 0.000001 P("人咬狗")=0.000001
    因为“狗咬人”更符合现实规律,所以语言模型给它的概率更高。有了这个模型,我们就可以做文本生成:给定上半句“我想吃”,模型算出下一个字是“饭”的概率最高,于是就输出了“饭”。这就是 ChatGPT 的底层逻辑——不断预测下一个词(Next-token prediction)。

2. 传统方法的死胡同:N元语法(N-gram)与平滑

前面提过,我们要预测下一个词,最简单的办法就是数数(统计词频)
假设你想知道“deep”后面接“learning”的概率:
P ( learning ∣ deep ) = "deep learning"出现的次数 "deep"出现的次数 P(\text{learning} \mid \text{deep}) = \frac{\text{"deep learning"出现的次数}}{\text{"deep"出现的次数}} P(learningdeep)="deep"出现的次数"deep learning"出现的次数

  • 一元语法(Unigram):假设字和字之间没关系,只看单个字出现的概率。
  • 二元语法(Bigram):假设下一个字只跟前1个字有关(看2个字的组合)。
  • 三元语法(Trigram):假设下一个字跟前2个字有关(看3个字的组合)。

传统方法的致命缺点(数据稀疏问题):
在这里插入图片描述

如果你用三元语法甚至十元语法,你会发现大部分词组在书里根本没出现过,出现次数为 0,导致算出来的概率是 0。虽然可以用“拉普拉斯平滑”(给每个没出现过的词组偷偷加上一个微小的次数 ϵ \epsilon ϵ),但当词表变大时,内存根本存不下这么庞大的统计表,而且它完全不懂“词义”(不懂“猫”和“猫科动物”是相似的)。
结论:数数的方法行不通,必须上深度学习(神经网络)

3.2 自然语言统计

import random
import torch
from d2l import torch as d2l

tokens = d2l.tokenize(d2l.read_time_machine())
# 因为每个文本行不一定是一个句子或一个段落,因此我们把所有文本行拼接到一起
corpus = [token for line in tokens for token in line]
vocab = d2l.Vocab(corpus)
vocab.token_freqs[:10]
[('the', 2261),
 ('i', 1267),
 ('and', 1245),
 ('of', 1155),
 ('a', 816),
 ('to', 695),
 ('was', 552),
 ('in', 541),
 ('that', 443),
 ('my', 440)]

正如我们所看到的,最流行的词看起来很无聊, 这些词通常被称为停用词(stop words),因此可以被过滤掉。

尽管如此,它们本身仍然是有意义的,我们仍然会在模型中使用它们。 此外,还有个明显的问题是词频衰减的速度相当地快:

freqs = [freq for token, freq in vocab.token_freqs]
d2l.plot(freqs, xlabel='token: x', ylabel='frequency: n(x)',
         xscale='log', yscale='log')

在这里插入图片描述

这意味着单词的频率满足齐普夫定律(Zipf’s law),即第i个最常用单词的频率 n i n_i ni为:
n i ∝ 1 i α n_i \propto \frac{1}{i^\alpha} niiα1
等价于
log ⁡ n i = − α log ⁡ i + c i \log{n_i} = -\alpha \log {i} + c_i logni=αlogi+ci
这告诉我们想要通过计数统计和平滑来建模单词是不可行的, 因为这样建模的结果会大大高估尾部单词的频率,也就是所谓的不常用单词。
其他的词元组合,比如二元语法、三元语法等等,又会如何呢?

我们来看看二元语法的频率是否与一元语法的频率表现出相同的行为方式。

bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])]
bigram_vocab = d2l.Vocab(bigram_tokens)
bigram_vocab.token_freqs[:10]
[(('of', 'the'), 309),
 (('in', 'the'), 169),
 (('i', 'had'), 130),
 (('i', 'was'), 112),
 (('and', 'the'), 109),
 (('the', 'time'), 102),
 (('it', 'was'), 99),
 (('to', 'the'), 85),
 (('as', 'i'), 78),
 (('of', 'a'), 73)]

在十个最频繁的词对中,有九个是由两个停用词组成的, 只有一个与“the time”有关。 我们再进一步看看三元语法的频率是否表现出相同的行为方式。

trigram_tokens = [triple for triple in zip(
    corpus[:-2], corpus[1:-1], corpus[2:])]
trigram_vocab = d2l.Vocab(trigram_tokens)
trigram_vocab.token_freqs[:10]
[(('the', 'time', 'traveller'), 59),
 (('the', 'time', 'machine'), 30),
 (('the', 'medical', 'man'), 24),
 (('it', 'seemed', 'to'), 16),
 (('it', 'was', 'a'), 15),
 (('here', 'and', 'there'), 15),
 (('seemed', 'to', 'me'), 14),
 (('i', 'did', 'not'), 14),
 (('i', 'saw', 'the'), 13),
 (('i', 'began', 'to'), 13)]

最后,我们直观地对比三种模型中的词元频率:一元语法、二元语法和三元语法。

bigram_freqs = [freq for token, freq in bigram_vocab.token_freqs]
trigram_freqs = [freq for token, freq in trigram_vocab.token_freqs]
d2l.plot([freqs, bigram_freqs, trigram_freqs], xlabel='token: x',
         ylabel='frequency: n(x)', xscale='log', yscale='log',
         legend=['unigram', 'bigram', 'trigram'])

在这里插入图片描述

3.3 读取长序列数据

深度学习模型不能一次性把整本 3 万字的小说吞进去。模型需要的是固定形状的矩阵(比如 batch_size=2, num_steps=5,也就是一次吃 2 句长度为 5 的话)。
那么问题来了:如何把 3 万字的长文本,切成一段一段整齐的小文本放进模型里?

两种切法:随机采样(Random Sampling)顺序分区(Sequential Partitioning)

特征 X X X 和 标签 Y Y Y

在语言模型中,我们的目标是预测下一个词
所以,如果我们截取了一段长度为 5 的序列 [1, 2, 3, 4, 5]

  • 输入 X X X 就是前 4 个词:[1, 2, 3, 4]
  • 标签 Y Y Y 就是整体往后挪一个词:[2, 3, 4, 5]
    意思是:看到 1 预测 2,看到 1,2 预测 3… 依此类推。
3.3.1 随机采样

切法一:随机采样 (seq_data_iter_random)

核心思想: 把面条随便切成一段一段长度为 n n n 的小段,然后打乱顺序,每次随机抓几段组合成一个 Batch。

def seq_data_iter_random(corpus, batch_size, num_steps):  
    """使用随机抽样生成一个小批量子序列"""
    
    # 1. 随机偏移量:避免每次切分的结果都一样(每次切的时候起点稍微偏一点)
    # random.randint(0, num_steps - 1) 产生一个 0 到 4 的随机数
    corpus = corpus[random.randint(0, num_steps - 1):]
    
    # 2. 计算最多能切出多少个完整的子序列(减1是因为最后一个词要用来做标签 Y)
    num_subseqs = (len(corpus) - 1) // num_steps
    
    # 3. 记录每一个子序列的起始索引位置,比如 [0, 5, 10, 15, ...]
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    
    # 4. 【灵魂操作】打乱这些起始索引!这样模型就不会按顺序看书了
    random.shuffle(initial_indices)

    # 辅助函数:根据起始位置,取出长度为 num_steps 的数据
    def data(pos):
        return corpus[pos: pos + num_steps]

    # 5. 计算一共能组成多少个 Batch
    num_batches = num_subseqs // batch_size
    
    # 6. 开始生成 Batch
    for i in range(0, batch_size * num_batches, batch_size):
        # 拿出一个 batch_size 大小的随机起始索引集合
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        
        # 根据这些索引提取 X。比如索引是 10,X 就是 corpus[10:15]
        X = [data(j) for j in initial_indices_per_batch]
        # Y 是 X 在时间线上往后移一步。所以起始索引是 j+1,Y 就是 corpus[11:16]
        Y = [data(j + 1) for j in initial_indices_per_batch]
        
        # 使用 yield 将其变成一个生成器,每次迭代返回一对 (X, Y) 的矩阵
        yield np.array(X), np.array(Y)

随机采样的特点:
因为顺序被打乱了,所以第一个 Batch 的结尾,和第二个 Batch 的开头,在原文中是连不上的。
因此,在训练网络时,每次换一个新 Batch,我们都必须把网络脑子里暂存的记忆(隐藏状态)清零

生成一个0~34的序列:

my_seq = list(range(35))
for X, Y in seq_data_iter_random(my_seq, batch_size=2, num_steps=5):
    print('X: ', X, '\nY:', Y)
X:  tensor([[13, 14, 15, 16, 17],
        [28, 29, 30, 31, 32]])
Y: tensor([[14, 15, 16, 17, 18],
        [29, 30, 31, 32, 33]])
X:  tensor([[ 3,  4,  5,  6,  7],
        [18, 19, 20, 21, 22]])
Y: tensor([[ 4,  5,  6,  7,  8],
        [19, 20, 21, 22, 23]])
X:  tensor([[ 8,  9, 10, 11, 12],
        [23, 24, 25, 26, 27]])
Y: tensor([[ 9, 10, 11, 12, 13],
        [24, 25, 26, 27, 28]])
3.3.2 顺序分区

切法二:顺序分区 (seq_data_iter_sequential) —— 【工业界最常用】

核心思想: 如果有 3 万字,Batch size 是 3。我们先把 3 万字切成 3 个长条(每条 1 万字)。
然后模型同时看这 3 个长条:
第一眼看 3 个长条的第 1-5 个字;
第二眼看 3 个长条的第 6-10 个字…
这样保证了在同一个位置上,当前的 Batch 和上一个 Batch 在原文里是绝对首尾相连的!

def seq_data_iter_sequential(corpus, batch_size, num_steps): 
    """使用顺序分区生成一个小批量子序列"""
    
    # 1. 依然是随机偏移一点,保证不同 Epoch 切的位置不同
    offset = random.randint(0, num_steps)
    
    # 2. 计算一共能保留多少个词(必须能被 batch_size 整除,多余的扔掉)
    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
    
    # 3. 截取 X 和 Y 的数据范围(Y 就是 X 整体往后错 1 位)
    Xs = np.array(corpus[offset: offset + num_tokens])
    Ys = np.array(corpus[offset + 1: offset + 1 + num_tokens])
    
    # 4. 【灵魂操作:折叠】将一根长面条,折叠成 batch_size 这么多行。
    # -1 表示列数自动计算。
    # 例如:原序列 [0,1,2,3,4,5], batch_size=2
    # reshape后变成:
    # 行1: [0, 1, 2]
    # 行2: [3, 4, 5]
    Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
    
    # 5. 计算可以滑动多少个窗口(列数除以窗口长度 num_steps)
    num_batches = Xs.shape[1] // num_steps
    
    # 6. 从左到右,一截一截地切取
    for i in range(0, num_steps * num_batches, num_steps):
        # 取所有行,列取从 i 到 i+num_steps
        X = Xs[:, i: i + num_steps]
        Y = Ys[:, i: i + num_steps]
        yield X, Y

根据输出,我们发现每次输出都接上了上一个batch的末尾

X:  tensor([[13, 14, 15, 16, 17],
        [28, 29, 30, 31, 32]])
Y: tensor([[14, 15, 16, 17, 18],
        [29, 30, 31, 32, 33]])
X:  tensor([[ 3,  4,  5,  6,  7],
        [18, 19, 20, 21, 22]])
Y: tensor([[ 4,  5,  6,  7,  8],
        [19, 20, 21, 22, 23]])
X:  tensor([[ 8,  9, 10, 11, 12],
        [23, 24, 25, 26, 27]])
Y: tensor([[ 9, 10, 11, 12, 13],
        [24, 25, 26, 27, 28]])

顺序分区的特点:
因为每个 Batch 都完美接续上一个 Batch 的内容。所以在训练时,我们可以保留神经网络上一刻的记忆(隐藏状态)直接传给下一个 Batch! 这让网络能捕捉到非常非常长的历史信息。现在的 GPT 等大模型,底层数据喂养方式全都是这种。

四、RNN

4.1 为什么需要 RNN?

在上一节中,我们尝试用统计词频(N元语法)来预测下一个词。
但遇到了一个死结:如果你想让模型考虑前面 10 个词的上下文,你需要统计所有的 10 词组合。 这个组合数量是指数级爆炸的,全世界的硬盘都存不下,而且绝大多数组合根本没在书里出现过(数据稀疏)。

在这里插入图片描述

在这里插入图片描述

怎么解决?—— 引入“记忆”(隐状态,Hidden State)
与其死板地记录前面所有词的精确组合,不如在神经网络的脑子里开辟一块“记忆区”
模型每读一个字,就把这个字的信息融进“记忆”里。这样,预测下一个字的时候,模型只需要看一眼当前的“记忆”就行了。

基于循环计算的隐状态神经网络被命名为 循环神经网络(recurrent neural network)。 在循环神经网络中执行上面的计算的层 称为循环层(recurrent layer)。

4.2 RNN 的原理

1. 没有记忆的普通网络(MLP)

回想一下普通的多层感知机

在这里插入图片描述
输入 X X X,乘上权重矩阵 W x h W_{xh} Wxh,加上偏置 b h b_h bh,通过激活函数变成隐藏层 H H H。然后再把 H H H 变成输出 O O O在这里插入图片描述

  • 致命弱点:它处理“The”、“time”、“machine”这三个词时,是完全独立的。看完“The”就全忘了,再看“time”时,根本不知道前面有个“The”。

2. 有记忆的循环神经网络(RNN)

RNN 仅仅在 MLP 的基础上,加了一个加法

H t = ϕ ( X t W x h + H t − 1 W h h + b h ) H_t = \phi(X_t W_{xh} + H_{t-1} W_{hh} + b_h) Ht=ϕ(XtWxh+Ht1Whh+bh)

  • X t W x h X_t W_{xh} XtWxh:我**现在(时间 t t t)**眼睛看到的词(比如“time”)。
  • H t − 1 W h h H_{t-1} W_{hh} Ht1Whh:我**上一刻(时间 t − 1 t-1 t1)**脑子里的记忆(包含了前面看到的“The”)。
  • 把“现在的刺激”和“过去的记忆”加在一起,经过激活函数 ϕ \phi ϕ,形成了最新一刻的记忆 H t H_t Ht

有了最新的记忆 H t H_t Ht,我就可以用它来输出(预测下一个词):
O t = H t W h q + b q O_t = H_t W_{hq} + b_q Ot=HtWhq+bq

RNN 的三个特性:

  1. 循环更新:每一步的记忆 H t H_t Ht 都传给下一步,形成了一个链条。
  2. 捕捉无限历史:理论上, H t H_t Ht 里面包含了从时间步 1 到 t t t 所有的信息。
  3. 参数共享:不管这个句子有 10 个词还是 100 个词,它自始至终只用同一套参数 W x h , W h h , W h q W_{xh}, W_{hh}, W_{hq} Wxh,Whh,Whq)。这使得模型的参数量不会随句子变长而爆炸

4.4 困惑度(perplexity)

衡量一个语言模型的好坏可以用平均交叉熵
π = 1 n ∑ i = 1 n − log ⁡ p ( x t ∣ x t − 1 , … ) \pi = \frac{1}{n} \sum_{i=1}^{n} - \log{p(x_t|x_{t-1},\dots}) π=n1i=1nlogp(xtxt1,)

  • p是语言模型的预测概率, x t x_t xt是真实词
  • 历史原因NLP使用困惑度 e x p ( π ) exp(\pi) exp(π)来衡量,是平均每次可能选项
    • 1表示完美,无穷大是最差情况

4.5 梯度裁剪

  • 迭代中计算这 T 个时间步上的梯度,在反向传播过程中产生长度为 O(T) 的矩阵乘法链,导致数值不稳定

  • 梯度裁剪能有效预防梯度爆炸

    • 如果梯度长度超过 θ,那么拖影回长度θ

      • g ← m i n ( 1 , θ ∣ ∣ g ∣ ∣ ) g g \leftarrow min(1, \frac{\theta}{||g||})g gmin(1,∣∣g∣∣θ)g

4.6 从零开始实现

从零开始(使用最基础的张量操作,不使用框架自带的RNN层)实现一个循环神经网络(RNN),并用它来训练一个字符级语言模型(Character-Level Language Model)。

导入相关的库,并加载在上一节中处理好的《时光机器》数据集。

%matplotlib inline
import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
# 加载数据集,返回数据迭代器和词表
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
4.6.1 独热编码 (One-Hot Encoding)

概念
我们的数据集中,每个字符(词元)已经被转换成了整数索引(比如 ‘a’ 是 1,‘b’ 是 2)。但是直接把整数输入给神经网络是不合理的,因为整数之间有大小关系(2比1大),但这并不意味着 ‘b’ 比 ‘a’ 大。
因此,我们使用独热编码。假设词表大小为28,索引为2的字符会被表示为一个长度为28的向量,其中第2个位置是1,其余位置全是0。

# 测试一下 PyTorch 的 one_hot 函数
F.one_hot(torch.tensor([0, 2]), len(vocab))
array([[1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]])

维度转换
我们每次从迭代器拿到的数据 X 形状是 (批量大小 batch_size, 时间步数 num_steps)
为了让 RNN 能够一步一步地(按时间步)处理数据,我们需要将数据转置,使其形状变成 (时间步数, 批量大小)。对其进行独热编码后,形状最终变为 (时间步数, 批量大小, 词表大小)

X = torch.arange(10).reshape((2, 5)) # 模拟 batch_size=2, num_steps=5
# X.T 将形状变为 (5, 2)。然后再 one_hot。
print(F.one_hot(X.T, 28).shape) 
(5, 2, 28)
4.6.2 初始化模型参数

概念
我们需要为 RNN 定义可以训练的权重和偏置。根据 RNN 的核心公式:
H t = ϕ ( X t W x h + H t − 1 W h h + b h ) H_t = \phi(X_t W_{xh} + H_{t-1} W_{hh} + b_h) Ht=ϕ(XtWxh+Ht1Whh+bh)
O t = H t W h q + b q O_t = H_t W_{hq} + b_q Ot=HtWhq+bq
我们需要初始化 5 个参数: W x h , W h h , b h W_{xh}, W_{hh}, b_h Wxh,Whh,bh (隐藏层参数) 和 W h q , b q W_{hq}, b_q Whq,bq (输出层参数)。

def get_params(vocab_size, num_hiddens, device):
    # 因为输入和输出都是字符(词表大小),所以输入和输出维度相同
    num_inputs = num_outputs = vocab_size

    # 定义一个辅助函数,用于生成均值为0,标准差为0.01的正态分布随机张量
    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    # 隐藏层参数
    W_xh = normal((num_inputs, num_hiddens)) # 输入到隐藏层的权重
    W_hh = normal((num_hiddens, num_hiddens)) # 上一时间步隐藏状态到当前隐藏状态的权重
    b_h = torch.zeros(num_hiddens, device=device) # 隐藏层的偏置

    # 输出层参数
    W_hq = normal((num_hiddens, num_outputs)) # 隐藏状态到输出的权重
    b_q = torch.zeros(num_outputs, device=device) # 输出层的偏置

    # 将所有参数放入一个列表中
    params = [W_xh, W_hh, b_h, W_hq, b_q]
    
    # 逐个开启梯度计算(这是PyTorch中要求模型参数必须做的)
    for param in params:
        param.requires_grad_(True)
        
    return params
4.6.3 循环神经网络模型

概念
我们需要定义两个东西:

  1. 初始状态:在第一个时间步,没有前一个时间步的隐藏状态,所以我们要用全0初始化一个。
  2. 前向传播逻辑:按照时间步循环,计算每一个时刻的隐藏状态和输出。
# 1. 初始化隐藏状态
def init_rnn_state(batch_size, num_hiddens, device):
    # 返回一个元组,里面包含一个形状为 (批量大小, 隐藏单元数) 的全0张量
    # 为什么要用元组?为了以后扩展方便(比如LSTM需要两个隐藏状态:隐状态H和记忆细胞C)
    return (torch.zeros((batch_size, num_hiddens), device=device), )

# 2. 定义前向传播 (一个 batch 内的数据处理)
def rnn(inputs, state, params):
    # inputs的形状:(时间步数量, 批量大小, 词表大小)
    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state      # 从元组中取出前一时刻的隐藏状态 H
    outputs = []    # 用于保存每个时间步的输出
    
    # 核心:循环遍历最外层的维度(时间步数量)
    # 每次取出的 X 形状为 (批量大小, 词表大小),代表当前时间步的所有样本数据
    for X in inputs:
        # 计算当前时刻的隐藏状态 H。使用 tanh 作为激活函数
        # torch.mm 是矩阵乘法
        # 双曲正切函数把值控制在 [-1, 1]
        H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
        # 计算当前时刻的输出 Y
        Y = torch.mm(H, W_hq) + b_q
        # 把 Y 添加到输出列表中
        outputs.append(Y)
        
    # 把所有时间步的输出在第0维度拼接起来
    # 返回拼接后的输出 和 最后一个时间步的隐藏状态(作为下一个 batch 的初始状态)
    return torch.cat(outputs, dim=0), (H,)

为了方便使用,我们用一个类 (Class) 把上面的零散函数包装起来:

class RNNModelScratch: 
    """从零开始实现的循环神经网络模型"""
    def __init__(self, vocab_size, num_hiddens, device,
                 get_params, init_state, forward_fn):
        self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
        # 实例化参数
        self.params = get_params(vocab_size, num_hiddens, device)
        self.init_state, self.forward_fn = init_state, forward_fn

    # 定义 __call__ 使得类实例可以像函数一样被调用 (例如 net(X, state))
    def __call__(self, X, state):
        # 1. 把输入的整数索引转置并转为独热编码,并转为浮点型
        X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
        # 2. 调用 rnn 函数进行前向传播
        return self.forward_fn(X, state, self.params)

    # 封装初始化状态的方法
    def begin_state(self, batch_size, device):
        return self.init_state(batch_size, self.num_hiddens, device)
4.6.4 预测

概念
语言模型的预测是自回归的。用户给一个前缀(比如 "time traveler "),模型先通过吃掉这个前缀来更新自己的隐藏状态(这叫预热期 Warm-up,此期间我们不关心输出)。预热结束后,把上一步的正确字符(或预测结果)喂给模型,让它输出下一个字符,如此循环。

def predict_ch8(prefix, num_preds, net, vocab, device):  
    """在prefix后面生成新字符"""
    # 预测时,batch_size 是 1
    state = net.begin_state(batch_size=1, device=device)
    # outputs 列表用于保存生成的字符索引,先把 prefix 的第一个字符放进去
    outputs = [vocab[prefix[0]]]
    
    # 这是一个辅助函数,每次将 outputs 的最后一个字符提取出来,变形成 (1, 1) 的形状喂给模型
    get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
    
    # 1. 预热期 (Warm-up)
    for y in prefix[1:]:  
        # 输入前一个字符,更新状态。注意这里用 _ 忽略了输出,只保留更新后的 state
        _, state = net(get_input(), state)
        # 将给定的正确字符存入 outputs
        outputs.append(vocab[y])
        
    # 2. 预测期
    for _ in range(num_preds):  # 预测 num_preds 步
        y, state = net(get_input(), state)
        # y 是各个词的概率(logits),argmax 获取概率最大的那个词的索引
        outputs.append(int(y.argmax(dim=1).reshape(1)))
        
    # 将包含索引的 outputs 列表转换为字符字符串返回
    return ''.join([vocab.idx_to_token[i] for i in outputs])
4.6.5 梯度裁剪 (Gradient Clipping)

概念
由于 RNN 在时间步上展开,梯度会通过矩阵连乘进行反向传播。序列很长时,容易导致梯度爆炸(Gradient Explosion)
解决方案是:如果所有参数梯度的 L2 范数(模长)超过某个阈值 θ \theta θ,我们就把梯度等比例缩小,保证其范数等于 θ \theta θ。这样可以防止模型在某一步更新跨度过大导致崩溃。

def grad_clipping(net, theta):  
    """裁剪梯度"""
    # 兼容处理:如果是 nn.Module 构建的高级模型
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        # 如果是我们从零开始构建的模型
        params = net.params
        
    # 计算所有参数梯度的 L2 范数(先平方求和,再开根号)
    # torch.stack 将列表中的标量拼接成向量,然后 norm 求范数
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    
    # 如果范数大于阈值,则进行裁剪
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm
4.6.6 训练

概念
RNN 的训练由于其隐藏状态的延续性,比普通的网络复杂一点。
有两种数据划分策略(上一节提过):

  1. 顺序分区:下一个 batch 的数据是接着上一个 batch 的。所以隐藏状态可以传递。但为了防止计算图无限增长(反向传播到底),需要在每个 batch 前截断计算图(Detach)。
  2. 随机抽样:每个 batch 是完全独立的。所以每个 batch 必须重新初始化隐藏状态。

(训练一个Epoch)

def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
    """训练模型一个迭代周期"""
    state, timer = None, d2l.Timer()
    metric = d2l.Accumulator(2)  # 累加器:记录训练损失之和, 词元数量
    
    for X, Y in train_iter:
        # 如果状态为空,或者使用的是随机抽样(batch间不连续)
        if state is None or use_random_iter:
            # 重新初始化状态
            state = net.begin_state(batch_size=X.shape[0], device=device)
        else:
            # 如果是顺序抽样,需要将 state 从计算图中分离出来 (Truncated BPTT)
            # 否则反向传播会一直追溯到第一个 batch,导致内存爆炸
            for s in state:
                s.detach_()
                
        # 整理 Y 的形状。X 转置后是 (num_steps, batch_size),
        # 所以 Y 也先转置,然后拉平为一维向量,方便 CrossEntropyLoss 计算
        y = Y.T.reshape(-1)
        X, y = X.to(device), y.to(device)
        
        # 前向传播
        y_hat, state = net(X, state)
        # 计算损失 (y_hat: [num_steps*batch_size, vocab_size], y: [num_steps*batch_size])
        l = loss(y_hat, y.long()).mean()
        
        # 反向传播
        if isinstance(updater, torch.optim.Optimizer): # 如果用高级API
            updater.zero_grad()
            l.backward()
            grad_clipping(net, 1) # 梯度裁剪
            updater.step()
        else: # 如果用我们手写的 updater (d2l.sgd)
            l.backward()
            grad_clipping(net, 1) # 梯度裁剪
            # 因为我们的 loss 计算了均值,这里 batch_size 传 1 即可
            updater(batch_size=1) 
            
        metric.add(l * y.numel(), y.numel())
        
    # 返回:困惑度 (Perplexity), 词元处理速度 (tokens/sec)
    return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()

整体训练函数

def train_ch8(net, train_iter, vocab, lr, num_epochs, device,
              use_random_iter=False):
    """训练模型"""
    # 语言模型是一个多分类问题(预测下一个字是词表里的哪一个),所以用交叉熵损失
    loss = nn.CrossEntropyLoss()
    animator = d2l.Animator(xlabel='epoch', ylabel='perplexity',
                            legend=['train'], xlim=[10, num_epochs])
    
    # 优化器设置
    if isinstance(net, nn.Module):
        updater = torch.optim.SGD(net.parameters(), lr)
    else:
        # 使用自定义的 SGD
        updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
        
    predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device)
    
    # 训练循环
    for epoch in range(num_epochs):
        ppl, speed = train_epoch_ch8(
            net, train_iter, loss, updater, device, use_random_iter)
        if (epoch + 1) % 10 == 0:
            print(predict('time traveller'))
            animator.add(epoch + 1, [ppl])
            
    print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')
    print(predict('time traveller'))
    print(predict('traveller'))

开始训练

num_epochs, lr = 500, 1

# 初始化模型
num_hiddens = 512
net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params,
                      init_rnn_state, rnn)

# 使用顺序抽样训练
train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu())
困惑度 1.0, 23960.0 词元/秒 gpu(0)
time traveller for so it will be convenient to speak of himwas e
travelleryou can show black is white by argument said filby

如果我们加上随机

net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params,
                      init_rnn_state, rnn)
train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu(),
          use_random_iter=True)
困惑度 1.5, 65222.3 词元/秒 cuda:0
time traveller held in his hand was a glitteringmetallic framewo
traveller but now you begin to seethe object of my investig

困惑度更高了

4.7 简洁实现

导入依赖与加载数据

import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

# 设定批量大小和时间步长
batch_size, num_steps = 32, 35
# 加载《时光机器》数据集,获取数据迭代器和词汇表
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

定义模型

PyTorch 中的 nn.RNN 已经帮我们把上一节中循环遍历时间步(for X in inputs:)以及计算隐藏状态公式( H t = tanh ⁡ ( X t W x h + H t − 1 W h h + b h ) H_t = \tanh(X_t W_{xh} + H_{t-1} W_{hh} + b_h) Ht=tanh(XtWxh+Ht1Whh+bh))封装好了。

num_hiddens = 256
# 实例化一个 RNN 层
# 第一个参数是输入维度(词表大小),第二个参数是隐藏单元数量
rnn_layer = nn.RNN(len(vocab), num_hiddens)

隐状态的形状:
在高级 API 中,隐状态张量的形状定义为:(隐藏层数, 批量大小, 隐藏单元数)
因为我们现在只用了一层单向 RNN,所以第一维是 1。

# 初始化隐状态,全为0
state = torch.zeros((1, batch_size, num_hiddens))
print(state.shape) # 输出: torch.Size([1, 32, 256])

测试前向传播,理解输出的含义

# 随机生成一个输入张量模拟独热编码后的数据
# 形状要求:(时间步数, 批量大小, 词表大小)
X = torch.rand(size=(num_steps, batch_size, len(vocab)))

# 将输入 X 和初始隐状态 state 送入 RNN 层
Y, state_new = rnn_layer(X, state)

print(Y.shape)         # 输出: torch.Size([35, 32, 256])
print(state_new.shape) # 输出: torch.Size([1, 32, 256])

极其重要的概念:

  • Y 不是我们最终需要的预测输出!在 PyTorch 中,RNN 层的输出 Y 指的是每一个时间步的隐状态的集合。它的形状是 (时间步数, 批量大小, 隐藏单元数)
  • state_new 指的是最后一个时间步的隐状态。如果你对比一下,Y[-1](Y的最后一行)和 state_new[0] 实际上是完全相同的。state_new 是用来传递给下一个 Batch 当初始状态的。
  • 因为 Y 只是隐藏状态,我们还需要在外面接一个全连接层(输出层),把形状为 256 的隐藏状态映射为长度为 28(词表大小)的概率分布,才能预测下一个字符!

构建完整的 RNN 模型类

既然 nn.RNN 只管隐状态计算,不管最终输出,我们就得自己写一个 nn.Module 把它们拼起来。

class RNNModel(nn.Module):
    """循环神经网络模型"""
    def __init__(self, rnn_layer, vocab_size, **kwargs):
        super(RNNModel, self).__init__(**kwargs)
        self.rnn = rnn_layer         # 传入刚才实例化的 nn.RNN 层
        self.vocab_size = vocab_size # 词表大小
        self.num_hiddens = self.rnn.hidden_size # 获取隐藏单元数
        
        # 判断是不是双向RNN (以后会学),这里是单向,所以 num_directions = 1
        if not self.rnn.bidirectional:
            self.num_directions = 1
            # 定义输出层(全连接层):输入特征是隐状态维度,输出特征是词表大小
            self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
        else:
            self.num_directions = 2
            self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)

    def forward(self, inputs, state):
        # 1. 对输入进行转置 (batch_size, num_steps) -> (num_steps, batch_size)
        # 2. 转换为独热编码,并转为浮点型张量
        X = F.one_hot(inputs.T.long(), self.vocab_size)
        X = X.to(torch.float32)
        
        # 3. 送入 RNN 层计算。Y 包含所有时间步的隐状态
        Y, state = self.rnn(X, state)
        
        # 4. 计算最终输出(极其巧妙的一步)
        # Y 原本形状是 (时间步数, 批量大小, 隐藏单元数)
        # linear 层期望的输入是二维的 (样本数, 特征数)
        # 所以我们把前两维拉平,变成 (时间步数 * 批量大小, 隐藏单元数)
        # 这样就可以一次性让所有时间步的数据通过全连接层,极大提升计算速度!
        output = self.linear(Y.reshape((-1, Y.shape[-1])))
        
        # output 的最终形状:(时间步数 * 批量大小, 词表大小)
        return output, state

    def begin_state(self, device, batch_size=1):
        # 初始化隐状态的方法。这里做了一个兼容性处理。
        # 在第9章我们会学 GRU 和 LSTM。GRU 和普通 RNN 的隐状态是一个张量。
        if not isinstance(self.rnn, nn.LSTM):
            # 返回形状: (层数*方向数, 批量大小, 隐藏单元数) 的全0张量
            return  torch.zeros((self.num_directions * self.rnn.num_layers,
                                 batch_size, self.num_hiddens),
                                device=device)
        else:
            # LSTM 比较特殊,它的隐状态是由两个张量组成的元组:(隐状态 H, 记忆细胞 C)
            return (torch.zeros((
                self.num_directions * self.rnn.num_layers,
                batch_size, self.num_hiddens), device=device),
                    torch.zeros((
                        self.num_directions * self.rnn.num_layers,
                        batch_size, self.num_hiddens), device=device))

训练与预测

在训练之前,我们先用初始化的(未经训练的随机权重)模型跑一下预测看看。

device = d2l.try_gpu()
# 实例化完整的模型
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)

# 给定前缀 'time traveller' 预测接下来的 10 个字符
d2l.predict_ch8('time traveller', 10, net, vocab, device)
'time travellerbbabbkabyg'

开始训练:
由于我们复用了 8.5 节写好的 train_ch8 训练逻辑(里面包含了计算交叉熵损失、梯度裁剪、优化器步进等逻辑),这里只需要一行代码就能开始训练。

num_epochs, lr = 500, 1
# 调用上一节定义的训练函数
d2l.train_ch8(net, train_iter, vocab, lr, num_epochs, device)

训练结果对比与分析:`

perplexity 1.3, 404413.8 tokens/sec on cuda:0
time travellerit would be remarkably convenient for the historia
travellery of il the hise fupt might and st was it loflers
  1. 困惑度 (Perplexity):达到了 1.3 左右,说明模型已经很好地学会了《时光机器》这本书的拼写规律。
  2. 速度 (Tokens/sec):注意这个指标,在上一节从零实现时,速度大概是 24,000 词元/秒。而使用 PyTorch 的 nn.RNN 后,速度飙升到了 404,413 词元/秒!**速度提升了近 16 倍!**这就是使用高级 API 的最大好处。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐