循环神经网络(RNN)

  • 核心思想:处理序列数据,通过隐藏状态 $h_t$ 在时间步之间传递信息。
  • 标准 RNN 公式:$h_t = \tanh(W_{xh}x_t + W_{hh}h_{t-1} + b_h)$,$y_t = W_{hy}h_t + b_y$
  • 痛点:普通 RNN 存在梯度消失/爆炸,难以捕捉长距离依赖。
  • 改进变体
    • LSTM:遗忘门、输入门、输出门控制信息流动。
    • GRU:更新门、重置门简化门控结构。
  • 训练方式:沿时间反向传播(BPTT),常结合截断以降低计算量。
  • 典型应用:文本分类、情感分析、语言模型、时间序列预测、机器翻译等。

案例一:LSTM 文本情感分类(IMDB 数据集)

任务:对电影评论进行二分类(正面/负面),使用词嵌入 + 双向 LSTM。

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence
import numpy as np





from keras.datasets import imdb
from keras.preprocessing.sequence import pad_sequences

# -------------------- 1. Load and pad the IMDB data --------------------
MAX_FEATURES = 10000      # keep only the 10,000 most frequent words
MAX_LEN = 500             # common sequence length after padding/truncation



# Load the data; num_words caps the vocabulary size
(x_train_raw, y_train), (x_test_raw, y_test) = imdb.load_data(num_words=MAX_FEATURES)



# Bring every sequence to MAX_LEN: shorter ones are zero-padded at the end
# (padding='post'), longer ones lose their tail (truncating='post')
x_train = pad_sequences(x_train_raw, maxlen=MAX_LEN, padding='post', truncating='post')
x_test = pad_sequences(x_test_raw, maxlen=MAX_LEN, padding='post', truncating='post')

# -------------------- 2. 构建 PyTorch 数据集 --------------------
class IMDBDataset(Dataset):
    """Tensor-backed dataset that pairs token-id sequences with their labels."""

    def __init__(self, data, labels):
        # data: numpy array of token ids, shape (num_samples, seq_len)
        # labels: numpy array of 0/1 labels
        self.labels = torch.tensor(labels, dtype=torch.float32)
        self.data = torch.tensor(data, dtype=torch.long)

    def __len__(self):
        # One sample per row of the token-id tensor
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        target = self.labels[idx]
        return sample, target

# Wrap the padded arrays and their labels as datasets
train_dataset = IMDBDataset(x_train, y_train)
test_dataset = IMDBDataset(x_test, y_test)

# Mini-batch loaders; only the training loader shuffles its samples
BATCH_SIZE = 64
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# -------------------- 3. 定义模型:双向 LSTM 分类器 --------------------
class BiLSTMClassifier(nn.Module):
    """Bidirectional LSTM classifier over zero-padded token-id sequences.

    Token id ``0`` is treated as padding. Each sequence is packed to its true
    length before entering the LSTM, so padded positions never influence the
    result. This assumes the padding follows the real tokens
    (``padding='post'``), because the length is derived by counting non-zero ids.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Size of each token embedding.
        hidden_dim: LSTM hidden size per direction.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of logits produced per sample.
        dropout: Dropout probability used between LSTM layers and before the
            linear head.
    """

    def __init__(self, vocab_size, embed_dim=128, hidden_dim=128, num_layers=2, num_classes=1, dropout=0.5):
        super().__init__()
        # padding_idx keeps the padding embedding out of the gradient updates
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        # nn.LSTM applies dropout only between stacked layers; passing a
        # non-zero value with a single layer just triggers a warning
        lstm_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers,
                            batch_first=True, bidirectional=True, dropout=lstm_dropout)
        self.fc = nn.Linear(hidden_dim * 2, num_classes)      # both directions concatenated
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return one logit row per sequence.

        Args:
            x: Integer tensor of token ids, shape ``(batch, seq_len)``.

        Returns:
            Tensor of logits with dimension 1 squeezed, i.e. ``(batch,)`` when
            ``num_classes == 1``.
        """
        embedded = self.embedding(x)               # (batch, seq_len, embed_dim)

        # True length of each row; clamp so an all-padding row still packs.
        # pack_padded_sequence requires the lengths on the CPU.
        lengths = (x != self.embedding.padding_idx).sum(dim=1).clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )
        _, (hidden, _) = self.lstm(packed)         # hidden: (num_layers*2, batch, hidden_dim)

        # Reading lstm_out[:, -1, :] on post-padded input would take the forward
        # state after the padding and the backward state after a single step.
        # The final hidden states of the top layer summarise the whole real
        # sequence in each direction instead.
        out = torch.cat((hidden[-2], hidden[-1]), dim=1)   # (batch, hidden_dim*2)
        out = self.dropout(out)
        out = self.fc(out).squeeze(1)              # (batch,)
        return out

# Instantiate the model, loss function and optimizer
# NOTE(review): Keras presumably maps every index >= num_words to the OOV id, so
# the ids would already be < MAX_FEATURES and the +3 is only a safety margin — confirm.
VOCAB_SIZE = MAX_FEATURES + 3  # 0: padding, 1: start, 2: unknown (Keras index offset)
model = BiLSTMClassifier(VOCAB_SIZE)
criterion = nn.BCEWithLogitsLoss()   # binary loss that applies the sigmoid itself
optimizer = optim.Adam(model.parameters(), lr=0.001)

# -------------------- 4. Training and evaluation --------------------
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

def train_epoch(model, loader, criterion, optimizer):
    """Run one optimisation pass over ``loader``.

    Batches are moved to the module-level ``device`` before the forward pass.

    Returns:
        Tuple ``(loss, accuracy)``: the sum of per-batch losses divided by the
        number of batches, and the number of correct predictions divided by
        ``len(loader.dataset)``.
    """
    model.train()
    loss_sum = 0
    n_correct = 0
    for batch_x, batch_y in loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()
        logits = model(batch_x)                  # forward pass
        batch_loss = criterion(logits, batch_y)
        batch_loss.backward()                    # backward pass
        optimizer.step()                         # parameter update

        loss_sum += batch_loss.item()
        # sigmoid turns logits into probabilities; above 0.5 counts as positive
        predicted = (torch.sigmoid(logits) > 0.5).float()
        n_correct += (predicted == batch_y).sum().item()

    mean_loss = loss_sum / len(loader)
    accuracy = n_correct / len(loader.dataset)
    return mean_loss, accuracy

def evaluate(model, loader, criterion):
    """Score ``model`` on ``loader`` without computing gradients.

    Batches are moved to the module-level ``device`` before the forward pass.

    Returns:
        Tuple ``(loss, accuracy)``: the sum of per-batch losses divided by the
        number of batches, and the number of correct predictions divided by
        ``len(loader.dataset)``.
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            logits = model(batch_x)
            loss_sum += criterion(logits, batch_y).item()

            # same 0.5 probability threshold as in training
            predicted = (torch.sigmoid(logits) > 0.5).float()
            n_correct += (predicted == batch_y).sum().item()

    mean_loss = loss_sum / len(loader)
    accuracy = n_correct / len(loader.dataset)
    return mean_loss, accuracy

# Train for a fixed number of epochs, scoring the test split after each one
EPOCHS = 5
for epoch in range(EPOCHS):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
    test_loss, test_acc = evaluate(model, test_loader, criterion)
    print(f'Epoch {epoch+1}/{EPOCHS} | Train Loss: {train_loss:.4f} Acc: {train_acc:.4f} | Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}')
 案例二:LSTM 时间序列预测(正弦波预测)
任务:使用过去的正弦波值预测下一个时间步的值,演示 RNN 在回归预测中的应用。

python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

# -------------------- 1. 生成正弦波数据 --------------------
T = 1000                  # total number of time steps
time = np.arange(0, T)    # integer time axis 0 .. T-1
data = np.sin(0.02 * time)  # sine wave; period is 2*pi/0.02, about 314 steps

# 构造监督学习样本:用过去 50 个点预测下一个点
def create_sequences(data, seq_len):
    """Slice a series into sliding windows and their next-step targets.

    Window ``i`` is ``data[i:i + seq_len]`` and its target is
    ``data[i + seq_len]``.

    Args:
        data: Sequence or array indexed along its first axis by time.
        seq_len: Number of consecutive points in each input window.

    Returns:
        Tuple ``(X, y)`` of numpy arrays. ``X`` has shape
        ``(n_windows, seq_len, ...)`` and ``y`` has shape ``(n_windows, ...)``,
        where ``n_windows = max(len(data) - seq_len, 0)``. When the series is
        too short, ``X`` still keeps its ``seq_len`` axis (shape
        ``(0, seq_len, ...)``) so downstream reshapes do not break.

    Raises:
        ValueError: If ``seq_len`` is negative.
    """
    if seq_len < 0:
        raise ValueError(f"seq_len must be non-negative, got {seq_len}")

    series = np.asarray(data)
    n_windows = max(len(series) - seq_len, 0)

    # Row i of the index grid holds i, i+1, ..., i+seq_len-1, so a single
    # fancy-indexing call builds every window at once.
    window_idx = np.arange(n_windows)[:, None] + np.arange(seq_len)[None, :]
    X = series[window_idx]
    # Copy so the targets do not alias the caller's array
    y = series[seq_len:seq_len + n_windows].copy()
    return X, y

SEQ_LEN = 50  # number of past points fed to the model for each prediction
X_raw, y_raw = create_sequences(data, SEQ_LEN)

# Convert to PyTorch tensors and append a trailing feature axis,
# giving inputs of shape (batch, seq_len, input_size=1)
X = torch.tensor(X_raw, dtype=torch.float32).unsqueeze(-1)
y = torch.tensor(y_raw, dtype=torch.float32).unsqueeze(-1)

# Split into train and test sets in order, without shuffling:
# the first 80% of the windows train the model, the rest test it
split = int(0.8 * len(X))
X_train, y_train = X[:split], y[:split]
X_test, y_test = X[split:], y[split:]

# -------------------- 2. 定义 LSTM 回归模型 --------------------
class LSTMRegressor(nn.Module):
    """Stacked LSTM with a linear head that emits one value per input sequence."""

    def __init__(self, input_size=1, hidden_size=64, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        # Maps a hidden vector to a single predicted value
        self.fc = nn.Linear(in_features=hidden_size, out_features=1)

    def forward(self, x):
        # x: (batch, seq_len, input_size)
        sequence_features, _state = self.lstm(x)     # (batch, seq_len, hidden_size)
        # Only the last time step feeds the regression head
        final_step = sequence_features[:, -1, :]
        return self.fc(final_step)                   # (batch, 1)

model = LSTMRegressor()
criterion = nn.MSELoss()        # mean-squared-error regression loss
optimizer = optim.Adam(model.parameters(), lr=0.001)

# -------------------- 3. Train the model --------------------
BATCH_SIZE = 32
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

EPOCHS = 30
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0  # sum of per-batch losses for this epoch
    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Report the average batch loss every fifth epoch
    if (epoch+1) % 5 == 0:
        print(f'Epoch {epoch+1}/{EPOCHS}, Loss: {total_loss/len(train_loader):.6f}')

# -------------------- 4. 预测与可视化 --------------------
# Predict on both splits in a single forward pass each, without gradients
model.eval()
with torch.no_grad():
    y_pred_train = model(X_train).squeeze().numpy()
    y_pred_test = model(X_test).squeeze().numpy()

# Plot true vs. predicted values; the test curves start on the x-axis
# where the training curves end
plt.figure(figsize=(12,4))
plt.plot(np.arange(len(y_train)), y_train.numpy(), label='Train True')
plt.plot(np.arange(len(y_train)), y_pred_train, label='Train Pred')
plt.plot(np.arange(len(y_train), len(y_train)+len(y_test)), y_test.numpy(), label='Test True')
plt.plot(np.arange(len(y_train), len(y_train)+len(y_test)), y_pred_test, label='Test Pred')
plt.legend()
plt.title('LSTM Sine Wave Prediction')
plt.show()

迁移学习(Transfer Learning)

定义:将在大规模数据集(如 ImageNet)上预训练好的模型知识,迁移到小样本或相关任务中。

常用策略:

冻结骨干网络:只训练最后几层分类头,适合目标数据少且与源数据相似的情况。

微调(Fine‑tuning):解冻部分或全部预训练层,用较小学习率整体调整,适用数据较多、任务差异稍大时。

优势:降低训练成本,在小数据集上也能取得高性能。

典型场景:

计算机视觉:ResNet、EfficientNet 在 ImageNet 预训练后迁移到细粒度分类、目标检测。

自然语言处理:BERT、GPT、DistilBERT 等在大规模语料预训练后,微调用于情感分析、问答、文本分类。

案例一:ResNet18 微调 —— CIFAR‑10 图像分类
任务:使用在 ImageNet 预训练的 ResNet18,对 CIFAR‑10 进行 10 分类微调。

python
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets, models

# -------------------- 1. 数据准备:CIFAR-10 --------------------
# 针对 ImageNet 预训练模型的标准化参数
# Training pipeline: resize, random flip, tensor conversion, normalisation
transform_train = transforms.Compose([
    transforms.Resize(224),                    # ResNet expects 224x224 inputs
    transforms.RandomHorizontalFlip(),         # data augmentation
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Test pipeline: same resize and normalisation, but no augmentation
transform_test = transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# CIFAR-10 is downloaded into ./data if it is not already there
trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=64, shuffle=False)

# -------------------- 2. 构建迁移学习模型 --------------------
# NOTE(review): recent torchvision releases presumably deprecate `pretrained=`
# in favour of `weights=` (e.g. models.ResNet18_Weights) — confirm against the
# installed version before changing it.
model = models.resnet18(pretrained=True)       # load the pretrained weights

# Freeze every existing parameter (only the classifier head trains at first)
for param in model.parameters():
    param.requires_grad = False

# Replace the final fully connected layer so it outputs 10 classes
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)             # a new layer has requires_grad=True by default

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# -------------------- 3. Training (phase one: classifier head only) --------------------
# NOTE(review): train_epoch calls model.train(), so BatchNorm running statistics
# in the frozen backbone presumably still update during this phase — confirm
# that is intended.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.fc.parameters(), lr=0.001)  # optimise the new head only

def train_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over ``loader``.

    Batches are moved to the module-level ``device`` before the forward pass.

    Returns:
        Tuple ``(loss, accuracy)``: the sum of per-batch losses divided by the
        number of batches, and the number of correct predictions divided by
        ``len(loader.dataset)``.
    """
    model.train()
    loss_total = 0.0
    hits = 0
    for images, targets in loader:
        images = images.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        logits = model(images)                 # forward pass
        batch_loss = criterion(logits, targets)
        batch_loss.backward()
        optimizer.step()

        loss_total += batch_loss.item()
        # the predicted class is the index of the largest logit
        predicted = torch.max(logits, 1).indices
        hits += (predicted == targets).sum().item()

    mean_loss = loss_total / len(loader)
    accuracy = hits / len(loader.dataset)
    return mean_loss, accuracy

def evaluate(model, loader, criterion):
    """Score ``model`` on ``loader`` without computing gradients.

    Batches are moved to the module-level ``device`` before the forward pass.

    Returns:
        Tuple ``(loss, accuracy)``: the sum of per-batch losses divided by the
        number of batches, and the number of correct predictions divided by
        ``len(loader.dataset)``.
    """
    model.eval()
    loss_total = 0.0
    hits = 0
    with torch.no_grad():
        for images, targets in loader:
            images = images.to(device)
            targets = targets.to(device)

            logits = model(images)
            loss_total += criterion(logits, targets).item()

            # the predicted class is the index of the largest logit
            predicted = torch.max(logits, 1).indices
            hits += (predicted == targets).sum().item()

    mean_loss = loss_total / len(loader)
    accuracy = hits / len(loader.dataset)
    return mean_loss, accuracy

# Phase one: five epochs with the optimizer that only holds the head's parameters
print("Training classifier head...")
for epoch in range(5):
    train_loss, train_acc = train_epoch(model, trainloader, optimizer, criterion)
    test_loss, test_acc = evaluate(model, testloader, criterion)
    print(f'Epoch {epoch+1}: Train Loss {train_loss:.4f} Acc {train_acc:.4f} | Test Loss {test_loss:.4f} Acc {test_acc:.4f}')

# -------------------- (Advanced) unfreeze everything and fine-tune end to end --------------------
# Unfreeze all layers and switch to a smaller learning rate
for param in model.parameters():
    param.requires_grad = True
optimizer_full = optim.Adam(model.parameters(), lr=1e-4)  # smaller learning rate

# Phase two: five more epochs updating every parameter
print("\nFine-tuning all layers...")
for epoch in range(5):
    train_loss, train_acc = train_epoch(model, trainloader, optimizer_full, criterion)
    test_loss, test_acc = evaluate(model, testloader, criterion)
    print(f'Epoch {epoch+1}: Train Loss {train_loss:.4f} Acc {train_acc:.4f} | Test Loss {test_loss:.4f} Acc {test_acc:.4f}')




 案例二:DistilBERT 微调 —— IMDB 文本分类
任务:使用 Hugging Face 的预训练 DistilBERT 对 IMDB 进行情感二分类,展示 NLP 中的迁移学习。

python
# 需安装:pip install transformers datasets
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset
import torch
import numpy as np
from sklearn.metrics import accuracy_score

# -------------------- 1. 加载 IMDB 数据集(Hugging Face datasets)--------------------
dataset = load_dataset('imdb')
# Shuffle with a fixed seed, then keep a subset to speed up the example
train_dataset = dataset['train'].shuffle(seed=42).select(range(5000))
test_dataset = dataset['test'].shuffle(seed=42).select(range(1000))

# -------------------- 2. Load the tokenizer and the model --------------------
model_name = 'distilbert-base-uncased'
tokenizer = DistilBertTokenizer.from_pretrained(model_name)
# num_labels=2 requests a two-class classification head
model = DistilBertForSequenceClassification.from_pretrained(model_name, num_labels=2)

# -------------------- 3. 数据预处理:分词 --------------------
def tokenize_function(examples):
    """Tokenize the ``'text'`` field of a batch with the module-level tokenizer.

    Every text is truncated and padded to 256 tokens; the tokenizer's output
    is returned unchanged.
    """
    texts = examples['text']
    encoded = tokenizer(texts, padding='max_length', truncation=True, max_length=256)
    return encoded

# Tokenize both splits; batched=True passes several examples per call
train_dataset = train_dataset.map(tokenize_function, batched=True)
test_dataset = test_dataset.map(tokenize_function, batched=True)

# Expose only the listed columns, returned as torch tensors
train_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
test_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

# -------------------- 4. 定义评估指标 --------------------
def compute_metrics(pred):
    """Return ``{'accuracy': ...}`` for a prediction object.

    The predicted class of each row is the argmax of ``pred.predictions``
    along axis 1; it is compared against ``pred.label_ids``.
    """
    predicted_classes = np.argmax(pred.predictions, axis=1)
    return {'accuracy': accuracy_score(pred.label_ids, predicted_classes)}

# -------------------- 5. 设置训练参数并启动训练 --------------------
# NOTE(review): newer transformers releases presumably rename
# `evaluation_strategy` to `eval_strategy` — confirm against the installed version.
training_args = TrainingArguments(
    output_dir='./results',          # directory where checkpoints are saved
    num_train_epochs=3,              # number of training epochs
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    evaluation_strategy='epoch',     # evaluate after every epoch
    save_strategy='epoch',           # save on the same schedule
    logging_dir='./logs',
    learning_rate=2e-5,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
)

trainer.train()

# Final evaluation on the evaluation split; the 'accuracy' key returned by
# compute_metrics is read back as 'eval_accuracy'
eval_result = trainer.evaluate()
print(f"Test Accuracy: {eval_result['eval_accuracy']:.4f}")

在线学习(Online Learning)

定义:模型随着新数据的到来实时更新参数,每处理一条或一小批数据就调整一次模型。

核心特点:

无需一次性加载全部数据,适合数据流和实时系统。

通常只遍历数据一遍(或少数几遍),即“单次扫描”训练。

代表算法:

随机梯度下降(SGD)的增量形式。

被动攻击算法(Passive‑Aggressive)。

FTRL(Follow‑the‑Regularized‑Leader)。

与批量学习的区别:

批量学习:在全量数据上反复迭代,需要大量内存和固定数据集。

在线学习:逐样本或逐小批次更新,可适应概念漂移。

典型应用:广告点击率预测、实时推荐系统、高频交易模型更新、物联网传感器数据分析。

案例一:在线二分类 — SGDClassifier 流式学习
使用 Scikit‑learn 的 SGDClassifier(支持 partial_fit)模拟在线学习,数据实时生成。

python
from sklearn.linear_model import SGDClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
import matplotlib.pyplot as plt

# -------------------- 1. 生成静态数据集(用于离线对比)--------------------
X, y = make_classification(n_samples=5000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Online classifier: logistic regression (loss='log_loss') with an L2 penalty.
# The old spelling loss='log' was deprecated in scikit-learn 1.1 and removed in
# 1.3, where it raises an error, so the current name is used here.
model = SGDClassifier(loss='log_loss', penalty='l2', max_iter=1, tol=None, warm_start=True, random_state=42)

# -------------------- 2. Simulate a data stream: train batch by batch --------------------
batch_size = 50
accs_online = []
# partial_fit needs the full label set up front; compute it once, not per batch
classes = np.unique(y)
# Seeded generator so the shuffling order is reproducible, matching the
# random_state=42 used everywhere else in this script
rng = np.random.default_rng(42)
# Several passes over the training set (a real stream is seen once; this is
# only to show convergence)
n_epochs = 5
for epoch in range(n_epochs):
    # Reshuffle the samples at the start of every epoch
    idx = rng.permutation(len(X_train))
    X_shuffled, y_shuffled = X_train[idx], y_train[idx]
    for i in range(0, len(X_train), batch_size):
        X_batch = X_shuffled[i:i+batch_size]
        y_batch = y_shuffled[i:i+batch_size]
        model.partial_fit(X_batch, y_batch, classes=classes)  # incremental update
        # Track test-set accuracy after every update
        acc = accuracy_score(y_test, model.predict(X_test))
        accs_online.append(acc)

# -------------------- 3. Compare with offline batch training --------------------
model_offline = SGDClassifier(loss='log_loss', penalty='l2', max_iter=1000, random_state=42)
model_offline.fit(X_train, y_train)
offline_acc = accuracy_score(y_test, model_offline.predict(X_test))
print(f"Offline Batch Accuracy: {offline_acc:.4f}")
print(f"Online Final Accuracy: {accs_online[-1]:.4f}")

# Plot how the online accuracy evolves, with the offline result as a reference line
plt.figure(figsize=(8,4))
plt.plot(accs_online)
plt.axhline(y=offline_acc, color='r', linestyle='--', label='Offline accuracy')
plt.xlabel('Batch updates')
plt.ylabel('Test Accuracy')
plt.legend()
plt.title('Online Learning Progress')
plt.show()
🧪 案例二:在线线性回归 — SGDRegressor 逐样本更新
使用 SGDRegressor 实现一个简单的房价趋势在线学习模型,数据动态生成。

python
from sklearn.linear_model import SGDRegressor
from sklearn.datasets import make_regression
from sklearn.metrics import mean_squared_error
import numpy as np
import matplotlib.pyplot as plt

# -------------------- 1. 生成回归数据 --------------------
X, y = make_regression(n_samples=3000, n_features=10, noise=0.1, random_state=42)
# Split in order: the first 80% plays the role of the stream, the rest is a fixed test set
split = int(0.8 * len(X))
X_stream, y_stream = X[:split], y[:split]
X_test, y_test = X[split:], y[split:]

# Online regressor with a constant learning rate of 0.01
model = SGDRegressor(max_iter=1, tol=None, warm_start=True, learning_rate='constant', eta0=0.01)

# -------------------- 2. Sample-by-sample online learning (simulated real-time data) --------------------
mse_history = []
for i in range(len(X_stream)):
    # Take a single sample (in a real system they arrive one at a time)
    xi = X_stream[i].reshape(1, -1)
    yi = np.array([y_stream[i]])
    model.partial_fit(xi, yi)                # incremental update
    # Evaluate on the test set every 50 steps
    if i % 50 == 0:
        pred = model.predict(X_test)
        mse = mean_squared_error(y_test, pred)
        mse_history.append(mse)

# -------------------- 3. Offline comparison --------------------
model_offline = SGDRegressor(max_iter=1000, tol=1e-3)
model_offline.fit(X_stream, y_stream)
offline_mse = mean_squared_error(y_test, model_offline.predict(X_test))
print(f"Offline MSE: {offline_mse:.4f}")
# NOTE: mse_history[-1] is the last snapshot taken inside the loop (the last i
# divisible by 50), which is not necessarily after the final streamed sample
print(f"Online Final MSE: {mse_history[-1]:.4f}")

# Plot how the test MSE changes as more samples are seen;
# the x positions use the same step of 50 as the snapshots
plt.figure(figsize=(8,4))
plt.plot(range(0, len(X_stream), 50), mse_history)
plt.axhline(y=offline_mse, color='r', linestyle='--', label='Offline MSE')
plt.xlabel('Samples seen')
plt.ylabel('Test MSE')
plt.legend()
plt.title('Online Regression MSE over Time')
plt.show()

小结
RNN:掌握 LSTM/GRU 的门控机制,可灵活处理文本、时序等序列任务。

迁移学习:利用预训练大模型,通过冻结或微调快速解决小样本问题,广泛应用于图像和 NLP。

在线学习:支持数据流式增量更新,适合实时系统,Scikit‑learn 中的 partial_fit 提供了便捷的在线学习接口。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐