半监督图像分类
·
数据增广
处理图像时由于同意图像可能会因为大小,角度不同而在机械的角度看着不同,所以需要数据增广
Adam(原始版)
把权重衰减融入「梯度更新」过程(先算梯度,再乘衰减系数)
AdamW(改进版,W=Weight Decay)
把权重衰减和「梯度更新」分离:1. 用 Adam 算梯度并更新;2. 单独对权重做衰减(直接减 λ*w)
解决了 Adam 权重衰减失效的问题,衰减效果稳定
迁移学习
自己的模型往往因为数据不够多,而准确率不够,所以这个时候可以调用大佬的模型


# 导入必要的库
import random # 用于设置随机种子
import torch # PyTorch核心库,用于张量运算和模型构建
import torch.nn as nn # PyTorch神经网络模块
import numpy as np # 数值计算库,处理数组和矩阵
import os # 操作系统相关功能,用于文件路径处理
from PIL import Image # Python Imaging Library,用于读取和处理图片
from torch.utils.data import Dataset, DataLoader # 自定义数据集和数据加载器
from tqdm import tqdm # 进度条显示库,方便查看数据加载/训练进度
from torchvision import transforms # 图像预处理工具
import time # 计时工具,统计训练耗时
import matplotlib.pyplot as plt # 绘图库,可视化损失和准确率曲线
from model_utils.model import initialize_model # 自定义模型初始化函数(推测是加载预训练VGG)
# 定义全局随机种子函数,确保实验可复现
def seed_everything(seed):
torch.manual_seed(seed) # 设置CPU随机种子
torch.cuda.manual_seed(seed) # 设置单个GPU随机种子
torch.cuda.manual_seed_all(seed) # 设置所有GPU随机种子
torch.backends.cudnn.benchmark = False # 关闭cuDNN自动优化,保证确定性
torch.backends.cudnn.deterministic = True # 设置cuDNN确定性算法
random.seed(seed) # 设置Python随机种子
np.random.seed(seed) # 设置NumPy随机种子
os.environ['PYTHONHASHSEED'] = str(seed) # 设置Python哈希种子
# 执行种子设置,固定seed=0,确保每次运行结果一致
seed_everything(0)
# 定义图像统一尺寸(224x224,适配VGG等预训练模型)
HW = 224
# 定义训练集数据增强变换
train_transform = transforms.Compose([
transforms.ToPILImage(), # 将numpy数组转换为PIL图像(因为读取的是numpy格式)
transforms.RandomResizedCrop(224), # 随机裁剪并缩放到224x224,增加数据多样性
transforms.RandomRotation(50), # 随机旋转(0-50度),增强鲁棒性
transforms.ToTensor() # 转换为PyTorch张量,格式从(H,W,C)转为(C,H,W),并归一化到[0,1],把像素值归一化到 0-1,核心是为了让神经网络训练更稳定、收敛更快、效果更好
])
# 定义验证集数据变换(仅做必要转换,不做数据增强)
val_transform = transforms.Compose([
transforms.ToPILImage(), # numpy转PIL图像
transforms.ToTensor() # 转张量
])
# 自定义食品分类数据集类,继承自PyTorch的Dataset
class food_Dataset(Dataset):
def __init__(self, path, mode="train"): #mode="train"是默认参数,不是 “固定参数”—— 只有在调用这个类时不指定mode,它才会等于 train;如果指定了,就会用你指定的值
"""
初始化数据集
:param path: 数据文件夹路径
:param mode: 数据集模式:train(有标签训练)/val(有标签验证)/semi(无标签半监督)
"""
self.mode = mode
if mode == "semi": # 半监督模式:只有图像,无标签
self.X = self.read_file(path)
else: # 有标签模式:读取图像和对应标签
self.X, self.Y = self.read_file(path)
self.Y = torch.LongTensor(self.Y) # 将标签转为LongTensor(分类任务标签格式)
# 根据模式选择数据变换方式
if mode == "train":
self.transform = train_transform
else:
self.transform = val_transform
def read_file(self, path):
"""
读取文件夹中的图像数据
:param path: 数据路径
:return: 图像数组(+标签数组,非semi模式)
"""
if self.mode == "semi": # 处理无标签数据
file_list = os.listdir(path) # 列出文件夹下所有图像文件
# 初始化存储数组:(样本数, 高度, 宽度, 通道数),uint8节省内存
xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
for j, img_name in enumerate(file_list):
# 遍历所有文件读取图像
# enumerate(file_list):把列表变成 “索引 + 元素” 的配对,比如:
# 第 1 次循环:j=0(索引),img_name='img0.jpg'(元素);
# 第 2 次循环:j=1,img_name='img1.png';
# 第 3 次循环:j=2,img_name='img2.jpg';
# 没有enumerate()的话,你只能拿到img_name,但拿不到j(索引),而后续赋值需要j来指定数组位置
img_path = os.path.join(path, img_name) # 拼接完整图像路径
img = Image.open(img_path) # 打开图像
img = img.resize((HW, HW)) # 缩放到统一尺寸
xi[j, ...] = img # 存入数组,xi[j, ...]取第 j 个样本,保留剩余所有维度(H/W/C) shape=(224, 224, 3) ✅ 匹配 img 维度,而xi[j]不会保留维度
print("读到了%d个无标签数据" % len(xi))
return xi
else: # 处理有标签数据(0-10共11类)
# 遍历11个类别文件夹(00-10)
for i in tqdm(range(11), desc="读取有标签数据"):
file_dir = path + "/%02d" % i # 拼接类别文件夹路径(00,01,...,10)
file_list = os.listdir(file_dir) # 列出该类别下所有图像
# 初始化当前类别的图像和标签数组
xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
yi = np.zeros(len(file_list), dtype=np.uint8) # 标签统一为当前类别i
# 读取当前类别所有图像
for j, img_name in enumerate(file_list):
img_path = os.path.join(file_dir, img_name) # 拼接完整图像路径
img = Image.open(img_path) # 打开图像(PIL库)
img = img.resize((HW, HW)) # 统一图像尺寸(保证所有图像维度一致)
xi[j, ...] = img # 把图像数据存入xi的第j个位置(...等价于:, :, :)
yi[j] = i # 给第j张图像打标签:当前类别i
# 合并所有类别的数据
if i == 0: # 第一类初始化总数组
X = xi
Y = yi
else: # 后续类别拼接数组(axis=0表示按样本数维度拼接)
X = np.concatenate((X, xi), axis=0)
Y = np.concatenate((Y, yi), axis=0)
print("读到了%d个有标签数据" % len(Y))
return X, Y
def __getitem__(self, item):
"""
重写Dataset的核心方法:获取指定索引的样本
:param item: 样本索引
:return: 变换后的图像 + 标签(或原始图像,semi模式)
"""
if self.mode == "semi":
# 半监督模式返回:变换后的图像(用于预测) + 原始图像(用于后续打标签)
return self.transform(self.X[item]), self.X[item]
else:
# 有标签模式返回:变换后的图像 + 标签
return self.transform(self.X[item]), self.Y[item]
def __len__(self):
"""重写Dataset的核心方法:返回数据集总样本数"""
return len(self.X)
# 半监督数据集类:为无标签数据生成伪标签
class semiDataset(Dataset):
def __init__(self, no_label_loader, model, device, thres=0.99):
"""
初始化半监督数据集
:param no_label_loader: 无标签数据加载器
:param model: 训练好的模型(用于预测伪标签)
:param device: 训练设备(cpu/gpu)
:param thres: 置信度阈值(仅保留置信度>thres的伪标签)
"""
# 用模型为无标签数据生成伪标签
x, y = self.get_label(no_label_loader, model, device, thres)
# 判断是否有符合阈值的伪标签数据
if len(x) == 0:
self.flag = False # 无有效伪标签
else:
self.flag = True # 有有效伪标签
self.X = np.array(x) # 伪标签图像数组
self.Y = torch.LongTensor(y) # 伪标签(LongTensor格式)
self.transform = train_transform # 用训练集的增强变换
def get_label(self, no_label_loader, model, device, thres):
"""
为无标签数据生成伪标签
:return: 符合阈值的图像列表 + 对应伪标签列表
"""
model = model.to(device) # 模型移到指定设备
pred_prob = [] # 存储预测置信度
labels = [] # 存储预测标签
x = [] # 存储符合阈值的图像
y = [] # 存储符合阈值的伪标签
soft = nn.Softmax(dim=1) # Softmax层(dim=1:对每个样本的类别维度做归一化)
# 无梯度计算(仅预测,不训练),节省内存并加速,PyTorch 的梯度追踪是 “宁肯备着不用,也不肯用时没有”,而我们预测时明确知道 “绝对用不上”,所以主动关掉,避免浪费。
with torch.no_grad():
for bat_x, _ in no_label_loader: # 遍历无标签数据批次
bat_x = bat_x.to(device) # 数据移到指定设备
pred = model(bat_x) # 模型预测(原始logits)
pred_soft = soft(pred) # 转为概率分布
# 获取每个样本的最大概率和对应标签
pred_max, pred_value = pred_soft.max(1)
# 转换为numpy并加入列表
pred_prob.extend(pred_max.cpu().numpy().tolist())
labels.extend(pred_value.cpu().numpy().tolist())
# 筛选置信度>阈值的样本
for index, prob in enumerate(pred_prob):
if prob > thres:
# 取原始图像(无增强)用于后续训练
x.append(no_label_loader.dataset[index][1])
y.append(labels[index])
return x, y
def __getitem__(self, item):
"""获取指定索引的伪标签样本"""
return self.transform(self.X[item]), self.Y[item]
def __len__(self):
"""返回伪标签数据集样本数"""
return len(self.X)
# 生成半监督数据加载器
def get_semi_loader(no_label_loader, model, device, thres):
"""
创建半监督数据加载器
:return: 伪标签数据加载器(无有效数据则返回None)
"""
# 步骤1:实例化自定义的semiDataset类,生成伪标签数据集
# 核心:semiDataset内部会用model对no_label_loader中的无标签图像做预测,
# 筛选出置信度≥thres的样本,生成伪标签,并标记flag(是否有有效样本)
semiset = semiDataset(no_label_loader, model, device, thres)
# 步骤2:判断是否有有效伪标签样本
if semiset.flag == False:
return None
else:
# 步骤3:创建伪标签数据的DataLoader
# batch_size=16:批次大小(可根据显存调整)
# shuffle=False:伪标签样本无需打乱(半监督中伪标签是“辅助数据”,打乱与否不影响)
# 构建DataLoader:批次大小16,不打乱(伪标签数据无需打乱)
semi_loader = DataLoader(semiset, batch_size=16, shuffle=False)
return semi_loader
# 自定义CNN模型(简化版VGG结构)
class myModel(nn.Module):
def __init__(self, num_class):
"""
初始化自定义模型
:param num_class: 分类类别数(这里是11类食品)
"""
super(myModel, self).__init__()
# 卷积层1:3通道输入→64通道输出,3x3卷积核,步长1,填充1(保持尺寸)
self.conv1 = nn.Conv2d(3, 64, 3, 1, 1) # 输出:64x224x224
self.bn1 = nn.BatchNorm2d(64) # 批量归一化,加速训练
self.relu = nn.ReLU() # 激活函数
self.pool1 = nn.MaxPool2d(2) # 最大池化,步长2→输出:64x112x112
# 卷积层组1:64→128通道,池化后→128x56x56
self.layer1 = nn.Sequential(
nn.Conv2d(64, 128, 3, 1, 1),
nn.BatchNorm2d(128),
nn.ReLU(),
nn.MaxPool2d(2)
)
# 卷积层组2:128→256通道,池化后→256x28x28
self.layer2 = nn.Sequential(
nn.Conv2d(128, 256, 3, 1, 1),
nn.BatchNorm2d(256),
nn.ReLU(),
nn.MaxPool2d(2)
)
# 卷积层组3:256→512通道,池化后→512x14x14
self.layer3 = nn.Sequential(
nn.Conv2d(256, 512, 3, 1, 1),
nn.BatchNorm2d(512),
nn.ReLU(),
nn.MaxPool2d(2)
)
self.pool2 = nn.MaxPool2d(2) # 再次池化→512x7x7
# 全连接层1:512*7*7=25088 → 1000维
self.fc1 = nn.Linear(25088, 1000)
self.relu2 = nn.ReLU()
# 全连接层2:1000 → 最终分类数
self.fc2 = nn.Linear(1000, num_class)
def forward(self, x):
"""
前向传播
:param x: 输入张量 (batch_size, 3, 224, 224)
:return: 分类预测logits
"""
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.pool1(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.pool2(x)
x = x.view(x.size()[0], -1) # 展平:(batch_size, 512*7*7) → (batch_size, 25088)
x = self.fc1(x)
x = self.relu2(x)
x = self.fc2(x)
return x
# 训练+验证主函数(含半监督训练)
def train_val(model, train_loader, val_loader, no_label_loader, device, epochs, optimizer, loss, thres, save_path):
"""
模型训练与验证
:param model: 待训练模型
:param train_loader: 有标签训练数据加载器
:param val_loader: 验证数据加载器
:param no_label_loader: 无标签数据加载器
:param device: 训练设备
:param epochs: 训练轮数
:param optimizer: 优化器
:param loss: 损失函数
:param thres: 半监督伪标签置信度阈值
:param save_path: 最优模型保存路径
"""
model = model.to(device) # 模型移到指定设备
semi_loader = None # 初始化半监督数据加载器
# 初始化可视化用的列表
plt_train_loss = [] # 训练损失
plt_val_loss = [] # 验证损失
plt_train_acc = [] # 训练准确率
plt_val_acc = [] # 验证准确率
max_acc = 0.0 # 记录最高验证准确率
# 遍历每一轮训练
for epoch in range(epochs):
# 初始化本轮损失和准确率
train_loss = 0.0
val_loss = 0.0
train_acc = 0.0
val_acc = 0.0
semi_loss = 0.0
semi_acc = 0.0
start_time = time.time() # 记录本轮开始时间
# ========== 训练阶段 ==========
model.train() # 模型设为训练模式(启用Dropout/BatchNorm训练模式)
# 训练有标签数据
for batch_x, batch_y in train_loader:
x, target = batch_x.to(device), batch_y.to(device) # 数据移到设备
pred = model(x) # 模型预测
train_bat_loss = loss(pred, target) # 计算批次损失
train_bat_loss.backward() # 反向传播计算梯度
optimizer.step() # 更新模型参数
optimizer.zero_grad() # 梯度清零(避免累积)
# 累加损失和准确率
train_loss += train_bat_loss.cpu().item() # 转为标量累加
# 计算批次准确率:预测类别与真实标签匹配的数量
train_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
# 记录本轮训练损失和准确率(平均)
plt_train_loss.append(train_loss / len(train_loader))
plt_train_acc.append(train_acc / len(train_loader.dataset))
# 训练半监督伪标签数据(如果有)
if semi_loader is not None:
for batch_x, batch_y in semi_loader:
x, target = batch_x.to(device), batch_y.to(device)
pred = model(x)
semi_bat_loss = loss(pred, target)
semi_bat_loss.backward()
optimizer.step()
optimizer.zero_grad()
semi_loss += semi_bat_loss.cpu().item() # 修正:原代码误加了train_bat_loss
semi_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
# 打印半监督训练准确率(修正:分母改为半监督数据集长度)
print(f"半监督数据集的训练准确率为 {semi_acc / len(semi_loader.dataset):.4f}")
# ========== 验证阶段 ==========
model.eval() # 模型设为评估模式(关闭Dropout/BatchNorm训练模式)
with torch.no_grad(): # 无梯度计算,节省内存
for batch_x, batch_y in val_loader:
x, target = batch_x.to(device), batch_y.to(device)
pred = model(x)
val_bat_loss = loss(pred, target) # 计算验证批次损失
val_loss += val_bat_loss.cpu().item()
# 累加验证准确率
val_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
# 记录本轮验证损失和准确率
plt_val_loss.append(val_loss / len(val_loader))
plt_val_acc.append(val_acc / len(val_loader.dataset))
# 每3轮且验证准确率>0.6时,更新半监督数据(生成新的伪标签)
if epoch % 3 == 0 and plt_val_acc[-1] > 0.6:
semi_loader = get_semi_loader(no_label_loader, model, device, thres)
# 保存最优模型(验证准确率更高时)
if val_acc > max_acc:
torch.save(model, save_path)
max_acc = val_acc
print(f"保存最优模型,当前最高验证准确率:{max_acc / len(val_loader.dataset):.4f}")
# 打印本轮训练结果
print(
'[%03d/%03d] %2.2f sec(s) | TrainLoss: %.6f | ValLoss: %.6f | TrainAcc: %.6f | ValAcc: %.6f' %
(epoch + 1, epochs, time.time() - start_time,
plt_train_loss[-1], plt_val_loss[-1],
plt_train_acc[-1], plt_val_acc[-1])
)
# ========== 训练完成后可视化 ==========
# 绘制损失曲线
plt.plot(plt_train_loss, label="train")
plt.plot(plt_val_loss, label="val")
plt.title("Loss Curve")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()
# 绘制准确率曲线
plt.plot(plt_train_acc, label="train")
plt.plot(plt_val_acc, label="val")
plt.title("Accuracy Curve")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()
# ===================== 主程序入口 =====================
# 数据路径配置(根据你的本地路径修改)
# train_path = r"F:\pycharm\beike\classification\food_classification\food-11\training\labeled"
# val_path = r"F:\pycharm\beike\classification\food_classification\food-11\validation"
train_path = r"E:\深度学习\第五节_分类代码\food_classification\food-11\training\labeled"
val_path = r"E:\深度学习\第五节_分类代码\food_classification\food-11\validation"
no_label_path = r"E:\深度学习\第五节_分类代码\food_classification\food-11\training\unlabeled\00"
# 构建数据集实例
train_set = food_Dataset(train_path, "train") # 有标签训练集
val_set = food_Dataset(val_path, "val") # 有标签验证集
no_label_set = food_Dataset(no_label_path, "semi") # 无标签半监督集
# 构建数据加载器(批次大小16,训练集打乱,验证/无标签集不打乱)
train_loader = DataLoader(train_set, batch_size=16, shuffle=True)
val_loader = DataLoader(val_set, batch_size=16, shuffle=False) # 修正:验证集无需打乱
no_label_loader = DataLoader(no_label_set, batch_size=16, shuffle=False)
# 初始化模型(二选一:自定义模型 / 预训练VGG)
# model = myModel(11) # 自定义模型
model, _ = initialize_model("vgg", 11, use_pretrained=True) # 预训练VGG模型
# 训练超参数配置
lr = 0.001 # 学习率
loss = nn.CrossEntropyLoss() # 交叉熵损失(分类任务)
# 优化器:AdamW(带权重衰减的Adam,防止过拟合)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
device = "cuda" if torch.cuda.is_available() else "cpu" # 自动选择GPU/CPU
save_path = "model_save/best_model.pth" # 最优模型保存路径
epochs = 15 # 训练轮数
thres = 0.99 # 半监督伪标签置信度阈值
# 创建模型保存目录(避免路径不存在报错)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# 启动训练和验证
train_val(model, train_loader, val_loader, no_label_loader, device, epochs, optimizer, loss, thres, save_path)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)