第五周-5.19
循环神经网络(RNN)
- 核心思想:处理序列数据,通过隐藏状态 $h_t$ 在时间步之间传递信息。
- 标准 RNN 公式:$h_t = \tanh(W_{xh}x_t + W_{hh}h_{t-1} + b_h)$,$y_t = W_{hy}h_t + b_y$。
- 痛点:普通 RNN 存在梯度消失/爆炸,难以捕捉长距离依赖。
- 改进变体:
- LSTM:遗忘门、输入门、输出门控制信息流动。
- GRU:更新门、重置门简化门控结构。
- 训练方式:沿时间反向传播(BPTT),常结合截断以降低计算量。
- 典型应用:文本分类、情感分析、语言模型、时间序列预测、机器翻译等。
案例一:LSTM 文本情感分类(IMDB 数据集)
任务:对电影评论进行二分类(正面/负面),使用词嵌入 + 双向 LSTM。
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence
import numpy as np
from keras.datasets import imdb
from keras.preprocessing.sequence import pad_sequences
# Vocabulary and sequence-length configuration.
MAX_FEATURES = 10000  # keep only the 10000 most frequent words
MAX_LEN = 500  # common length every review is padded/truncated to

# Load IMDB; num_words caps the vocabulary size.
train_split, test_split = imdb.load_data(num_words=MAX_FEATURES)
x_train_raw, y_train = train_split
x_test_raw, y_test = test_split

# Bring every sequence to MAX_LEN: short ones are zero-padded at the end,
# long ones are cut at the end.
_pad_options = {'maxlen': MAX_LEN, 'padding': 'post', 'truncating': 'post'}
x_train = pad_sequences(x_train_raw, **_pad_options)
x_test = pad_sequences(x_test_raw, **_pad_options)
# -------------------- 2. Build the PyTorch dataset --------------------
class IMDBDataset(Dataset):
    """Pairs token-id sequences with their labels as tensors."""

    def __init__(self, data, labels):
        # data: numpy array, shape (num_samples, seq_len)
        # labels: numpy array of 0/1 labels
        token_ids = torch.tensor(data, dtype=torch.long)
        targets = torch.tensor(labels, dtype=torch.float32)
        self.data, self.labels = token_ids, targets

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        target = self.labels[idx]
        return sample, target
# Wrap the padded arrays as datasets.
train_dataset = IMDBDataset(x_train, y_train)
test_dataset = IMDBDataset(x_test, y_test)

# Batch loaders: only the training split is shuffled.
BATCH_SIZE = 64
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=BATCH_SIZE)
# -------------------- 3. Model: bidirectional LSTM classifier --------------------
class BiLSTMClassifier(nn.Module):
    """Embedding -> stacked bidirectional LSTM -> dropout -> linear head.

    Token id 0 is treated as padding. Padding must be trailing ("post"
    padding) and id 0 must not occur inside a real sequence, because the true
    length of each sequence is derived by counting its non-zero ids.

    Args:
        vocab_size: number of rows in the embedding table.
        embed_dim: embedding width.
        hidden_dim: LSTM hidden size per direction.
        num_layers: number of stacked LSTM layers.
        num_classes: width of the output layer (1 -> a single logit).
        dropout: dropout rate between LSTM layers and before the head.
    """

    def __init__(self, vocab_size, embed_dim=128, hidden_dim=128, num_layers=2, num_classes=1, dropout=0.5):
        super().__init__()
        # padding_idx=0: the padding embedding receives no gradient updates.
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers,
                            batch_first=True, bidirectional=True, dropout=dropout)
        # Forward and backward states are concatenated -> 2 * hidden_dim.
        self.fc = nn.Linear(hidden_dim * 2, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return logits for a batch of token ids of shape (batch, seq_len)."""
        embedded = self.embedding(x)  # (batch, seq_len, embed_dim)

        # Reading lstm_out[:, -1, :] on post-padded input is wrong: the forward
        # state has been run through all the padding and the backward state
        # has only seen a single padding token. Pack by true length so both
        # directions stop exactly at the real sequence boundaries.
        lengths = (x != 0).sum(dim=1).clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )
        _, (hidden, _) = self.lstm(packed)

        # hidden: (num_layers * 2, batch, hidden_dim); the last two entries are
        # the top layer's final forward and backward states.
        out = torch.cat((hidden[-2], hidden[-1]), dim=1)  # (batch, hidden_dim*2)
        out = self.dropout(out)
        out = self.fc(out).squeeze(1)  # (batch,) when num_classes == 1
        return out
# Model, loss function and optimizer.
# Keras index offsets: 0 = padding, 1 = start, 2 = unknown, hence the +3.
VOCAB_SIZE = 3 + MAX_FEATURES
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = BiLSTMClassifier(VOCAB_SIZE).to(device)
criterion = nn.BCEWithLogitsLoss()  # binary loss that applies the sigmoid itself
optimizer = optim.Adam(model.parameters(), lr=0.001)
# -------------------- 4. Training and evaluation --------------------
def train_epoch(model, loader, criterion, optimizer):
    """Train ``model`` for one pass over ``loader``.

    Args:
        model: binary classifier producing one logit per sample, shape (batch,).
        loader: iterable of ``(data, labels)`` batches.
        criterion: loss called as ``criterion(logits, labels)``; assumed to use
            mean reduction (the default for ``nn.BCEWithLogitsLoss``).
        optimizer: optimizer bound to ``model``'s parameters.

    Returns:
        ``(mean_loss, accuracy)`` over the samples actually processed, or
        ``(0.0, 0.0)`` when the loader yields nothing.
    """
    model.train()
    # Follow the model's own device instead of relying on a module-level global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    loss_sum, correct, seen = 0.0, 0, 0
    for data, labels in loader:
        data, labels = data.to(run_device), labels.to(run_device)
        optimizer.zero_grad()
        outputs = model(data)              # forward pass
        loss = criterion(outputs, labels)  # batch loss
        loss.backward()                    # backward pass
        optimizer.step()                   # parameter update

        batch_n = labels.size(0)
        # Weight each batch mean by its size so a short final batch is not
        # over-represented in the epoch average.
        loss_sum += loss.item() * batch_n
        # Logits -> probabilities; above 0.5 counts as the positive class.
        preds = (torch.sigmoid(outputs.detach()) > 0.5).float()
        correct += (preds == labels).sum().item()
        seen += batch_n

    if seen == 0:
        return 0.0, 0.0
    # Divide by samples seen (not len(loader.dataset)) so drop_last/samplers
    # do not skew the accuracy.
    return loss_sum / seen, correct / seen
def evaluate(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` without updating parameters.

    Args:
        model: binary classifier producing one logit per sample, shape (batch,).
        loader: iterable of ``(data, labels)`` batches.
        criterion: loss called as ``criterion(logits, labels)``; assumed to use
            mean reduction (the default for ``nn.BCEWithLogitsLoss``).

    Returns:
        ``(mean_loss, accuracy)`` over the samples actually processed, or
        ``(0.0, 0.0)`` when the loader yields nothing.
    """
    model.eval()
    # Follow the model's own device instead of relying on a module-level global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    loss_sum, correct, seen = 0.0, 0, 0
    with torch.no_grad():
        for data, labels in loader:
            data, labels = data.to(run_device), labels.to(run_device)
            outputs = model(data)
            batch_n = labels.size(0)
            # Weight each batch mean by its size so a short final batch is
            # not over-represented in the average.
            loss_sum += criterion(outputs, labels).item() * batch_n
            preds = (torch.sigmoid(outputs) > 0.5).float()
            correct += (preds == labels).sum().item()
            seen += batch_n

    if seen == 0:
        return 0.0, 0.0
    return loss_sum / seen, correct / seen
# Alternate one training pass and one test pass per epoch, reporting both.
EPOCHS = 5
for epoch in range(EPOCHS):
    train_metrics = train_epoch(model, train_loader, criterion, optimizer)
    test_metrics = evaluate(model, test_loader, criterion)
    train_loss, train_acc = train_metrics
    test_loss, test_acc = test_metrics
    print(f'Epoch {epoch+1}/{EPOCHS} | Train Loss: {train_loss:.4f} Acc: {train_acc:.4f} | Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}')
案例二:LSTM 时间序列预测(正弦波预测)
任务:使用过去的正弦波值预测下一个时间步的值,演示 RNN 在回归预测中的应用。
python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
# -------------------- 1. Generate the sine-wave data --------------------
T = 1000  # total number of time steps
time = np.arange(T)
data = np.sin(time * 0.02)  # sine wave, period of roughly 314 steps
# Build supervised samples: a window of past points predicts the next point.
def create_sequences(data, seq_len):
    """Slice ``data`` into sliding windows and their next-step targets.

    Returns ``(X, y)`` as numpy arrays where ``X[i]`` is
    ``data[i:i + seq_len]`` and ``y[i]`` is ``data[i + seq_len]``.
    """
    starts = range(len(data) - seq_len)
    windows = [data[s:s + seq_len] for s in starts]
    targets = [data[s + seq_len] for s in starts]
    return np.array(windows), np.array(targets)
SEQ_LEN = 50
X_raw, y_raw = create_sequences(data, SEQ_LEN)

# Convert to float32 tensors and append a trailing feature axis,
# giving X the layout (batch, seq_len, input_size=1).
X = torch.tensor(X_raw, dtype=torch.float32)[..., None]
y = torch.tensor(y_raw, dtype=torch.float32)[..., None]

# First 80% of the samples for training, the remainder for testing.
split = int(0.8 * len(X))
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
# -------------------- 2. LSTM regression model --------------------
class LSTMRegressor(nn.Module):
    """Stacked LSTM followed by a linear layer that emits one value per sequence."""

    def __init__(self, input_size=1, hidden_size=64, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        # Single predicted value per sequence.
        self.fc = nn.Linear(in_features=hidden_size, out_features=1)

    def forward(self, x):
        # x: (batch, seq_len, input_size)
        sequence_features, _ = self.lstm(x)  # (batch, seq_len, hidden_size)
        # Keep only the output at the last time step.
        last_step = sequence_features.select(1, -1)  # (batch, hidden_size)
        return self.fc(last_step)  # (batch, 1)
model = LSTMRegressor()
criterion = nn.MSELoss()  # mean-squared-error regression loss
optimizer = optim.Adam(model.parameters(), lr=0.001)

# -------------------- 3. Train the model --------------------
BATCH_SIZE = 32
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset, shuffle=True, batch_size=BATCH_SIZE)

EPOCHS = 30
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(inputs), targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Report the mean batch loss every fifth epoch only.
    if (epoch + 1) % 5 != 0:
        continue
    print(f'Epoch {epoch+1}/{EPOCHS}, Loss: {total_loss/len(train_loader):.6f}')
# -------------------- 4. Predict and visualise --------------------
model.eval()
with torch.no_grad():
    y_pred_train = model(X_train).squeeze().numpy()
    y_pred_test = model(X_test).squeeze().numpy()

# The test segment is drawn directly after the training segment on the x axis.
n_train, n_test = len(y_train), len(y_test)
train_axis = np.arange(n_train)
test_axis = np.arange(n_train, n_train + n_test)

plt.figure(figsize=(12, 4))
for axis, series, label in (
    (train_axis, y_train.numpy(), 'Train True'),
    (train_axis, y_pred_train, 'Train Pred'),
    (test_axis, y_test.numpy(), 'Test True'),
    (test_axis, y_pred_test, 'Test Pred'),
):
    plt.plot(axis, series, label=label)
plt.legend()
plt.title('LSTM Sine Wave Prediction')
plt.show()
迁移学习(Transfer Learning)
定义:将在大规模数据集(如 ImageNet)上预训练好的模型知识,迁移到小样本或相关任务中。
常用策略:
冻结骨干网络:只训练最后几层分类头,适合目标数据少且与源数据相似的情况。
微调(Fine‑tuning):解冻部分或全部预训练层,用较小学习率整体调整,适用数据较多、任务差异稍大时。
优势:降低训练成本,在小数据集上也能取得高性能。
典型场景:
计算机视觉:ResNet、EfficientNet 在 ImageNet 预训练后迁移到细粒度分类、目标检测。
自然语言处理:BERT、GPT、DistilBERT 等在大规模语料预训练后,微调用于情感分析、问答、文本分类。
案例一:ResNet18 微调 —— CIFAR‑10 图像分类
任务:使用在 ImageNet 预训练的 ResNet18,对 CIFAR‑10 进行 10 分类微调。
python
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets, models
# -------------------- 1. Data preparation: CIFAR-10 --------------------
# Normalisation statistics matching the ImageNet-pretrained model.
imagenet_normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

transform_train = transforms.Compose([
    transforms.Resize(224),             # resize to the 224 input size used for the ResNet here
    transforms.RandomHorizontalFlip(),  # data augmentation
    transforms.ToTensor(),
    imagenet_normalize,
])
transform_test = transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
    imagenet_normalize,
])

cifar_root = './data'
trainset = datasets.CIFAR10(root=cifar_root, train=True, download=True, transform=transform_train)
testset = datasets.CIFAR10(root=cifar_root, train=False, download=True, transform=transform_test)

trainloader = torch.utils.data.DataLoader(trainset, shuffle=True, batch_size=64)
testloader = torch.utils.data.DataLoader(testset, shuffle=False, batch_size=64)
# -------------------- 2. Build the transfer-learning model --------------------
# Load ImageNet-pretrained weights. `pretrained=True` is deprecated in
# torchvision in favour of the `weights=` argument; use the new API when this
# installation provides it and fall back to the old flag otherwise.
if hasattr(models, 'ResNet18_Weights'):
    model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
else:
    model = models.resnet18(pretrained=True)

# Freeze every pretrained parameter (initially only the head is trained).
for param in model.parameters():
    param.requires_grad = False

# Replace the final fully connected layer so it outputs 10 classes.
# A freshly created layer has requires_grad=True by default.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# -------------------- 3. Training (stage 1: classifier head only) --------------------
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.fc.parameters(), lr=0.001)
def train_epoch(model, loader, optimizer, criterion):
    """Train ``model`` for one pass over ``loader``.

    Args:
        model: classifier producing logits of shape (batch, num_classes).
        loader: iterable of ``(inputs, labels)`` batches.
        optimizer: optimizer bound to the parameters being trained.
        criterion: loss called as ``criterion(logits, labels)``; assumed to use
            mean reduction (the default for ``nn.CrossEntropyLoss``).

    Returns:
        ``(mean_loss, accuracy)`` over the samples actually processed, or
        ``(0.0, 0.0)`` when the loader yields nothing.
    """
    # NOTE: train() also switches BatchNorm/Dropout layers to training mode,
    # including layers whose parameters are frozen.
    model.train()
    # Follow the model's own device instead of relying on a module-level global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    loss_sum, correct, seen = 0.0, 0, 0
    for inputs, labels in loader:
        inputs, labels = inputs.to(run_device), labels.to(run_device)
        optimizer.zero_grad()
        outputs = model(inputs)  # forward pass
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_n = labels.size(0)
        # Weight each batch mean by its size so a short final batch is not
        # over-represented in the epoch average.
        loss_sum += loss.item() * batch_n
        correct += (outputs.argmax(dim=1) == labels).sum().item()
        seen += batch_n

    if seen == 0:
        return 0.0, 0.0
    # Divide by samples seen (not len(loader.dataset)) so drop_last/samplers
    # do not skew the accuracy.
    return loss_sum / seen, correct / seen
def evaluate(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` without updating parameters.

    Args:
        model: classifier producing logits of shape (batch, num_classes).
        loader: iterable of ``(inputs, labels)`` batches.
        criterion: loss called as ``criterion(logits, labels)``; assumed to use
            mean reduction (the default for ``nn.CrossEntropyLoss``).

    Returns:
        ``(mean_loss, accuracy)`` over the samples actually processed, or
        ``(0.0, 0.0)`` when the loader yields nothing.
    """
    model.eval()
    # Follow the model's own device instead of relying on a module-level global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    loss_sum, correct, seen = 0.0, 0, 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(run_device), labels.to(run_device)
            outputs = model(inputs)
            batch_n = labels.size(0)
            # Weight each batch mean by its size so a short final batch is
            # not over-represented in the average.
            loss_sum += criterion(outputs, labels).item() * batch_n
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            seen += batch_n

    if seen == 0:
        return 0.0, 0.0
    return loss_sum / seen, correct / seen
# Stage 1: only the new classification head is optimised.
print("Training classifier head...")
for epoch in range(5):
    head_train_metrics = train_epoch(model, trainloader, optimizer, criterion)
    head_test_metrics = evaluate(model, testloader, criterion)
    train_loss, train_acc = head_train_metrics
    test_loss, test_acc = head_test_metrics
    print(f'Epoch {epoch+1}: Train Loss {train_loss:.4f} Acc {train_acc:.4f} | Test Loss {test_loss:.4f} Acc {test_acc:.4f}')

# -------------------- (Advanced) unfreeze everything and fine-tune --------------------
# Re-enable gradients for every layer and continue with a smaller learning rate.
for param in model.parameters():
    param.requires_grad_(True)
optimizer_full = optim.Adam(model.parameters(), lr=1e-4)

print("\nFine-tuning all layers...")
for epoch in range(5):
    full_train_metrics = train_epoch(model, trainloader, optimizer_full, criterion)
    full_test_metrics = evaluate(model, testloader, criterion)
    train_loss, train_acc = full_train_metrics
    test_loss, test_acc = full_test_metrics
    print(f'Epoch {epoch+1}: Train Loss {train_loss:.4f} Acc {train_acc:.4f} | Test Loss {test_loss:.4f} Acc {test_acc:.4f}')
案例二:DistilBERT 微调 —— IMDB 文本分类
任务:使用 Hugging Face 的预训练 DistilBERT 对 IMDB 进行情感二分类,展示 NLP 中的迁移学习。
python
# 需安装:pip install transformers datasets
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset
import torch
import numpy as np
from sklearn.metrics import accuracy_score
# -------------------- 1. Load the IMDB dataset (Hugging Face datasets) --------------------
dataset = load_dataset('imdb')
# Use shuffled subsets to keep the example fast.
shuffled_train = dataset['train'].shuffle(seed=42)
shuffled_test = dataset['test'].shuffle(seed=42)
train_dataset = shuffled_train.select(range(5000))
test_dataset = shuffled_test.select(range(1000))

# -------------------- 2. Load the tokenizer and the model --------------------
model_name = 'distilbert-base-uncased'
tokenizer = DistilBertTokenizer.from_pretrained(model_name)
model = DistilBertForSequenceClassification.from_pretrained(model_name, num_labels=2)
# -------------------- 3. Preprocessing: tokenisation --------------------
def tokenize_function(examples):
    """Tokenize the ``'text'`` field of ``examples`` with the module-level tokenizer.

    Texts are truncated and padded to a fixed length of 256 tokens; the
    tokenizer's output is returned unchanged.
    """
    texts = examples['text']
    encoding = tokenizer(texts, max_length=256, truncation=True, padding='max_length')
    return encoding
train_dataset = train_dataset.map(tokenize_function, batched=True)
test_dataset = test_dataset.map(tokenize_function, batched=True)

# Expose only the fields the model needs, as torch tensors.
for split_ds in (train_dataset, test_dataset):
    split_ds.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
# -------------------- 4. Evaluation metric --------------------
def compute_metrics(pred):
    """Return ``{'accuracy': ...}`` for a prediction object.

    Reads ``pred.predictions`` (per-class scores, arg-maxed along axis 1) and
    ``pred.label_ids`` (the reference labels).
    """
    predicted_classes = np.argmax(pred.predictions, axis=1)
    return {'accuracy': accuracy_score(pred.label_ids, predicted_classes)}
# -------------------- 5. Training arguments and training --------------------
# The `evaluation_strategy` keyword was renamed to `eval_strategy` in newer
# transformers releases and the old spelling is rejected by recent versions.
# Pick whichever name this installation's TrainingArguments accepts.
import inspect

_eval_strategy_key = (
    'eval_strategy'
    if 'eval_strategy' in inspect.signature(TrainingArguments.__init__).parameters
    else 'evaluation_strategy'
)

training_args = TrainingArguments(
    output_dir='./results',          # where checkpoints are written
    num_train_epochs=3,              # number of training epochs
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    save_strategy='epoch',
    logging_dir='./logs',
    learning_rate=2e-5,
    **{_eval_strategy_key: 'epoch'},  # evaluate after every epoch
)
trainer_options = {
    'model': model,
    'args': training_args,
    'train_dataset': train_dataset,
    'eval_dataset': test_dataset,
    'compute_metrics': compute_metrics,
}
trainer = Trainer(**trainer_options)
trainer.train()

# Final evaluation on the held-out subset.
eval_result = trainer.evaluate()
print(f"Test Accuracy: {eval_result['eval_accuracy']:.4f}")
在线学习(Online Learning)
定义:模型随着新数据的到来实时更新参数,每处理一条或一小批数据就调整一次模型。
核心特点:
无需一次性加载全部数据,适合数据流和实时系统。
通常只遍历数据一遍(或少数几遍),即“单次扫描”训练。
代表算法:
随机梯度下降(SGD)的增量形式。
被动攻击算法(Passive‑Aggressive)。
FTRL(Follow‑the‑Regularized‑Leader)。
与批量学习的区别:
批量学习:在全量数据上反复迭代,需要大量内存和固定数据集。
在线学习:逐样本或逐小批次更新,可适应概念漂移。
典型应用:广告点击率预测、实时推荐系统、高频交易模型更新、物联网传感器数据分析。
案例一:在线二分类 — SGDClassifier 流式学习
使用 Scikit‑learn 的 SGDClassifier(支持 partial_fit)模拟在线学习,数据实时生成。
python
from sklearn.linear_model import SGDClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import numpy as np
import matplotlib.pyplot as plt
# -------------------- 1. Generate a static dataset (also used for the offline comparison) --------------------
X, y = make_classification(n_samples=5000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Online classifier: logistic regression with an L2 penalty.
# The loss is spelled 'log_loss' (scikit-learn >= 1.1); the former alias 'log'
# has been removed and now raises a parameter-validation error.
model = SGDClassifier(loss='log_loss', penalty='l2', max_iter=1, tol=None, warm_start=True, random_state=42)
# -------------------- 2. Simulate a data stream: train batch by batch --------------------
batch_size = 50
accs_online = []
# Several passes are made here to show convergence; a real stream is usually
# seen only once.
n_epochs = 5
all_classes = np.unique(y)  # constant across calls, so computed once
n_train = len(X_train)
for epoch in range(n_epochs):
    # Reshuffle the training data at the start of each pass.
    idx = np.random.permutation(n_train)
    X_shuffled, y_shuffled = X_train[idx], y_train[idx]
    for start in range(0, n_train, batch_size):
        stop = start + batch_size
        # Incremental update on the current mini-batch.
        model.partial_fit(X_shuffled[start:stop], y_shuffled[start:stop], classes=all_classes)
        # Track test-set accuracy after every update.
        accs_online.append(accuracy_score(y_test, model.predict(X_test)))
# -------------------- 3. Offline batch-training baseline --------------------
# The loss is spelled 'log_loss' (scikit-learn >= 1.1); the former alias 'log'
# has been removed and now raises a parameter-validation error.
model_offline = SGDClassifier(loss='log_loss', penalty='l2', max_iter=1000, random_state=42)
model_offline.fit(X_train, y_train)
offline_acc = accuracy_score(y_test, model_offline.predict(X_test))
print(f"Offline Batch Accuracy: {offline_acc:.4f}")
print(f"Online Final Accuracy: {accs_online[-1]:.4f}")
# Plot test accuracy after each online update, with the offline result as a
# dashed reference line.
plt.figure(figsize=(8, 4))
plt.plot(accs_online)
reference_style = {'color': 'r', 'linestyle': '--', 'label': 'Offline accuracy'}
plt.axhline(y=offline_acc, **reference_style)
plt.xlabel('Batch updates')
plt.ylabel('Test Accuracy')
plt.title('Online Learning Progress')
plt.legend()
plt.show()
🧪 案例二:在线线性回归 — SGDRegressor 逐样本更新
使用 SGDRegressor 实现一个简单的房价趋势在线学习模型,数据动态生成。
python
from sklearn.linear_model import SGDRegressor
from sklearn.datasets import make_regression
from sklearn.metrics import mean_squared_error
import numpy as np
import matplotlib.pyplot as plt
# -------------------- 1. Generate regression data --------------------
X, y = make_regression(n_samples=3000, n_features=10, noise=0.1, random_state=42)

# First 80% acts as the incoming stream, the rest is a fixed test set.
split = int(0.8 * len(X))
X_stream, X_test = X[:split], X[split:]
y_stream, y_test = y[:split], y[split:]

# Online regressor with a constant learning rate.
model = SGDRegressor(learning_rate='constant', eta0=0.01, max_iter=1, tol=None, warm_start=True)
# -------------------- 2. Sample-by-sample online learning (simulated real-time data) --------------------
mse_history = []
for i, (features, target) in enumerate(zip(X_stream, y_stream)):
    # One sample at a time, as it would arrive in a live system.
    xi = features.reshape(1, -1)
    yi = np.array([target])
    model.partial_fit(xi, yi)  # incremental update
    # Evaluate on the test set every 50 steps only.
    if i % 50 != 0:
        continue
    pred = model.predict(X_test)
    mse = mean_squared_error(y_test, pred)
    mse_history.append(mse)
# -------------------- 3. Offline baseline --------------------
model_offline = SGDRegressor(tol=1e-3, max_iter=1000)
model_offline.fit(X_stream, y_stream)
offline_pred = model_offline.predict(X_test)
offline_mse = mean_squared_error(y_test, offline_pred)
print(f"Offline MSE: {offline_mse:.4f}")
print(f"Online Final MSE: {mse_history[-1]:.4f}")
# Plot the test MSE recorded during online learning, with the offline result
# as a dashed reference line.
samples_seen = range(0, len(X_stream), 50)  # one x position per recorded MSE
plt.figure(figsize=(8, 4))
plt.plot(samples_seen, mse_history)
reference_style = {'color': 'r', 'linestyle': '--', 'label': 'Offline MSE'}
plt.axhline(y=offline_mse, **reference_style)
plt.xlabel('Samples seen')
plt.ylabel('Test MSE')
plt.title('Online Regression MSE over Time')
plt.legend()
plt.show()
小结
RNN:掌握 LSTM/GRU 的门控机制,可灵活处理文本、时序等序列任务。
迁移学习:利用预训练大模型,通过冻结或微调快速解决小样本问题,广泛应用于图像和 NLP。
在线学习:支持数据流式增量更新,适合实时系统,Scikit‑learn 中的 partial_fit 提供了便捷的在线学习接口。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)