【重识别系列01】打怪之前先练级:用三元组+交叉熵搞定行人重识别
前言
随着城市化进程的加快和外卖/快递等即时配送行业的爆发,两轮电动车已经成为城市交通中不可或缺,但也最难管理的组成部分。从闯红灯、逆行、非法改装到车辆盗窃,传统的交通安防监控在面对庞大的电动车大军时,正显得力不从心。
“为什么不像机动车那样识别车牌?”
简单来说,现有的车牌地区差异大,面积小,还有反光、污泥遮挡等问题。由于监管相对比机动车更松,也很容易存在无牌、贴牌、套牌等问题,溯源更麻烦。
“为什么不用目标检测算法?”
常规的目标检测算法(如 YOLO 系列)只能告诉我们画面里有什么,却无法在跨摄像头的复杂场景下回答“这是不是刚刚违章的那辆车”。
于是有了电动车重识别(E-bike Re-ID)领域。
当前行人重识别是计算机视觉里很火的一个方向,这种技术可以在不同摄像头下识别具体的人。并且由于大量的研究结果,在对于刚才提到的低分辨率、遮挡、光照等问题都有了很多的解决方案,我认为不少是可以借鉴到电动车重识别的领域。
我们希望构建一套不依赖车牌的视觉追踪系统。我们可以联合“骑行者 + 电动车”一起进行识别。同时提取车辆特征(车型、颜色、是否有后备箱)和骑行者特征(头盔颜色、衣着、体态、雨衣),在无重叠视野的跨摄像头网络中,实现目标身份的精准匹配与持续追踪。这不仅是智慧城市和智能交通治理的最后一块拼图,也是一项极具挑战性的计算机视觉前沿任务。
由于笔者也是一名在读人工智能相关方向学生,实现这个系统还有点困难,笔者准备先从行人重识别开始尝试,后续再针对电动车重识别进行相关领域进行微调。
因此,有了本系列的第一篇(即本文),根据现有技术和现在成熟的数据集,尝试着先实现一下行人重识别。本文是一篇技术分享,同时也是一份学习日记。
技术路线
当前行人重识别方向的数据相对于电动车来说是很充足的。本文采用清华大学的Market1501数据集,在这个数据集中,包含了 1501 个不同的人,其中 751 个人的图像作为训练集(Query),另外 750 个人作为测试集(Gallery)。对于笔者这样的初学者,可以先从这种标准的干净数据集起手。
模型方面采用已经训练成熟的Resnet50作为骨干网络,这个基于ImageNet上预训练过的模型已经具备了强大的基础视觉特征提取能力,能够识别“花鸟虫鱼”,不需要重新训练一套模型。我们实现简单的行人重识别只需要将其分类全连接层拿去,训练出自己的分类层或是特征向量提取层即可。
训练策略上笔者采用 三元组损失(Triplet Loss) 和 交叉熵(Cross-Entropy) 两种损失函数进行训练。
- 交叉熵是一个分类器:它负责宏观的领地划分。它将骨干网络提取出的 2048 维特征向量映射为 751 个类别的概率分布,并与真实标签对比。通俗地说,它给每个人划定了一个“专属桶”,强迫模型把正确的照片扔进正确的桶里。但它的局限性在于“得过且过”:只要同一个人的照片都扔进了正确的桶里,即使它们在桶内散落得十万八千里,交叉熵也不会继续优化了。
- 三元组损失是一个质检员:它专门解决交叉熵“管不了类内紧凑度”的缺陷。它不看“桶”的边界,而是直接在特征空间里抓取三张图(原图、同类图、异类图)组成三元组。它强制要求:同类图之间的距离,必须比异类图之间的距离小一个设定的阈值。它像磁铁一样,实现了真正的“同类死死抱团,异类相互排斥”。
如果只用交叉熵,模型测试时遇到没见过的人,很容易抓瞎,因为特征太散,没学到“怎么量距离”。如果只用三元组,模型一开始不知道大方向在哪,就像蒙着眼睛找人,收敛极其困难,经常训崩。两者结合:交叉熵先画好大地图(给个坐标系),三元组再进去精雕细琢(把该凑近的凑近)。
由于三元组损失喜欢让特征在空间里自由分布(关注相对距离),而交叉熵喜欢把特征死死压缩在坐标轴附近(关注绝对分类边界)。如果直接相加会导致模型难以收敛,所以笔者采取BNNeck进行合并:ResNet50提取出的全局特征后,加入一层批量归一化层——Batch Normalization (BN层)。BN层对每个批次(batch)的数据计算均值和方差,强制将分布拉回到均值0、方差 1 的标准正态分布。交叉熵通常在BN层之后计算,而三元组损失在 BN 层之前计算。这种“分流”设计,使得两种损失函数能够完美地和谐共存,极大提升了模型的检索精度。。
只有在这套标准的行人重识别任务上学会如何用ResNet50的提取行人特征,我们才有底气在下一篇章中,带着这套强基线系统,去迎战复杂的电动车追踪任务。
现在,让我们先忘掉繁杂的电动车,从区分 751 个行人的身份开始。
项目文件夹结构
在开始数据处理之前,先了解一下我们的项目文件夹结构:
REID_TRIPLE/
├── data/
│ └── Market-1501-v15.09.15/
│ ├── bounding_box_train/
│ ├── bounding_box_test/
│ └── query/
├── dataset.py
├── model.py
├── sampler.py
├── train.py
├── Triplet.py
├── val.py
└── blog.md
data/文件夹:存放 Market-1501 数据集。bounding_box_train/包含训练图像,bounding_box_test/和query/用于测试。- 其余
.py文件:我们的核心代码模块。
有了这个结构,我们就可以开始加载数据了。
数据加载:dataset.py
这一部分的作用是把硬盘上的 Market1501 训练图像,转换成一个个可供训练的样本。它负责:
- 读取训练目录中的文件名
- 过滤掉无效图像
- 从文件名中解析 PID 和 CamID
- 将 PID 映射为连续标签
- 返回图像、标签、摄像头信息和文件名
这是加载数据集的第一环,后续模型和采样器都会基于这个输出工作。
import os
from PIL import Image
from torch.utils.data import Dataset
class Market1501(Dataset):
def __init__(self, img_dir, transform=None, is_train=True):
self.img_dir = img_dir
self.transform = transform
# 过滤掉非jpg文件和背景(PID为-1或0000)
self.img_names = [f for f in os.listdir(img_dir) if f.endswith('.jpg') and not f.startswith('-1') and not f.startswith('0000')]
# 训练阶段需要将PID映射为 0 到 N-1 的连续索引
if is_train:
pids = set([int(name.split('_')[0]) for name in self.img_names])
self.pid2label = {pid: i for i, pid in enumerate(sorted(list(pids)))}
else:
self.pid2label = None
def __len__(self):
return len(self.img_names)
def __getitem__(self, idx):
img_name = self.img_names[idx]
img_path = os.path.join(self.img_dir, img_name)
# 解析 PID 和 CamID
pid = int(img_name.split('_')[0])
camid = int(img_name.split('_')[1][1])
image = Image.open(img_path).convert('RGB')
if self.transform:
image = self.transform(image)
label = self.pid2label[pid] if self.pid2label else pid
return image, label, camid, img_name
1 初始化
首先需要对于样本进行信息格式化。我们需要提取数据集中照片的名称列表self.img_names和路径列表self.img_dir。通过名称和路径可以找到这张图的人物ID和摄像机ID。
在Market1501 数据集的文件夹中也不全是可用样本,需要筛选文件名:
- 只保留
.jpg图片 - 丢弃 PID 为
-1的背景图 - 丢弃 PID 为
0000的坏卡图
self.img_names = [f for f in os.listdir(img_dir)
if f.endswith('.jpg')
and not f.startswith('-1')
and not f.startswith('0000')]
避免噪声图像进入训练集。
2 PID 重编码
PID是照片中的人物ID,Market1501 的 PID 并不是从 0 连续编号的。为了让交叉熵损失正确工作,我们需要把它映射为 0..N-1。
pids = set([int(name.split('_')[0]) for name in self.img_names])
self.pid2label = {pid: i for i, pid in enumerate(sorted(list(pids)))}
这样一来,模型的 751 类输出就对应到连续标签 0..750。
3 输出样本
每次调用单个样本就会自动执行_getitem_函数,idx是我们单次查询的样本索引,让样本以格式化形式输出到训练代码
img_name = self.img_names[idx]
img_path = os.path.join(self.img_dir, img_name)
# 解析 PID 和 CamID
pid = int(img_name.split('_')[0])
camid = int(img_name.split('_')[1][1])
我们把图片名列表和路径列表中idx的图片名称、路径、人物ID、摄像头ID赋值给img_name、img_path、pid和camid,并转化为相应的格式。
image = Image.open(img_path).convert('RGB')
if self.transform:
image = self.transform(image)
label = self.pid2label[pid] if self.pid2label else pid
然后从图片文件里读出图像后转化成RGB图像,如果需要对图片变换就做相应的变换,并拎出这个样本的标签。最终将这些一起打包成一个训练样本后返回。后面 sampler.py 只关心 label,train.py 则拿 image 和 label 训练,camid 和 img_name 主要是调试和评估时用。
模型加载:model.py
这一部分的作用是搭建特征提取和分类器的网络结构,它把 dataset.py 提供的图像转换为可训练的特征向量。
import torch
import torch.nn as nn
from torchvision import models
class ReIDResNet50(nn.Module):
def __init__(self, num_classes=751):
super(ReIDResNet50, self).__init__()
# 加载 torchvision 官方预训练 ResNet50
resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
# 去掉最后的池化层和全连接层
self.backbone = nn.Sequential(*list(resnet.children())[:-2])
# ReID 常用的全局平均池化(把空间特征压成一个2048维向量)
self.gap = nn.AdaptiveAvgPool2d((1, 1))
# Bottleneck (BN层),用于规范化特征,对检索有帮助
self.bottleneck = nn.BatchNorm1d(2048)
self.bottleneck.bias.requires_grad_(False)
nn.init.normal_(self.bottleneck.weight, std=1.0)
nn.init.constant_(self.bottleneck.bias, 0.0)
# 分类器
self.classifier = nn.Linear(2048, num_classes, bias=False)
nn.init.normal_(self.classifier.weight, std=0.001)
def forward(self, x):
x = self.backbone(x) # 输出形状: (Batch, 2048, H, W)
x = self.gap(x) # 输出形状: (Batch, 2048, 1, 1)
global_feat = x.view(x.size(0), -1) # 拉平: (Batch, 2048)
feat = self.bottleneck(global_feat)
if self.training:
cls_score = self.classifier(feat)
return cls_score, global_feat # 训练时返回分类得分计算交叉熵损失
else:
return feat # 测试时直接返回特征用于计算距离
1 预训练ResNet50,提取BACKBONE层
我们使用 torchvision 提供的预训练模型 ResNet50,然后去掉最后的全连接层和池化层,这里的backbone就是去掉了全连接层和池化层。这样我们保留了 ResNet50 的强特征提取能力,同时为 ReID 任务定制后续层。接下来,我们都通过 self.xxx = nn.LayerName() 的方式为模型添加自定义层。
self.backbone = nn.Sequential(*list(resnet.children())[:-2])
2 定义GAP与BNNeck层:把空间特征变成可对比向量
ResNet最后输出的是一个空间特征图(2048xH×W)。ReID任务不需要保留空间位置信息,只要得到一个全局描述。
我们定义了一个gap层,通过AdaptiveAvgPool2d把H和W通道取平均值,把前一层卷积特征池化压成2048x1x1维向量:
self.gap = nn.AdaptiveAvgPool2d((1, 1))
由于三元组和交叉熵对于不同的特征向量的需求不一样——分类损失偏好较大方差的特征以区分不同类别,而三元组损失需要原始特征分布以准确计算距离——我们创建一个归一化层bottleneck把这2048维向量做批量归一化,让交叉熵损失函数经过BN层获得更稳定的特征,三元组损失函数使用原始特征。通过在不同特征空间分别计算两种损失,避免它们直接在同一个特征上产生冲突:
self.bottleneck = nn.BatchNorm1d(2048)
3 定义分类层CLASSIFIER:把特征映射为类别
我们创建了分类层classifier,这是个线性层,将2048维特征向量转化为num_classes个类别:
self.classifier = nn.Linear(2048, num_classes, bias=False)
4 训练与测试输出分离
模型向前传播的过程在forward()函数里实现:
global_feat = x.view(x.size(0), -1)
feat = self.bottleneck(global_feat)
对于输入进来的x是经过backbone和gap后的特征图,通过 view 展平得到global_feat向量,global_feat向量经过bottleneck层后得到feat.
if self.training:
cls_score = self.classifier(feat)
return cls_score, global_feat
else:
return feat
训练阶段返回两个值:
cls_score用于交叉熵损失,交叉熵需要对图像进行分类,cls_score就是图像经过classifier后的分类;global_feat用于三元组损失,因为三元组需要在原始特征空间里比较样本距离。
测试阶段只返回 feat,因为检索时只需要一个稳定的特征向量进行距离计算,不需要分类器输出。
采样器:sampler.py
这一部分的作用是为训练阶段构造“每个 batch 的采样方案”。它保证每个 batch 中:
- 含有
P个不同身份(不同的 PID) - 每个身份包含
K张图像
BatchSize和K是可以自行设置的,P = BatchSize / K(或者P * K = BatchSize)
这里的 P 和 K 用来保证 batch 里既有足够的身份多样性,又有足够的同身份样本。没有这一步,三元组损失在 batch 内无法获得足够的正样本和负样本。
import random
from collections import defaultdict
from torch.utils.data.sampler import Sampler
class RandomIdentitySampler(Sampler):
"""
随机身份采样器:确保每个 Batch 内包含 P 个人,每个人 K 张图。
BatchSize = P * K
"""
def __init__(self, data_source, batch_size, num_instances):
self.data_source = data_source
self.batch_size = batch_size
self.num_instances = num_instances # 就是 K (每个人抽几张)
self.index_dic = defaultdict(list) # defaultdict默认字典省去了初始化每个PID图集的列表
for index, (_, label, _, _) in enumerate(data_source): # enumerate遍历整个数据集时可同时获得元素的索引和元素本身
self.index_dic[label].append(index)
self.pids = list(self.index_dic.keys())
def __iter__(self):
indices = []
pids = self.pids.copy()
random.shuffle(pids)
for pid in pids:
t = self.index_dic[pid]
# 如果某个人的照片数够 K 张,随机抽 K 张;如果不够,允许重复抽满 K 张
if len(t) >= self.num_instances:
t = random.sample(t, self.num_instances)
else:
t = random.choices(t, k=self.num_instances)
indices.extend(t)
return iter(indices)
def __len__(self):
return len(self.pids) * self.num_instances
1 按 PID 分组
采样器初始化时先遍历数据集data_source,把同一身份(label)的所有样本索引(PID)收集到字典里:
self.index_dic = defaultdict(list) # defaultdict默认字典省去了初始化每个PID图集的列表
for index, (_, label, _, _) in enumerate(data_source): # enumerate遍历整个数据集时可同时获得元素的索引和元素本身
self.index_dic[label].append(index)
self.pids = list(self.index_dic.keys())
这样就有了“每个PID拥有哪些样本(图)”的映射表。
2 生成 batch 索引
__iter__()迭代器里先把 PID 随机打乱,再逐个抽取K张图,如果图集里不够K张图,则允许重复抽取:
pids = self.pids.copy()
random.shuffle(pids)
for pid in pids:
t = self.index_dic[pid]
if len(t) >= self.num_instances:
t = random.sample(t, self.num_instances) # 不放回地随机抽取K张图
else:
t = random.choices(t, k=self.num_instances) # 可放回地抽取K张图
indices.extend(t) # 逐个添加元素到列表,而不是添加整个列表作为一个元素
最终返回的是一个按身份组织好的索引列表 iter(indices) 。由于采样器保证了同一人的K张索引连续排列,DataLoader 返回的 batch 中,前 K 张属于同一个 PID,接着 K 张属于下一个 PID,以此类推。因此我们可以利用已知的 K 值来切分出每个 PID 对应的图片组,从而自然形成 BatchSize = P × K 的 batch。
三元组损失:Triplet.py
三元组我们采用的是困难样本挖掘三元组。通过找到每个anchor里找到最远正样本和最近负样本,来增强模型对于困难样本的学习能力,这会比随机采样三元组更有学习效率。
讲具体实现代码之前必须要先说明清楚:三元组损失的脚本的目的是接受前面已经通过Resnet50训练后的2048维特征向量,经过我们定义的ranking_loss层得到损失值。接收到特征向量后,我们要对这若干个特征向量互相取得欧氏距离,根据欧氏距离得到最远正样本和最近负样本。将anchor和最远正样本和最近负样本,得到损失函数值。
import torch
import torch.nn as nn
class TripletLoss(nn.Module):
# 初始化正负样本距离之间的最小差距和损失函数层
def __init__(self, margin=0.3):
super(TripletLoss, self).__init__()
self.margin = margin
self.ranking_loss = nn.MarginRankingLoss(margin=margin)
# 通过backbone层传递的特征向量(inputs)向前传播身份标签PID(targents)
def forward(self, inputs, targets):
"""
inputs: 形状为 (Batch, 2048) 的特征向量
targets: 形状为 (Batch,) 的身份标签
"""
n = inputs.size(0) # 传递进来多少个特征向量
# 计算这些特征向量间的欧氏距离 dist[i][j] = sqrt(||x_i - x_j||²) = sqrt(||x_i||² + ||x_j||² - 2·(x_i·x_j))
dist = torch.pow(inputs, 2).sum(dim=1, keepdim=True).expand(n, n) # n个特征向量每一个数字平方后求内和,再扩展为n维矩阵: 求||x_i||²
dist = dist + dist.t() # 矩阵与转置矩阵求和,得dist = ||x_i||² + ||x_j||²
dist.addmm_(1, -2, inputs, inputs.t()) # dist = 1 * dist + (-2) * (input @ input.t())
dist = dist.clamp(min=1e-12).sqrt() # 将dist的值限制在1e-12以上并开方
# 身份标签掩码,来标记哪些样本对是正样本(同一个人),哪些是负样本(不同人)。A.eq(B)表示比较A和B同一位置的值是否相等,相等输出T,否则输出F
mask = targets.expand(n, n).eq(targets.expand(n, n).t())
# 关键部分:对于每一个向量,找到最远的同类和最近的不同类
dist_ap, dist_an = [], []
for i in range(n):
dist_ap.append(dist[i][mask[i]].max().unsqueeze(0))
dist_an.append(dist[i][mask[i] == 0].min().unsqueeze(0))
# 将不同的距离拼成一个向量
dist_ap = torch.cat(dist_ap)
dist_an = torch.cat(dist_an)
# 计算损失函数
y = torch.ones_like(dist_an)
return self.ranking_loss(dist_an, dist_ap, y)
训练模型:train.py
下面在做了一系列的前期准备任务后,终于进入训练主流程。整个训练包含四个核心部分:
- 数据准备:数据增强 + 数据集 + 采样器
- 模型初始化:加载 ResNet50 骨干网络,定义分类头和 BNNeck
- 损失与优化器:交叉熵损失 + 三元组损失,Adam 优化器
- 训练循环:前向传播 → 计算联合损失 → 反向传播 → 更新参数
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
import os
# 导入自定义模块
from dataset import Market1501
from model import ReIDResNet50
from Triplet import TripletLoss
from sampler import RandomIdentitySampler
def train():
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 1. 数据增强与加载
transform_train = transforms.Compose([
transforms.Resize((256, 128)),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
#加载训练数据集
train_dir = 'data/Market-1501-v15.09.15/bounding_box_train'
train_dataset = Market1501(train_dir, transform=transform_train, is_train=True)
#加载训练数据集的采样器,确保每个batch包含P个人,每个人K张图片
sampler = RandomIdentitySampler(train_dataset, batch_size=64, num_instances=4)
train_loader = DataLoader(train_dataset, batch_size=64, sampler=sampler, num_workers=4)
# 2. 初始化模型、损失函数和优化器 (Market1501训练集固定751个人)
model = ReIDResNet50(num_classes=751).to(device)
id_criterion = nn.CrossEntropyLoss()
tri_criterion = TripletLoss(margin=0.3)
optimizer = optim.Adam(model.parameters(), lr=0.0003, weight_decay=5e-4)
# 3. 开始训练
epochs = 60
for epoch in range(epochs):
model.train()
# 初始化每个 epoch 的损失和准确率统计
running_id_loss = 0.0
running_tri_loss = 0.0
correct = 0
total = 0
for images, labels, _, _ in train_loader:
images, labels = images.to(device), labels.to(device)
# 计算分类损失和三元组损失
cls_score, global_feat = model(images) # 前向传播
optimizer.zero_grad() # 清空梯度
loss_id = id_criterion(cls_score, labels)
loss_tri = tri_criterion(global_feat, labels)
loss = loss_id + loss_tri
loss.backward()
optimizer.step()
# 记录数据
running_id_loss += loss_id.item()
running_tri_loss += loss_tri.item()
_, predicted = cls_score.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()
print(f"Epoch [{epoch+1}/{epochs}] "
f"ID Loss: {running_id_loss/len(train_loader):.4f} "
f"Tri Loss: {running_tri_loss/len(train_loader):.4f} "
f"Acc: {100.*correct/total:.2f}%")
# 保存权重
torch.save(model.state_dict(), 'resnet50_reid.pth')
print("模型已保存到 resnet50_reid.pth")
if __name__ == '__main__':
train()
1 数据准备:加载增强后的数据集
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # cuda加速
transform_train = transforms.Compose([
transforms.Resize((256, 128)),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
train_dataset = Market1501('data/Market-1501-v15.09.15/bounding_box_train',
transform=transform_train, is_train=True)
sampler = RandomIdentitySampler(train_dataset, batch_size=64, num_instances=4)
train_loader = DataLoader(train_dataset, batch_size=64, sampler=sampler,
num_workers=4)
这一部分负责设置数据增强策略、加载 Market1501 训练集,并用 RandomIdentitySampler 保证每个 batch 包含 P 个人各 K 张图。
数据增强:Resize 将图像统一为 256×128,RandomHorizontalFlip 将图像随机水平翻转,增加视角变化,ToTensor 和 Normalize 将图像转换为模型可接受的张量格式。这样的预处理既保证了输入尺寸一致,又能提高模型对姿态和光照变化的鲁棒性。
数据集与采样器:train_dataset 调用 dataset.py 中定义的 Market1501 类,完成图像路径解析、标签映射等加载逻辑。RandomIdentitySampler 负责按身份组织索引顺序,确保每个 batch 内每个人出现 K=4 张图。最后 DataLoader 将数据集和采样器融合,负责批量读取、多进程加速和数据打乱,供后续训练循环使用。
2 模型、损失函数、优化器
model = ReIDResNet50(num_classes=751).to(device)
id_criterion = nn.CrossEntropyLoss()
tri_criterion = TripletLoss(margin=0.3)
optimizer = optim.Adam(model.parameters(), lr=0.0003, weight_decay=5e-4)
这一段代码完成了训练部分的核心配置:先把我们前面定义的 ReID 模型实例化为 ReIDResNet50(num_classes=751),并调用 to(device) 把它放到 GPU 上运行(如果有 GPU 可用),这样前向传播和反向传播都能更快。num_classes=751是因为 Market1501 训练集固定有 751 个不同身份。
接着定义两个损失函数,交叉熵函数nn.CrossEntropyLoss() 用于身份分类,把模型输出的 cls_score 与真实标签对齐,主要负责区分不同的人;而三元组TripletLoss(margin=0.3) 则用于度量学习,它在特征空间中挖掘“最远的同类”和“最近的异类”,让同一个人的特征更加紧凑、不同人的特征更加分散。
最后用 optim.Adam(model.parameters(), lr=0.0003, weight_decay=5e-4) 来更新模型参数。Adam 不是一种损失,而是一个优化器,它会根据参数的历史梯度自动调整每个参数的学习步长,使训练更稳定、更容易收敛;而 weight_decay=5e-4 则相当于对参数加上一点惩罚,帮助防止模型过拟合。这三者合起来,就把模型结构、损失目标和参数更新方式这三大块都串联起来,为接下来的训练循环打下了基础。
3 训练循环主体
epochs = 60
for epoch in range(epochs):
model.train()
running_id_loss = 0.0
running_tri_loss = 0.0
correct = 0
total = 0
for images, labels, _, _ in train_loader:
images, labels = images.to(device), labels.to(device)
cls_score, global_feat = model(images)
optimizer.zero_grad()
loss_id = id_criterion(cls_score, labels)
loss_tri = tri_criterion(global_feat, labels)
loss = loss_id + loss_tri
loss.backward()
optimizer.step()
running_id_loss += loss_id.item()
running_tri_loss += loss_tri.item()
_, predicted = cls_score.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()
print(f"Epoch [{epoch+1}/{epochs}] "
f"ID Loss: {running_id_loss/len(train_loader):.4f} "
f"Tri Loss: {running_tri_loss/len(train_loader):.4f} "
f"Acc: {100.*correct/total:.2f}%")
首先设置 epochs = 60,表示模型将完整遍历训练集 60 次。外层循环 for epoch in range(epochs) 控制每一轮训练,每轮开始前调用 model.train() 将模型切换为训练模式,确保 BN 层使用当前 batch 的统计量。接着初始化四个统计变量:running_id_loss 和 running_tri_loss 用于累加两个损失值,correct 和 total 用于计算分类准确率。
内层循环 for images, labels, _, _ in train_loader 逐个取出 batch,每次取出的图像和标签会被移到 GPU/CPU 上准备计算。cls_score, global_feat = model(images) 是前向传播,模型自动返回分类得分和原始特征向量,前者送入交叉熵损失,后者送入三元组损失。在计算损失之前需要调用 optimizer.zero_grad() 清空上一轮 batch 遗留的梯度,避免梯度累积。
接着依次计算交叉熵损失 loss_id、三元组损失 loss_tri,并将两者相加得到联合损失 loss。loss.backward() 反向传播计算当前 batch 的梯度,optimizer.step() 根据梯度更新模型参数,完成一个 batch 的训练。
在每步更新的同时,通过 loss_id.item() 和 loss_tri.item() 将当前 batch 的损失值累加到统计变量中。同时用 cls_score.max(1) 取出预测类别,与真实标签 labels 逐元素比较,累加正确预测数和总样本数,用于计算这一轮 epoch 的分类准确率。
当内层循环遍历完所有 batch 后,一个 epoch 结束。打印出当前轮次的平均分类损失、平均三元组损失和分类准确率。通过观察这些指标随 epoch 的变化趋势,可以判断模型是否正常收敛。重复这个过程 60 轮后,训练完成。
模型测试与评估:val.py
训练完成后,需要在测试集上评估模型的真实性能。测试流程与训练不同:不需要数据增强(只做 resize 和归一化),不需要采样器(顺序遍历即可),也不需要计算梯度。评估的核心是:提取 Query 集和 Gallery 集中所有图像的特征,计算它们之间的距离矩阵,然后根据距离排序得到检索结果,最后用 Rank-1 和 mAP 两个指标衡量模型效果。
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
import numpy as np
import os
from dataset import Market1501
from model import ReIDResNet50
def extract_features(model, dataloader, device):
model.eval()
features, pids, camids = [], [], []
with torch.no_grad():
for images, pid, camid, _ in dataloader:
images = images.to(device)
feat = model(images)
# 对特征进行L2归一化,方便后面直接算矩阵乘法求余弦相似度
feat = torch.nn.functional.normalize(feat, p=2, dim=1)
features.append(feat.cpu())
pids.extend(pid.numpy())
camids.extend(camid.numpy())
return torch.cat(features, 0), np.array(pids), np.array(camids)
def evaluate(q_f, q_pids, q_camids, g_f, g_pids, g_camids):
"""简易版 Market1501 评估逻辑"""
print("计算特征距离矩阵...")
# 归一化特征的矩阵乘法即为余弦相似度,相似度越大距离越近
# 距离矩阵: dist = 1 - 相似度
dist_mat = 1 - torch.mm(q_f, g_f.t()).numpy()
num_q, num_g = dist_mat.shape
indices = np.argsort(dist_mat, axis=1) # 按距离从小到大排序
matches = (g_pids[indices] == q_pids[:, np.newaxis]).astype(np.int32)
all_cmc = []
all_AP = []
for q_idx in range(num_q):
q_pid = q_pids[q_idx]
q_camid = q_camids[q_idx]
# 排除: PID相同且摄像头ID也相同的gallery图片 (这类属于无效匹配不计入成绩)
remove = (g_pids == q_pid) & (g_camids == q_camid)
keep = np.invert(remove)
raw_match = matches[q_idx]
# 只保留合法匹配结果
valid_match = raw_match[keep[indices[q_idx]]]
if not np.any(valid_match): continue
# 计算 CMC 和 AP
cmc = valid_match.cumsum()
cmc[cmc > 1] = 1
all_cmc.append(cmc[:50]) # 只取前50计算Rank指标
num_rel = valid_match.sum()
tmp_cmc = valid_match.cumsum()
tmp_cmc = [x / (i + 1.) for i, x in enumerate(tmp_cmc)]
tmp_cmc = np.asarray(tmp_cmc) * valid_match
AP = tmp_cmc.sum() / num_rel
all_AP.append(AP)
all_cmc = np.asarray(all_cmc).astype(np.float32)
all_cmc = all_cmc.sum(0) / len(all_cmc)
mAP = np.mean(all_AP)
return all_cmc[0], mAP # 返回 Rank-1 和 mAP
def test():
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transform_test = transforms.Compose([
transforms.Resize((256, 128)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# 准备数据
query_dir = 'data/Market-1501-v15.09.15/query'
gallery_dir = 'data/Market-1501-v15.09.15/bounding_box_test'
query_dataset = Market1501(query_dir, transform=transform_test, is_train=False)
gallery_dataset = Market1501(gallery_dir, transform=transform_test, is_train=False)
query_loader = DataLoader(query_dataset, batch_size=64, shuffle=False, num_workers=4)
gallery_loader = DataLoader(gallery_dataset, batch_size=64, shuffle=False, num_workers=4)
# 加载模型
model = ReIDResNet50(num_classes=751).to(device)
model.load_state_dict(torch.load('resnet50_reid.pth'))
print("提取 Query 特征...")
q_f, q_pids, q_camids = extract_features(model, query_loader, device)
print("提取 Gallery 特征...")
g_f, g_pids, g_camids = extract_features(model, gallery_loader, device)
print("开始评估...")
rank1, mAP = evaluate(q_f, q_pids, q_camids, g_f, g_pids, g_camids)
print(f"Results: Rank-1: {rank1*100:.2f}%, mAP: {mAP*100:.2f}%")
if __name__ == '__main__':
test()
1 加载 Query 和 Gallery 数据集和训练好的模型
query_dir = 'data/Market-1501-v15.09.15/query'
gallery_dir = 'data/Market-1501-v15.09.15/bounding_box_test'
query_dataset = Market1501(query_dir, transform=transform_test, is_train=False)
gallery_dataset = Market1501(gallery_dir, transform=transform_test, is_train=False)
query_loader = DataLoader(query_dataset, batch_size=64, shuffle=False, num_workers=4)
gallery_loader = DataLoader(gallery_dataset, batch_size=64, shuffle=False, num_workers=4)
model = ReIDResNet50(num_classes=751).to(device)
model.load_state_dict(torch.load('resnet50_reid.pth'))
前面的和训练代码类似,在这里不再赘述。
Market1501 的测试集分为两部分:query 文件夹中存放待查询的图像(通常每张图像代表一个待检索的目标),bounding_box_test 文件夹中存放待匹配的图像库(gallery)。is_train=False 的作用在前面 dataset.py 中讲过:测试时不进行 PID 重映射,直接使用原始 PID(因为测试集的人名 ID 与训练集完全不重叠,不需要映射)。shuffle=False 保证提取特征时顺序固定,方便后续与 PID、CamID 对齐。
用和训练时相同的结构创建模型实例,然后加载之前保存的权重文件。num_classes=751 分类器在测试时其实用不到(测试只取特征,不用分类头),但为了保持模型结构一致,仍然传入这个参数。如果训练时保存的是完整模型而不是 state_dict,也可以用 torch.load 直接加载,但用 load_state_dict 更规范。
2 特征提取函数 extract_features
def extract_features(model, dataloader, device):
model.eval()
features, pids, camids = [], [], []
with torch.no_grad():
for images, pid, camid, _ in dataloader:
images = images.to(device)
feat = model(images)
feat = torch.nn.functional.normalize(feat, p=2, dim=1)
features.append(feat.cpu())
pids.extend(pid.numpy())
camids.extend(camid.numpy())
return torch.cat(features, 0), np.array(pids), np.array(camids)
这个函数负责批量提取图像特征。model.eval() 将模型切换为评估模式,这会改变 BN 层的行为:不再使用当前 batch 的均值和方差,而是使用训练时累积的全局统计量,确保每张图像的特征提取结果稳定且可重复。with torch.no_grad() 禁用梯度计算,因为测试时不需要反向传播,这样可以大幅减少内存占用和计算量。
对于每个 batch,将图像传到 GPU/CPU 后调用 model(images)。注意测试时模型返回的是 feat 而不是 (cls_score, global_feat),因为在 model.py 的 forward 函数中,当 self.training=False 时只返回 feat(经过 BN 层归一化后的特征)。然后对特征进行 L2 归一化,使得所有特征的模长为 1。这样做的目的是:两个归一化向量的点积就等于它们的余弦相似度,后续计算距离时直接用矩阵乘法 torch.mm(q_f, g_f.t()) 即可得到相似度矩阵,非常高效。
最后将每个 batch 的特征拼接成一个大张量,PID 和 CamID 转换成 NumPy 数组返回。
3 评估函数 evaluate
def evaluate(q_f, q_pids, q_camids, g_f, g_pids, g_camids):
print("计算特征距离矩阵...")
dist_mat = 1 - torch.mm(q_f, g_f.t()).numpy()
这是整个测试脚本的核心部分。首先我们需要计算距离矩阵:torch.mm(q_f, g_f.t()) 是 Query 特征和 Gallery 特征的余弦相似度矩阵,形状为 (num_query, num_gallery),其中每个元素 [i][j] 表示第 i 个 Query 与第 j 个 Gallery 的相似度(值越大越相似)。因为前面做了 L2 归一化,所以直接用矩阵乘法即可。1 - 相似度 将相似度转换为距离,距离越小表示越相似,方便后续排序。
num_q, num_g = dist_mat.shape
indices = np.argsort(dist_mat, axis=1)
matches = (g_pids[indices] == q_pids[:, np.newaxis]).astype(np.int32)
np.argsort(dist_mat, axis=1) 对每个 Query,按距离从小到大排序,返回排序后的 Gallery 索引。例如第 i 个 Query 对应的 indices[i] 是一个数组,第一个元素是距离最近的 Gallery 的编号,第二个是第二近的,以此类推。g_pids[indices] 按照这个顺序取出每个 Gallery 对应的 PID,然后与 Query 的 PID 比较,得到布尔矩阵 matches,True 表示该位置是正确匹配(即检索结果正确)。
all_cmc = []
all_AP = []
for q_idx in range(num_q):
q_pid = q_pids[q_idx]
q_camid = q_camids[q_idx]
remove = (g_pids == q_pid) & (g_camids == q_camid)
keep = np.invert(remove)
遍历每个 Query,计算它的 CMC 和 AP。remove 是一个布尔数组,标记需要排除的 Gallery 样本:那些 PID 相同且摄像头 ID 也相同的图片。为什么要排除?因为同一个摄像头下同一 PID 的图像可能是同一时刻的连续抓拍(比如视频抽帧),这些图片和 Query 太相似,如果算作正确匹配会虚高指标,Market1501 的标准评估协议要求排除这类样本。keep 取反得到需要保留的样本。
raw_match = matches[q_idx]
valid_match = raw_match[keep[indices[q_idx]]]
if not np.any(valid_match):
continue
indices[q_idx] 是当前 Query 的 Gallery 排序结果,keep[indices[q_idx]] 根据这个顺序筛选出哪些位置是合法样本,再用它去索引 raw_match,得到只包含合法样本的匹配结果 valid_match。如果这个 Query 没有任何合法匹配(理论上不应该发生,但为了安全跳过),则直接进入下一个 Query。
cmc = valid_match.cumsum()
cmc[cmc > 1] = 1
all_cmc.append(cmc[:50])
计算 CMC(Cumulative Matching Characteristic)。cumsum() 是累加,例如 [0,1,0,1,0] 累加后变成 [0,1,1,2,2],然后将大于 1 的值截断为 1,得到 [0,1,1,1,1]。这样 CMC 曲线就表示“在前 k 个检索结果中是否命中了正确目标”。只取前 50 个位置(cmc[:50]),因为 Rank-50 以上的指标通常不太关注。
num_rel = valid_match.sum()
tmp_cmc = valid_match.cumsum()
tmp_cmc = [x / (i + 1.) for i, x in enumerate(tmp_cmc)]
tmp_cmc = np.asarray(tmp_cmc) * valid_match
AP = tmp_cmc.sum() / num_rel
all_AP.append(AP)
计算 AP(Average Precision)。valid_match.cumsum() 得到每个位置上的累加正确数,除以当前位置的序号 (i+1) 得到每个位置上的精确率(Precision)。乘以 valid_match 后,只有正确匹配的位置才保留精确率,错误位置变 0。将这些精确率求和,再除以总正确匹配数,就得到了该 Query 的 AP。AP 衡量的是“检索结果中所有正确目标排得是否靠前”,值越高说明正确结果整体排名越靠前。
all_cmc = np.asarray(all_cmc).astype(np.float32)
all_cmc = all_cmc.sum(0) / len(all_cmc)
mAP = np.mean(all_AP)
return all_cmc[0], mAP
将所有 Query 的 CMC 曲线按列求平均,得到整体的 CMC 曲线。all_cmc[0] 就是 Rank-1 准确率(第一个结果就命中的概率)。对所有 Query 的 AP 求平均得到 mAP(mean Average Precision)。最后返回这两个核心指标。
我们最后的输出结果是:
Results: Rank-1: 86.25%, mAP: 69.27%
这个结果比我预期有些偏低,接下来考虑使用一些Tricks来增强他的识别率。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)