基于CNN和MLP的姓氏分类
引言
在当今的数据驱动世界中,利用机器学习模型进行文本分类变得越来越普遍。本文将介绍如何使用卷积神经网络(CNN)和多层感知机(MLP)来实现姓氏分类,预测给定姓氏的国籍。下面将详细分析CNN和MLP的结构,阐述姓氏分类的整个流程,涵盖从数据准备、模型定义、模型训练到模型评估和预测的各个步骤。
一、卷积神经网络CNN
1.1基本结构
如图所示,卷积神经网络(Convolutional Neural Network, CNN)主要由卷积层、池化层(图中的pooling)、全连接层和输出层(图中的softmax)组成。CNN是一种专门用于处理具有类似网格结构数据的深度神经网络,广泛应用于图像和视频识别、推荐系统和自然语言处理等领域。它们通过卷积操作有效地捕捉数据中的空间和时间依赖性。
关键概念
-
卷积核(Filter/Kernal):小尺寸的权重矩阵,沿输入数据滑动进行点积操作,提取局部特征。
-
填充(Padding):在输入数据的边缘填充零,以控制输出特征图的尺寸,常见的有零填充(zero-padding)。
-
步幅(Stride):卷积核在输入数据上滑动的步长,步幅越大,输出特征图的尺寸越小。
-
特征图(Feature Map):经过卷积操作后得到的输出,反映了输入数据的特征。
以下是CNN各个组成的详细介绍。
1.2卷积层(Convolutional Layer):
通过卷积核在输入数据上进行二维卷积,提取局部特征。每个卷积核都会在输入数据上滑动,即进行卷积运算生成特征图(feature map)。
常用的激活函数是ReLU(Rectified Linear Unit)、Sigmoid和Tanh等。最常用的是ReLu,它能增加网络的非线性表达能力。
- ReLU(Rectified Linear Unit):
,广泛使用,能够加速训练和减少梯度消失问题。
- Sigmoid:
,将输入映射到(0, 1)之间。
- Tanh:
,将输入映射到(-1, 1)之间。
1.3池化层(Pooling Layer):
常见的池化方法由最大池化和平均池化两种:
最大池化(Max Pooling):取窗口内的最大值,减少数据量的同时保留重要特征。
平均池化(Average Pooling):取窗口内的平均值,平滑特征。
1.4全连接层(Fully Connected Layer):
将前面的特征图展平成一维向量,然后通过全连接层进行分类或回归。
1.5输出层(Output Layer):
由上述结构得到的结果分别并不均匀,可正可负,也有可能相差很大。对于分类任务,输出层通常使用Softmax函数将结果转换为0至1的概率分布。即对于输入,可以得到
,其中
优点
- 参数共享:卷积核在输入数据上滑动,相同的参数应用于不同位置,大大减少了参数数量。
- 稀疏连接:每个卷积核只与局部区域连接,减少了计算复杂度。
- 平移不变性:卷积操作可以捕捉图像的平移不变特征。
二、多层感知机MLP
2.1基本结构
多层感知机(Multilayer Perceptron, MLP)是一种前馈神经网络,通常用于分类和回归任务。它由多个全连接层组成,每一层的神经元与下一层的每一个神经元都有连接。以下是关于MLP的详细介绍:
-
输入层(Input Layer):
直接接收原始数据,每个输入对应一个节点。 -
隐藏层(Hidden Layers):
- 由一个或多个层组成,位于输入层和输出层之间。每个隐藏层由若干节点组成,每个节点都与前一层的所有节点相连。
- 激活函数(Activation Function):隐藏层的节点通常使用非线性激活函数,如ReLU、Sigmoid或Tanh,以增加网络的表达能力。
-
输出层(Output Layer):
产生最终的预测结果。对于分类任务,输出层的节点数等于类别数,常用Softmax函数将输出转换为概率分布。对于回归任务,输出层通常只有一个节点,直接输出预测值。
2.2关键概念
-
激活函数(Activation Function):
- ReLU(Rectified Linear Unit):
,广泛使用,能够加速训练和减少梯度消失问题。
- Sigmoid:
,将输入映射到(0, 1)之间。
- Tanh:
,将输入映射到(-1, 1)之间。
- ReLU(Rectified Linear Unit):
-
损失函数(Loss Function):
衡量预测结果与真实值之间的差异。常用的损失函数有交叉熵损失(用于分类)和均方误差(用于回归)。 -
反向传播(Backpropagation):
一种用于训练神经网络的算法,通过计算损失函数的梯度来调整网络中的权重和偏置,从而最小化损失。 -
权重初始化(Weight Initialization):
选择适当的初始权重对网络的训练速度和效果有重要影响。常用的方法有Xavier初始化和He初始化。
优点
- 强大的表达能力:可以近似任何复杂的函数。
- 通用性:适用于各种类型的数据,包括图像、文本和时间序列等。
缺点
- 计算复杂度高:尤其是在隐藏层节点数较多时,训练时间较长。
- 容易过拟合:需要正则化技术如L2正则化或Dropout来防止过拟合。
三、MLP实现姓氏分类
3.1环境搭建
确保实验过程的可视化和代码模块化,本实验在Visual Studio Code中的jupyter notebook环境下进行。
导入后续代码中所需要的库:
from argparse import Namespace
from collections import Counter
import json
import os
import string
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm_notebook
简单说一下我的版本:
torch==2.3.1
torchvision==0.18.0
tqdm==4.65.0
3.2数据预处理
定义一个Vocabulary类。这个类用于处理文本数据并提取词汇表以进行映射。该类具有添加单词、查找单词索引等功能。__init__是初始化方法,可以传入一个预先存在的token_to_idx映射,也可以选择UNK标记。
class Vocabulary(object):
"""处理文本并提取词汇进行映射的类"""
def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
"""
Args:
token_to_idx (dict): 预先存在的token到索引的映射
add_unk (bool): 指示是否添加UNK token的标志
unk_token (str): 要添加到词汇表中的UNK token
"""
if token_to_idx is None:
token_to_idx = {}
self._token_to_idx = token_to_idx
# 创建从索引到token的映射
self._idx_to_token = {idx: token
for token, idx in self._token_to_idx.items()}
self._add_unk = add_unk
self._unk_token = unk_token
self.unk_index = -1
if add_unk:
self.unk_index = self.add_token(unk_token)
def to_serializable(self):
"""返回一个可以序列化的字典"""
return {'token_to_idx': self._token_to_idx,
'add_unk': self._add_unk,
'unk_token': self._unk_token}
@classmethod
def from_serializable(cls, contents):
""" 从序列化的字典实例化词汇表 """
return cls(**contents)
def add_token(self, token):
"""根据token更新映射字典.
Args:
token (str): 要添加到词汇表中的项目
Returns:
index (int): 对应于token的整数索引
"""
try:
index = self._token_to_idx[token]
except KeyError:
index = len(self._token_to_idx)
self._token_to_idx[token] = index
self._idx_to_token[index] = token
return index
def add_many(self, tokens):
"""将一组token添加到词汇表中
Args:
tokens (list): 字符串token的列表
Returns:
indices (list): 对应于这些token的索引列表
"""
return [self.add_token(token) for token in tokens]
def lookup_token(self, token):
"""
检索与token相关联的索引
或者如果token不存在,则返回UNK索引
Args:
token (str): 要查找的token
Returns:
index (int): 对应于token的索引
Notes:
`unk_index` 需要 >=0 (已添加到词汇表中)
才能使用UNK功能
"""
if self.unk_index >= 0:
return self._token_to_idx.get(token, self.unk_index)
else:
return self._token_to_idx[token]
def lookup_index(self, index):
"""返回与索引相关联的token
Args:
index (int): 要查找的索引
Returns:
token (str): 对应于该索引的token
Raises:
KeyError: 如果索引不在词汇表中
"""
if index not in self._idx_to_token:
raise KeyError("the index (%d) is not in the Vocabulary" % index)
return self._idx_to_token[index]
def __str__(self):
return "<Vocabulary(size=%d)>" % len(self)
def __len__(self):
return len(self._token_to_idx)
定义一个SurnameVectorizer类。这个类是一个向量化器。该类协调使用Vocabularies并进行向量化处理。它将姓氏和国籍映射为整数,并生成一个独热编码矩阵。首先需要传入一个姓氏的Vocabulary对象、一个国籍的Vocabulary对象和最长姓氏的长度。再将输入的姓氏字符串转换为一个one-hot编码的矩阵。
class SurnameVectorizer(object):
""" 该向量化器协调词汇并将其投入使用"""
def __init__(self, surname_vocab, nationality_vocab):
"""
Args:
surname_vocab (Vocabulary): 将字符映射到整数的词汇表
nationality_vocab (Vocabulary): 将国籍映射到整数的词汇表
"""
self.surname_vocab = surname_vocab
self.nationality_vocab = nationality_vocab
def vectorize(self, surname):
"""
Args:
surname (str): 姓氏
Returns:
one_hot_matrix (np.ndarray): 一个one-hot向量矩阵
"""
vocab = self.surname_vocab
one_hot = np.zeros(len(vocab), dtype=np.float32)
# 将姓氏中的每一个字符转换为对应的one-hot编码
for token in surname:
one_hot[vocab.lookup_token(token)] = 1
return one_hot
@classmethod
def from_dataframe(cls, surname_df):
"""从数据集的数据框实例化向量化器
Args:
surname_df (pandas.DataFrame): 姓氏数据集
Returns:
SurnameVectorizer的一个实例
"""
surname_vocab = Vocabulary(unk_token="@")
nationality_vocab = Vocabulary(add_unk=False)
# 遍历数据框的每一行,将姓氏中的每个字符和国籍添加到词汇表中
for index, row in surname_df.iterrows():
for letter in row.surname:
surname_vocab.add_token(letter)
nationality_vocab.add_token(row.nationality)
return cls(surname_vocab, nationality_vocab)
@classmethod
def from_serializable(cls, contents):
"""从可序列化的字典中实例化向量化器"""
surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)
def to_serializable(self):
"""返回一个可序列化的字典"""
return {'surname_vocab': self.surname_vocab.to_serializable(),
'nationality_vocab': self.nationality_vocab.to_serializable()}
为了创建最终的数据集,从一个比课程补充材料中包含的版本处理更少的版本开始,并执行了几个数据集修改操作。第一个目的是减少这种不平衡——原始数据集中70%以上是俄文,这可能是由于抽样偏差或俄文姓氏的增多。为此,我们通过选择标记为俄语的姓氏的随机子集对这个过度代表的类进行子样本。接下来,我们根据国籍对数据集进行分组,并将数据集分为三个部分:70%到训练数据集,15%到验证数据集,最后15%到测试数据集,以便跨这些部分的类标签分布具有可比性。
class SurnameDataset(Dataset):
def __init__(self, surname_df, vectorizer):
"""
Args:
surname_df (pandas.DataFrame): 数据集
vectorizer (SurnameVectorizer): 从数据集实例化的向量化器
"""
self.surname_df = surname_df
self._vectorizer = vectorizer
# 划分训练集、验证集和测试集,并记录每个子集的大小
self.train_df = self.surname_df[self.surname_df.split=='train']
self.train_size = len(self.train_df)
self.val_df = self.surname_df[self.surname_df.split=='val']
self.validation_size = len(self.val_df)
self.test_df = self.surname_df[self.surname_df.split=='test']
self.test_size = len(self.test_df)
self._lookup_dict = {'train': (self.train_df, self.train_size),
'val': (self.val_df, self.validation_size),
'test': (self.test_df, self.test_size)}
self.set_split('train')
# 类别权重,用于处理类别不平衡问题
class_counts = surname_df.nationality.value_counts().to_dict()
def sort_key(item):
return self._vectorizer.nationality_vocab.lookup_token(item[0])
sorted_counts = sorted(class_counts.items(), key=sort_key)
frequencies = [count for _, count in sorted_counts]
self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
@classmethod
def load_dataset_and_make_vectorizer(cls, surname_csv):
"""加载数据集并从头创建一个新的向量化器
Args:
surname_csv (str): 数据集的位置
Returns:
SurnameDataset的一个实例
"""
surname_df = pd.read_csv(surname_csv)
train_surname_df = surname_df[surname_df.split=='train']
return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
@classmethod
def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
"""加载数据集和相应的向量化器
在向量化器已缓存以供重用时使用
Args:
surname_csv (str): 数据集的位置
vectorizer_filepath (str): 已保存的向量化器的位置
Returns:
SurnameDataset的一个实例
"""
surname_df = pd.read_csv(surname_csv)
vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
return cls(surname_df, vectorizer)
@staticmethod
def load_vectorizer_only(vectorizer_filepath):
"""从文件中加载向量化器的静态方法
Args:
vectorizer_filepath (str):序列化的向量化器的位置
Returns:
SurnameVectorizer的一个实例
"""
with open(vectorizer_filepath) as fp:
return SurnameVectorizer.from_serializable(json.load(fp))
def save_vectorizer(self, vectorizer_filepath):
"""使用json将向量化器保存到磁盘
Args:
vectorizer_filepath (str):保存向量化器的位置
"""
with open(vectorizer_filepath, "w") as fp:
json.dump(self._vectorizer.to_serializable(), fp)
def get_vectorizer(self):
""" 返回向量化器 """
return self._vectorizer
def set_split(self, split="train"):
""" 使用数据框中的列选择数据集的划分 """
self._target_split = split
self._target_df, self._target_size = self._lookup_dict[split]
def __len__(self):
return self._target_size
def __getitem__(self, index):
"""pytorch数据集的主要入口点方法
Args:
index (int): 数据点的索引
Returns:
一个包含数据点的字典:
features (x_surname)
label (y_nationality)
"""
row = self._target_df.iloc[index]
surname_vector = \
self._vectorizer.vectorize(row.surname)
nationality_index = \
self._vectorizer.nationality_vocab.lookup_token(row.nationality)
return {'x_surname': surname_vector,
'y_nationality': nationality_index}
def get_num_batches(self, batch_size):
"""给定批量大小,返回数据集张的批次数量
Args:
batch_size (int)
Returns:
数据集中的批次数量
"""
return len(self) // batch_size
def generate_batches(dataset, batch_size, shuffle=True,
drop_last=True, device="cpu"):
"""
一个包装pytorch DataLoader的生成器函数
它将确保每个张量在正确的设备位置上。
"""
dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
shuffle=shuffle, drop_last=drop_last)
for data_dict in dataloader:
out_data_dict = {}
for name, tensor in data_dict.items():
out_data_dict[name] = data_dict[name].to(device)
yield out_data_dict
3.3MLP模型构建
为姓氏分类任务构建多层感知器模型,由两个线性层构成,其中穿插着非线性激活函数,以增强模型对复杂特征的捕捉能力。第一个线性层负责将输入的姓氏向量映射到一个中间特征空间,而第二个线性层则将这些特征转换为最终的预测向量,表示各个国籍的类别得分。在输出层,我们采用softmax函数来转换得分为概率分布。
class SurnameClassifier(nn.Module):
"""用于姓氏分类的两层多层感知机(MLP)"""
def __init__(self, input_dim, hidden_dim, output_dim):
"""
Args:
input_dim (int): 输入向量的大小
hidden_dim (int): 第一层线性层的输出大小
output_dim (int): 第二层线性层的输出大小
"""
super(SurnameClassifier, self).__init__()
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, output_dim)
def forward(self, x_in, apply_softmax=False):
"""分类器的前向传播
Args:
x_in (torch.Tensor): 输入数据张量.
x_in.shape 应该为 (batch, input_dim)
apply_softmax (bool): 是否使用softmax激活函数的标志
如果与交叉熵损失一起使用,则应为False
Returns:
结果张量r. tensor.shape应该为(batch, output_dim)
"""
# 通过第一个全连接层并应用Relu激活
intermediate_vector = F.relu(self.fc1(x_in))
# 通过第二个全连接层
prediction_vector = self.fc2(intermediate_vector)
# 如果需要,应用softmax激活
if apply_softmax:
prediction_vector = F.softmax(prediction_vector, dim=1)
return prediction_vector
3.4训练模型
为了更好展示训练过程,这里定义了如下函数:
- make_train_state:用于创建一个保存训练状态的字典。
- update_train_state:该函数用于处理训练状态的更新。函数的作用包括早期停止,为了防止过拟合,如果验证集的损失值在连续的若干个epoch中没有改善,就停止训练。也包括模型检查点,如果模型的性能提高了,则保存模型。
- compute_accuracy:用于计算模型的准确率。函数的输入参数包括预测值y_pred和目标值y_target。
def make_train_state(args):
return {'stop_early': False,
'early_stopping_step': 0,
'early_stopping_best_val': 1e8,
'learning_rate': args.learning_rate,
'epoch_index': 0,
'train_loss': [],
'train_acc': [],
'val_loss': [],
'val_acc': [],
'test_loss': -1,
'test_acc': -1,
'model_filename': args.model_state_file}
def update_train_state(args, model, train_state):
"""处理训练状态更新。
组件:
- 提前停止: 防止过拟合。
- 模型检查点: 如果模型表现更好,则保存模型。
:param args: 主参数
:param model: 训练的模型
:param train_state: 表示训练状态值的字典
:returns:
更新后的train_state
"""
# 至少保存一个模型
if train_state['epoch_index'] == 0:
torch.save(model.state_dict(), train_state['model_filename'])
train_state['stop_early'] = False
# 如果性能提升则保存模型
elif train_state['epoch_index'] >= 1:
loss_tm1, loss_t = train_state['val_loss'][-2:]
# 如果损失变差
if loss_t >= train_state['early_stopping_best_val']:
# 更新步数
train_state['early_stopping_step'] += 1
# 损失减少
else:
# 保存最佳模型
if loss_t < train_state['early_stopping_best_val']:
torch.save(model.state_dict(), train_state['model_filename'])
# 重置提前停止步数
train_state['early_stopping_step'] = 0
# 是否提前停止
train_state['stop_early'] = \
train_state['early_stopping_step'] >= args.early_stopping_criteria
return train_state
def compute_accuracy(y_pred, y_target):
_, y_pred_indices = y_pred.max(dim=1)# 获取最大预测值的索引
n_correct = torch.eq(y_pred_indices, y_target).sum().item() # 计算正确预测的数量
return n_correct / len(y_pred_indices) * 100 # 返回准确率的百分比
两个通用的工具函数:
def set_seed_everywhere(seed, cuda):
# 设置NumPy的随机种子
np.random.seed(seed)
# 设置PyTorch的随机种子
torch.manual_seed(seed)
# 如果使用CUDA,则为所有CUDA设备设置随机种子
if cuda:
torch.cuda.manual_seed_all(seed)
def handle_dirs(dirpath):
# 如果目录路径不存在,则创建该目录
if not os.path.exists(dirpath):
os.makedirs(dirpath)
args对象,其中包含了各种数据和路径信息、模型超参数、训练超参数和运行时选项。hidden_dim为隐藏层的维度。num_channels为卷积层的输出通道数。learning_rate为学习率。batch_size为批大小。num_epochs为训练的总epoch数。dropout_p为Dropout层的概率。
args = Namespace(
# 数据和路径信息
surname_csv="data/surnames/surnames_with_splits.csv", # 姓氏数据集的CSV文件路径
vectorizer_file="vectorizer.json", # 向量化器的文件路径
model_state_file="model.pth", # 模型状态文件路径
save_dir="model_storage/ch4/surname_mlp", # 保存目录路径
# 模型超参数
hidden_dim=300, # 隐藏层维度
# 训练超参数
seed=1337, # 随机种子
num_epochs=100, # 训练轮数
early_stopping_criteria=5, # 提前停止的标准
learning_rate=0.001, # 学习率
batch_size=64, # 批大小
# 运行时选项
cuda=False, # 是否使用CUDA
reload_from_files=False, # 是否从文件重新加载
expand_filepaths_to_save_dir=True, # 是否扩展文件路径到保存目录
)
# 如果需要扩展文件路径到保存目录
if args.expand_filepaths_to_save_dir:
args.vectorizer_file = os.path.join(args.save_dir,
args.vectorizer_file) # 扩展向量化器文件路径
args.model_state_file = os.path.join(args.save_dir,
args.model_state_file) # 扩展模型状态文件路径
print("Expanded filepaths: ")
print("\t{}".format(args.vectorizer_file)) # 打印扩展后的向量化器文件路径
print("\t{}".format(args.model_state_file)) # 打印扩展后的模型状态文件路径
# 检查是否可用CUDA
if not torch.cuda.is_available():
args.cuda = False # 如果不可用,则设置为False
args.device = torch.device("cuda" if args.cuda else "cpu") # 根据CUDA的可用性设置设备
print("Using CUDA: {}".format(args.cuda)) # 打印是否使用CUDA
# 设置随机种子以保证可复现性
set_seed_everywhere(args.seed, args.cuda)
# 处理保存目录
handle_dirs(args.save_dir)
开始训练模型:
if args.reload_from_files:
# 从检查点重新加载训练
print("Reloading!")
dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
args.vectorizer_file) # 加载数据集和向量化器
else:
# 创建新的数据集和向量化器
print("Creating fresh!")
dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv) # 创建数据集和向量化器
dataset.save_vectorizer(args.vectorizer_file) # 保存向量化器
vectorizer = dataset.get_vectorizer() # 获取向量化器
classifier = SurnameClassifier(input_dim=len(vectorizer.surname_vocab),
hidden_dim=args.hidden_dim,
output_dim=len(vectorizer.nationality_vocab)) # 创建姓氏分类器
classifier = classifier.to(args.device) # 将分类器移动到指定设备(CPU或GPU)
dataset.class_weights = dataset.class_weights.to(args.device) # 将数据集的类别权重移动到指定设备
# 定义损失函数,使用类别权重进行加权交叉熵损失
loss_func = nn.CrossEntropyLoss(dataset.class_weights)
# 定义优化器,使用Adam优化算法
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
# 定义学习率调度器,当验证损失不再降低时减少学习率
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
mode='min', factor=0.5,
patience=1)
# 初始化训练状态
train_state = make_train_state(args)
# 创建进度条
epoch_bar = tqdm_notebook(desc='training routine',
total=args.num_epochs,
position=0)
# 设置数据集分割为训练集并创建训练进度条
dataset.set_split('train')
train_bar = tqdm_notebook(desc='split=train',
total=dataset.get_num_batches(args.batch_size),
position=1,
leave=True)
# 设置数据集分割为验证集并创建验证进度条
dataset.set_split('val')
val_bar = tqdm_notebook(desc='split=val',
total=dataset.get_num_batches(args.batch_size),
position=1,
leave=True)
try:
for epoch_index in range(args.num_epochs):
train_state['epoch_index'] = epoch_index
# 迭代训练数据集
# 设置:批生成器,初始化损失和准确率为0,开启训练模式
dataset.set_split('train')
batch_generator = generate_batches(dataset,
batch_size=args.batch_size,
device=args.device)
running_loss = 0.0
running_acc = 0.0
classifier.train() # 设置模型为训练模式
for batch_index, batch_dict in enumerate(batch_generator):
# 训练过程包含以下五步:
# --------------------------------------
# 第一步:清零梯度
optimizer.zero_grad()
# 第二步:计算输出
y_pred = classifier(batch_dict['x_surname'])
# 第三步:计算损失
loss = loss_func(y_pred, batch_dict['y_nationality'])
loss_t = loss.item()
running_loss += (loss_t - running_loss) / (batch_index + 1)
# 第四步:使用损失计算梯度
loss.backward()
# 第五步:使用优化器进行梯度更新
optimizer.step()
# -----------------------------------------
# 计算准确率
acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
running_acc += (acc_t - running_acc) / (batch_index + 1)
# 更新进度条
train_bar.set_postfix(loss=running_loss, acc=running_acc,
epoch=epoch_index)
train_bar.update()
train_state['train_loss'].append(running_loss)
train_state['train_acc'].append(running_acc)
# 迭代验证数据集
# 设置:批生成器,初始化损失和准确率为0,开启评估模式
dataset.set_split('val')
batch_generator = generate_batches(dataset,
batch_size=args.batch_size,
device=args.device)
running_loss = 0.
running_acc = 0.
classifier.eval() # 设置模型为评估模式
for batch_index, batch_dict in enumerate(batch_generator):
# 计算输出
y_pred = classifier(batch_dict['x_surname'])
# 第三步:计算损失
loss = loss_func(y_pred, batch_dict['y_nationality'])
loss_t = loss.to("cpu").item()
running_loss += (loss_t - running_loss) / (batch_index + 1)
# 计算准确率
acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
running_acc += (acc_t - running_acc) / (batch_index + 1)
val_bar.set_postfix(loss=running_loss, acc=running_acc,
epoch=epoch_index)
val_bar.update()
train_state['val_loss'].append(running_loss)
train_state['val_acc'].append(running_acc)
# 更新训练状态
train_state = update_train_state(args=args, model=classifier,
train_state=train_state)
# 更新学习率
scheduler.step(train_state['val_loss'][-1])
# 检查是否需要提前停止
if train_state['stop_early']:
break
# 重置进度条
train_bar.n = 0
val_bar.n = 0
epoch_bar.update()
except KeyboardInterrupt:
print("Exiting loop") # 捕捉到键盘中断信号后退出循环
训练过程展示:
3.5评估模型
# 使用最佳模型在测试集上计算损失和准确率
# 加载最佳模型的参数
classifier.load_state_dict(torch.load(train_state['model_filename']))
classifier = classifier.to(args.device) # 将分类器移动到指定设备(CPU或GPU)
dataset.class_weights = dataset.class_weights.to(args.device) # 将数据集的类别权重移动到指定设备
loss_func = nn.CrossEntropyLoss(dataset.class_weights) # 定义损失函数
dataset.set_split('test') # 设置数据集分割为测试集
batch_generator = generate_batches(dataset,
batch_size=args.batch_size,
device=args.device) # 创建测试集批生成器
running_loss = 0. # 初始化损失为0
running_acc = 0. # 初始化准确率为0
classifier.eval() # 设置模型为评估模式
for batch_index, batch_dict in enumerate(batch_generator):
# 计算输出
y_pred = classifier(batch_dict['x_surname'])
# 计算损失
loss = loss_func(y_pred, batch_dict['y_nationality'])
loss_t = loss.item()
running_loss += (loss_t - running_loss) / (batch_index + 1)
# 计算准确率
acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
running_acc += (acc_t - running_acc) / (batch_index + 1)
train_state['test_loss'] = running_loss # 将测试集损失存储在训练状态中
train_state['test_acc'] = running_acc # 将测试集准确率存储在训练状态中
print("Test loss: {};".format(train_state['test_loss']))
print("Test Accuracy: {}".format(train_state['test_acc']))
测试结构如下:
Test loss: 1.7831686019897461; Test Accuracy: 46.31249999999999
3.6分类测试
使用训练好的模型进行姓氏分类
定义单个国籍预测函数:
def predict_nationality(surname, classifier, vectorizer):
"""预测新姓氏的国籍
Args:
surname (str): 要分类的姓氏
classifier (SurnameClassifer): 分类器的实例
vectorizer (SurnameVectorizer): 对应的向量化器
Returns:
包含最可能的国籍及其概率的字典
"""
vectorized_surname = vectorizer.vectorize(surname) # 将姓氏向量化
vectorized_surname = torch.tensor(vectorized_surname).view(1, -1) # 转换为张量并调整形状
result = classifier(vectorized_surname, apply_softmax=True) # 使用分类器预测
probability_values, indices = result.max(dim=1) # 获取最大概率值和对应的索引
index = indices.item()
predicted_nationality = vectorizer.nationality_vocab.lookup_index(index) # 获取预测的国籍
probability_value = probability_values.item() # 获取概率值
return {'nationality': predicted_nationality, 'probability': probability_value} # 返回结果字典
测试函数如下:
new_surname = input("Enter a surname to classify: ")
classifier = classifier.to("cpu")
prediction = predict_nationality(new_surname, classifier, vectorizer)
print("{} -> {} (p={:0.2f})".format(new_surname,
prediction['nationality'],
prediction['probability']))
这里我输入了自己的姓氏,结构却给我得出了德国人的结果。。。
cheng -> German (p=0.35)
定义多个国籍预测函数:
vectorizer.nationality_vocab.lookup_index(8)
def predict_topk_nationality(name, classifier, vectorizer, k=5):
vectorized_name = vectorizer.vectorize(name) # 将名字向量化
vectorized_name = torch.tensor(vectorized_name).view(1, -1) # 转换为张量并调整形状
prediction_vector = classifier(vectorized_name, apply_softmax=True) # 使用分类器预测
probability_values, indices = torch.topk(prediction_vector, k=k) # 获取前k个最高概率值和对应的索引
# 返回的大小为 1,k
probability_values = probability_values.detach().numpy()[0]
indices = indices.detach().numpy()[0]
results = []
for prob_value, index in zip(probability_values, indices):
nationality = vectorizer.nationality_vocab.lookup_index(index) # 获取预测的国籍
results.append({'nationality': nationality,
'probability': prob_value})
return results
测试函数如下:
new_surname = input("Enter a surname to classify: ") # 输入要分类的姓氏
classifier = classifier.to("cpu") # 将分类器移动到CPU上
k = int(input("How many of the top predictions to see? ")) # 输入要查看的前几个预测
if k > len(vectorizer.nationality_vocab):
print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
k = len(vectorizer.nationality_vocab)
predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k) # 获取前k个预测结果
print("Top {} predictions:".format(k)) # 打印前k个预测结果
print("===================")
for prediction in predictions:
print("{} -> {} (p={:0.2f})".format(new_surname,
prediction['nationality'],
prediction['probability'])) # 打印预测结果及对应概率值
测试结果如下:
Top 5 predictions:
===================
dong -> English (p=0.27)
dong -> Scottish (p=0.26)
dong -> Irish (p=0.08)
dong -> German (p=0.08)
dong -> Polish (p=0.06)
四、CNN实现姓氏分类
姓氏分类任务中,使用多层感知机(MLP)和卷积神经网络(CNN)的实现大致流程是相同的,但也有不同之处。这些差异主要体现在模型架构、训练过程、和推断方法上。
模型架构
-
MLP模型架构:
- MLP由多个全连接层组成,通常包括输入层、若干个隐藏层和输出层。
- 每一层的神经元与上一层的所有神经元相连,信息通过层与层之间的权重矩阵传递。
- MLP适合处理平坦的向量输入,因此需要对数据进行平坦化处理。
-
CNN模型架构:
- CNN由卷积层和池化层组成,通常还包括全连接层。
- 卷积层用于提取局部特征,池化层用于减少特征图的尺寸。
- CNN适合处理二维或多维的张量输入,因此可以直接处理字符序列的矩阵表示。
训练过程
-
MLP训练过程:
- 输入数据需要平坦化处理。
- 通常使用交叉熵损失函数和Adam优化器。
- 训练过程中包含前向传播、计算损失、反向传播和更新参数。
-
CNN训练过程:
- 输入数据保留为二维张量。
- 使用相似的损失函数和优化器。
- 训练过程与MLP类似,但由于卷积操作,参数更新和梯度计算有些不同。
推断方法
-
MLP推断方法:
- 输入数据需要平坦化,然后通过模型进行预测。
- 通常使用softmax函数计算每个类别的概率。
-
CNN推断方法:
- 输入数据保持为二维张量,然后通过模型进行预测。
- 也使用softmax函数计算每个类别的概率。
完整代码
不再逐段分析CNN实现过程,这里直接给出实现代码:
from argparse import Namespace
from collections import Counter
import json
import os
import string
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm_notebook
class Vocabulary(object):
"""处理文本并提取词汇进行映射的类"""
def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
"""
Args:
token_to_idx (dict): 预先存在的token到索引的映射
add_unk (bool): 指示是否添加UNK token的标志
unk_token (str): 要添加到词汇表中的UNK token
"""
if token_to_idx is None:
token_to_idx = {}
self._token_to_idx = token_to_idx
# 创建从索引到token的映射
self._idx_to_token = {idx: token
for token, idx in self._token_to_idx.items()}
self._add_unk = add_unk
self._unk_token = unk_token
self.unk_index = -1
if add_unk:
self.unk_index = self.add_token(unk_token)
def to_serializable(self):
"""返回一个可以序列化的字典"""
return {'token_to_idx': self._token_to_idx,
'add_unk': self._add_unk,
'unk_token': self._unk_token}
@classmethod
def from_serializable(cls, contents):
""" 从序列化的字典实例化词汇表 """
return cls(**contents)
def add_token(self, token):
"""根据token更新映射字典.
Args:
token (str): 要添加到词汇表中的项目
Returns:
index (int): 对应于token的整数索引
"""
try:
index = self._token_to_idx[token]
except KeyError:
index = len(self._token_to_idx)
self._token_to_idx[token] = index
self._idx_to_token[index] = token
return index
def add_many(self, tokens):
"""将一组token添加到词汇表中
Args:
tokens (list): 字符串token的列表
Returns:
indices (list): 对应于这些token的索引列表
"""
return [self.add_token(token) for token in tokens]
def lookup_token(self, token):
"""
检索与token相关联的索引
或者如果token不存在,则返回UNK索引
Args:
token (str): 要查找的token
Returns:
index (int): 对应于token的索引
Notes:
`unk_index` 需要 >=0 (已添加到词汇表中)
才能使用UNK功能
"""
if self.unk_index >= 0:
return self._token_to_idx.get(token, self.unk_index)
else:
return self._token_to_idx[token]
def lookup_index(self, index):
"""返回与索引相关联的token
Args:
index (int): 要查找的索引
Returns:
token (str): 对应于该索引的token
Raises:
KeyError: 如果索引不在词汇表中
"""
if index not in self._idx_to_token:
raise KeyError("the index (%d) is not in the Vocabulary" % index)
return self._idx_to_token[index]
def __str__(self):
return "<Vocabulary(size=%d)>" % len(self)
def __len__(self):
return len(self._token_to_idx)
class SurnameVectorizer(object):
""" 该向量化器协调词汇并将其投入使用"""
def __init__(self, surname_vocab, nationality_vocab, max_surname_length):
"""
Args:
surname_vocab (Vocabulary): 将字符映射到整数的词汇表
nationality_vocab (Vocabulary): 将国籍映射到整数的词汇表
max_surname_length (int): 最长姓氏的长度
"""
self.surname_vocab = surname_vocab
self.nationality_vocab = nationality_vocab
self._max_surname_length = max_surname_length
def vectorize(self, surname):
"""
Args:
surname (str): 姓氏
Returns:
one_hot_matrix (np.ndarray): 一个one-hot向量矩阵
"""
# one-hot矩阵的尺寸为(字符词汇表的长度,最大姓氏长度)
one_hot_matrix_size = (len(self.surname_vocab), self._max_surname_length)
# 初始化one-hot矩阵,所有值为0,类型为float32
one_hot_matrix = np.zeros(one_hot_matrix_size, dtype=np.float32)
# 遍历姓氏中的每个字符及其位置
for position_index, character in enumerate(surname):
# 查找字符在词汇表中的索引
character_index = self.surname_vocab.lookup_token(character)
# 将one-hot矩阵中对应位置设为1
one_hot_matrix[character_index][position_index] = 1
return one_hot_matrix
@classmethod
def from_dataframe(cls, surname_df):
"""从数据集的DataFrame实例化向量化器
Args:
surname_df (pandas.DataFrame): 姓氏数据集
Returns:
SurnameVectorizer的一个实例
"""
surname_vocab = Vocabulary(unk_token="@")
nationality_vocab = Vocabulary(add_unk=False)
max_surname_length = 0
# 遍历数据集的每一行
for index, row in surname_df.iterrows():
# 更新最大姓氏长度
max_surname_length = max(max_surname_length, len(row.surname))
# 将姓氏中的每个字母添加到词汇表
for letter in row.surname:
surname_vocab.add_token(letter)
# 将国籍添加到词汇表
nationality_vocab.add_token(row.nationality)
return cls(surname_vocab, nationality_vocab, max_surname_length)
@classmethod
def from_serializable(cls, contents):
""" 从可序列化的字典实例化向量化器 """
surname_vocab = Vocabulary.from_serializable(contents['surname_vocab'])
nationality_vocab = Vocabulary.from_serializable(contents['nationality_vocab'])
return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab,
max_surname_length=contents['max_surname_length'])
def to_serializable(self):
""" 返回一个可以序列化的字典 """
return {'surname_vocab': self.surname_vocab.to_serializable(),
'nationality_vocab': self.nationality_vocab.to_serializable(),
'max_surname_length': self._max_surname_length}
class SurnameDataset(Dataset):
def __init__(self, surname_df, vectorizer):
"""
Args:
name_df (pandas.DataFrame): 数据集
vectorizer (SurnameVectorizer): 从数据集中实例化的向量化器
"""
self.surname_df = surname_df
self._vectorizer = vectorizer
self.train_df = self.surname_df[self.surname_df.split=='train']
self.train_size = len(self.train_df)
self.val_df = self.surname_df[self.surname_df.split=='val']
self.validation_size = len(self.val_df)
self.test_df = self.surname_df[self.surname_df.split=='test']
self.test_size = len(self.test_df)
self._lookup_dict = {'train': (self.train_df, self.train_size),
'val': (self.val_df, self.validation_size),
'test': (self.test_df, self.test_size)}
self.set_split('train')
# 类别权重
class_counts = surname_df.nationality.value_counts().to_dict()
def sort_key(item):
return self._vectorizer.nationality_vocab.lookup_token(item[0])
sorted_counts = sorted(class_counts.items(), key=sort_key)
frequencies = [count for _, count in sorted_counts]
self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)
@classmethod
def load_dataset_and_make_vectorizer(cls, surname_csv):
"""从头开始加载数据集并创建一个新的向量化器
Args:
surname_csv (str): 数据集的位置
Returns:
SurnameDataset的一个实例
"""
surname_df = pd.read_csv(surname_csv)
train_surname_df = surname_df[surname_df.split=='train']
return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))
@classmethod
def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
"""加载数据集和响应的向量化器
在向量化器以及缓存以便重用的情况下使用
Args:
surname_csv (str): 数据集的位置
vectorizer_filepath (str): 已保存向量化器的位置
Returns:
SurnameDataset的一个实例
"""
surname_df = pd.read_csv(surname_csv)
vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
return cls(surname_df, vectorizer)
@staticmethod
def load_vectorizer_only(vectorizer_filepath):
"""从文件加载向量化器的静态方法
Args:
vectorizer_filepath (str): 序列化向量化器的位置
Returns:
SurnameDataset的一个实例
"""
with open(vectorizer_filepath) as fp:
return SurnameVectorizer.from_serializable(json.load(fp))
def save_vectorizer(self, vectorizer_filepath):
"""使用json将向量化器保存到磁盘
Args:
vectorizer_filepath (str): 保存向量化器的位置
"""
with open(vectorizer_filepath, "w") as fp:
json.dump(self._vectorizer.to_serializable(), fp)
def get_vectorizer(self):
""" 返回向量化器 """
return self._vectorizer
def set_split(self, split="train"):
""" 使用dataframe中的列选择数据集中的拆分 """
self._target_split = split
self._target_df, self._target_size = self._lookup_dict[split]
def __len__(self):
return self._target_size
def __getitem__(self, index):
"""PyTorch 数据集的主要入口方法
Args:
index (int): 数据点索引
Returns:
包含数据点特征(x_data) 和标签 (y_target)的字典
"""
row = self._target_df.iloc[index]
surname_matrix = \
self._vectorizer.vectorize(row.surname)
nationality_index = \
self._vectorizer.nationality_vocab.lookup_token(row.nationality)
return {'x_surname': surname_matrix,
'y_nationality': nationality_index}
def get_num_batches(self, batch_size):
"""给定批量大小,返回数据集中批次数量
Args:
batch_size (int)
Returns:
数据集中的批次数量
"""
return len(self) // batch_size
def generate_batches(dataset, batch_size, shuffle=True,
drop_last=True, device="cpu"):
"""
一个包装pytorch DataLoader的生成器函数。它将确保每个张量位于正确的设备位置
"""
dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
shuffle=shuffle, drop_last=drop_last)
for data_dict in dataloader:
out_data_dict = {}
for name, tensor in data_dict.items():
out_data_dict[name] = data_dict[name].to(device)
yield out_data_dict
class SurnameClassifier(nn.Module):
def __init__(self, initial_num_channels, num_classes, num_channels):
"""
Args:
initial_num_channels (int):输入特征向量的大小
num_classes (int): 输出预测下岗了的大小
num_channels (int): 在网格中试一试的恒定通道大小
"""
super(SurnameClassifier, self).__init__()
# 定义卷积网络部分
self.convnet = nn.Sequential(
nn.Conv1d(in_channels=initial_num_channels,
out_channels=num_channels, kernel_size=3),
nn.ELU(),
nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
kernel_size=3, stride=2),
nn.ELU(),
nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
kernel_size=3, stride=2),
nn.ELU(),
nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
kernel_size=3),
nn.ELU()
)
# 定义全连接层
self.fc = nn.Linear(num_channels, num_classes)
def forward(self, x_surname, apply_softmax=False):
"""分类器的前向传递
Args:
x_surname (torch.Tensor): 输入数据张量。x_surname的形状应为(batch, initial_num_channels, max_surname_length)
apply_softmax (bool): 是否应用softmax激活的标志
如果与交叉熵损失一起使用,应设置为False
return:
结果张量。张量形状应为(batch, num_classes)
"""
# 通过卷积网络提取特征,并在第二维度上进行压缩
features = self.convnet(x_surname).squeeze(dim=2)
# 通过全连接层得到预测向量
prediction_vector = self.fc(features)
# 如果需要,应用softmax激活函数
if apply_softmax:
prediction_vector = F.softmax(prediction_vector, dim=1)
return prediction_vector
def make_train_state(args):
return {'stop_early': False,
'early_stopping_step': 0,
'early_stopping_best_val': 1e8,
'learning_rate': args.learning_rate,
'epoch_index': 0,
'train_loss': [],
'train_acc': [],
'val_loss': [],
'val_acc': [],
'test_loss': -1,
'test_acc': -1,
'model_filename': args.model_state_file}
def update_train_state(args, model, train_state):
"""
处理训练状态的更新
Components:
- Early Stopping: 防止过拟合
- Model Checkpoint: 如果模型更好,则保存模型
:param args: 主参数
:param model: 训练的模型
:param train_state: 表示训练状态值的字典
:returns:
一个更新后的train_state字典
"""
# 至少保存一次模型
if train_state['epoch_index'] == 0:
torch.save(model.state_dict(), train_state['model_filename'])
train_state['stop_early'] = False
# 如果性能改进,保存模型
elif train_state['epoch_index'] >= 1:
loss_tm1, loss_t = train_state['val_loss'][-2:]
# 如果损失变大
if loss_t >= train_state['early_stopping_best_val']:
# 更新提前停止步骤
train_state['early_stopping_step'] += 1
# 如果损失减小
else:
# 保存最佳模型
if loss_t < train_state['early_stopping_best_val']:
torch.save(model.state_dict(), train_state['model_filename'])
# 重置提前停止步骤
train_state['early_stopping_step'] = 0
# 是否提前停止?
train_state['stop_early'] = \
train_state['early_stopping_step'] >= args.early_stopping_criteria
return train_state
args = Namespace(
# 数据和路径信息
surname_csv="data/surnames/surnames_with_splits.csv",
vectorizer_file="vectorizer.json",
model_state_file="model.pth",
save_dir="model_storage/ch4/cnn",
# 模型超参数
hidden_dim=100,
num_channels=256,
# 训练超参数
seed=1337,
learning_rate=0.001,
batch_size=128,
num_epochs=100,
early_stopping_criteria=5,
dropout_p=0.1,
# 运行时选项
cuda=False,
reload_from_files=False,
expand_filepaths_to_save_dir=True,
catch_keyboard_interrupt=True
)
if args.expand_filepaths_to_save_dir:
args.vectorizer_file = os.path.join(args.save_dir,
args.vectorizer_file)
args.model_state_file = os.path.join(args.save_dir,
args.model_state_file)
print("Expanded filepaths: ")
print("\t{}".format(args.vectorizer_file))
print("\t{}".format(args.model_state_file))
# Check CUDA
if not torch.cuda.is_available():
args.cuda = False
args.device = torch.device("cuda" if args.cuda else "cpu")
print("Using CUDA: {}".format(args.cuda))
def set_seed_everywhere(seed, cuda):
np.random.seed(seed)
torch.manual_seed(seed)
if cuda:
torch.cuda.manual_seed_all(seed)
def handle_dirs(dirpath):
if not os.path.exists(dirpath):
os.makedirs(dirpath)
# Set seed for reproducibility
set_seed_everywhere(args.seed, args.cuda)
# handle dirs
handle_dirs(args.save_dir)
def compute_accuracy(y_pred, y_target):
# 获取每行中最大值的索引,及预测类别
y_pred_indices = y_pred.max(dim=1)[1]
# 计算预测正确的数量
n_correct = torch.eq(y_pred_indices, y_target).sum().item()
# 返回准确率的百分比
return n_correct / len(y_pred_indices) * 100
if args.reload_from_files:
# 从检查点开始训练
dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv,
args.vectorizer_file)
else:
# 创建数据集和向量化器
dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
dataset.save_vectorizer(args.vectorizer_file)
# 获取向量化器
vectorizer = dataset.get_vectorizer()
# 初始化分类器
classifier = SurnameClassifier(initial_num_channels=len(vectorizer.surname_vocab),
num_classes=len(vectorizer.nationality_vocab),
num_channels=args.num_channels)
# 将分类器移到指定设备上
classifier = classifier.to(args.device)
# 将数据集的类权重移到指定设备上
dataset.class_weights = dataset.class_weights.to(args.device)
# 定义损失函数,使用交叉熵损失并加上类权重
loss_func = nn.CrossEntropyLoss(weight=dataset.class_weights)
# 定义优化器,使用Adam优化器
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
# 定义学习率调度器,当监测的量不再减少时,减少学习率
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
mode='min', factor=0.5,
patience=1)
# 初始化训练状态
train_state = make_train_state(args)
epoch_bar = tqdm_notebook(desc='training routine',
total=args.num_epochs,
position=0)
# 设置数据集分割为训练集
dataset.set_split('train')
train_bar = tqdm_notebook(desc='split=train',
total=dataset.get_num_batches(args.batch_size),
position=1,
leave=True)
# 设置数据集分割为验证集
dataset.set_split('val')
val_bar = tqdm_notebook(desc='split=val',
total=dataset.get_num_batches(args.batch_size),
position=1,
leave=True)
try:
for epoch_index in range(args.num_epochs):
train_state['epoch_index'] = epoch_index
# 迭代训练数据集
# 设置: 批生成器,将损失和准确率设为0,启用训练模式
dataset.set_split('train')
batch_generator = generate_batches(dataset,
batch_size=args.batch_size,
device=args.device)
running_loss = 0.0
running_acc = 0.0
classifier.train()
for batch_index, batch_dict in enumerate(batch_generator):
# 训练流程如下五步:
# --------------------------------------
# 步骤1:清零梯度
optimizer.zero_grad()
# 步骤2:计算输出
y_pred = classifier(batch_dict['x_surname'])
# 步骤3:计算损失
loss = loss_func(y_pred, batch_dict['y_nationality'])
loss_t = loss.item()
running_loss += (loss_t - running_loss) / (batch_index + 1)
# 步骤4:使用损失产生梯度
loss.backward()
# 步骤5:使用优化器进行梯度更新
optimizer.step()
# 计算准确率
acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
running_acc += (acc_t - running_acc) / (batch_index + 1)
# 更新进度条
train_bar.set_postfix(loss=running_loss, acc=running_acc,
epoch=epoch_index)
train_bar.update()
train_state['train_loss'].append(running_loss)
train_state['train_acc'].append(running_acc)
# 迭代验证数据集
# 设置: 批生成器,将损失和准确率设为0,启用评估模式
dataset.set_split('val')
batch_generator = generate_batches(dataset,
batch_size=args.batch_size,
device=args.device)
running_loss = 0.
running_acc = 0.
classifier.eval()
for batch_index, batch_dict in enumerate(batch_generator):
# 计算输出
y_pred = classifier(batch_dict['x_surname'])
# 步骤3:计算损失
loss = loss_func(y_pred, batch_dict['y_nationality'])
loss_t = loss.item()
running_loss += (loss_t - running_loss) / (batch_index + 1)
# 计算准确率
acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
running_acc += (acc_t - running_acc) / (batch_index + 1)
val_bar.set_postfix(loss=running_loss, acc=running_acc,
epoch=epoch_index)
val_bar.update()
train_state['val_loss'].append(running_loss)
train_state['val_acc'].append(running_acc)
# 更新训练状态
train_state = update_train_state(args=args, model=classifier,
train_state=train_state)
# 调度学习率
scheduler.step(train_state['val_loss'][-1])
# 判断是否提前停止
if train_state['stop_early']:
break
# 重置进度条
train_bar.n = 0
val_bar.n = 0
epoch_bar.update()
except KeyboardInterrupt:
print("Exiting loop")
# 加载模型状态
classifier.load_state_dict(torch.load(train_state['model_filename']))
# 将分类器移到指定设备上
classifier = classifier.to(args.device)
# 将数据集的类权重移到指定设备上
dataset.class_weights = dataset.class_weights.to(args.device)
# 定义损失函数,使用交叉熵损失并加上类权重
loss_func = nn.CrossEntropyLoss(dataset.class_weights)
# 设置数据集分割为测试集
dataset.set_split('test')
batch_generator = generate_batches(dataset,
batch_size=args.batch_size,
device=args.device)
running_loss = 0.
running_acc = 0.
classifier.eval()
for batch_index, batch_dict in enumerate(batch_generator):
# c计算输出
y_pred = classifier(batch_dict['x_surname'])
# 计算损失
loss = loss_func(y_pred, batch_dict['y_nationality'])
loss_t = loss.item()
running_loss += (loss_t - running_loss) / (batch_index + 1)
# 计算准确率
acc_t = compute_accuracy(y_pred, batch_dict['y_nationality'])
running_acc += (acc_t - running_acc) / (batch_index + 1)
# 将测试集的损失和准确率保存到训练状态中
train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc
print("Test loss: {};".format(train_state['test_loss']))
print("Test Accuracy: {}".format(train_state['test_acc']))
def predict_nationality(surname, classifier, vectorizer):
"""预测一个新姓氏的国籍
Args:
surname (str): 待分类的姓氏
classifier (SurnameClassifer): 分类器的实例
vectorizer (SurnameVectorizer): 对应的向量化器
Returns:
包含最可能的国籍及其概率的字典
"""
# 将姓氏向量化
vectorized_surname = vectorizer.vectorize(surname)
vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(0)
# 获取预测结果
result = classifier(vectorized_surname, apply_softmax=True)
# 获取预测的国籍及其概率
probability_values, indices = result.max(dim=1)
index = indices.item()
predicted_nationality = vectorizer.nationality_vocab.lookup_index(index)
probability_value = probability_values.item()
return {'nationality': predicted_nationality, 'probability': probability_value}
new_surname = input("Enter a surname to classify: ")
classifier = classifier.cpu()
prediction = predict_nationality(new_surname, classifier, vectorizer)
print("{} -> {} (p={:0.2f})".format(new_surname,
prediction['nationality'],
prediction['probability']))
def predict_topk_nationality(surname, classifier, vectorizer, k=5):
"""从新的姓氏预测前k个国籍
Args:
surname (str): 待分类的姓氏
classifier (SurnameClassifer): 分类器的实例
vectorizer (SurnameVectorizer): 对应的向量化器
k (int): 返回的前k个国籍数
Returns:
字典的列表,每一个字典表示一个国籍及其概率
"""
# 将姓氏向量化
vectorized_surname = vectorizer.vectorize(surname)
vectorized_surname = torch.tensor(vectorized_surname).unsqueeze(dim=0)
# 使用分类器进行预测
prediction_vector = classifier(vectorized_surname, apply_softmax=True)
probability_values, indices = torch.topk(prediction_vector, k=k)
# 返回的尺寸是 1,k
probability_values = probability_values[0].detach().numpy()
indices = indices[0].detach().numpy()
results = []
for kth_index in range(k):
# 根据索引查找国籍
nationality = vectorizer.nationality_vocab.lookup_index(indices[kth_index])
probability_value = probability_values[kth_index]
results.append({'nationality': nationality,
'probability': probability_value})
return results
new_surname = input("Enter a surname to classify: ")
k = int(input("How many of the top predictions to see? "))
if k > len(vectorizer.nationality_vocab):
print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
k = len(vectorizer.nationality_vocab)
predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)
print("Top {} predictions:".format(k))
print("===================")
for prediction in predictions:
print("{} -> {} (p={:0.2f})".format(new_surname,
prediction['nationality'],
prediction['probability']))
五、总结
在姓氏分类任务中,我们实现了两种不同的深度学习模型:卷积神经网络(CNN)和多层感知机(MLP)。这两种方法各有其特点和优势:
CNN模型 通过卷积和池化操作有效地捕捉字符级别的局部模式和特征,适合于需要考虑字符序列内部关系的文本分类任务。。 在姓氏分类任务中表现出较高的准确率,能够更好地处理输入序列中的空间结构信息。
MLP模型 使用多个全连接层依次处理输入特征,对于输入序列结构不复杂的问题,MLP模型可以在保证一定准确率的情况下提升训练效率。MLP训练速度较快,适合简单的序列分类任务。
综上所述,选择适合具体任务需求的模型架构非常重要。CNN模型在复杂数据关系处理上更为强大,而MLP模型则更适合处理简单而直接的分类问题。
我纯纯新手,可能有许多介绍不到位的甚至错误的地方,还请见谅。欢迎批评指正。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)