数据增广
处理图像时由于同意图像可能会因为大小,角度不同而在机械的角度看着不同,所以需要数据增广

 

Adam(原始版)

把权重衰减融入「梯度更新」过程(先算梯度,再乘衰减系数)

AdamW(改进版,W=Weight Decay)

把权重衰减和「梯度更新」分离:1. 用 Adam 算梯度并更新;2. 单独对权重做衰减(直接减 λ*w

解决了 Adam 权重衰减失效的问题,衰减效果稳定

迁移学习

自己的模型往往因为数据不够多,而准确率不够,所以这个时候可以调用大佬的模型

# 导入必要的库
import random  # 用于设置随机种子
import torch  # PyTorch核心库,用于张量运算和模型构建
import torch.nn as nn  # PyTorch神经网络模块
import numpy as np  # 数值计算库,处理数组和矩阵
import os  # 操作系统相关功能,用于文件路径处理
from PIL import Image  # Python Imaging Library,用于读取和处理图片
from torch.utils.data import Dataset, DataLoader  # 自定义数据集和数据加载器
from tqdm import tqdm  # 进度条显示库,方便查看数据加载/训练进度
from torchvision import transforms  # 图像预处理工具
import time  # 计时工具,统计训练耗时
import matplotlib.pyplot as plt  # 绘图库,可视化损失和准确率曲线
from model_utils.model import initialize_model  # 自定义模型初始化函数(推测是加载预训练VGG)

# 定义全局随机种子函数,确保实验可复现
def seed_everything(seed):
    torch.manual_seed(seed)  # 设置CPU随机种子
    torch.cuda.manual_seed(seed)  # 设置单个GPU随机种子
    torch.cuda.manual_seed_all(seed)  # 设置所有GPU随机种子
    torch.backends.cudnn.benchmark = False  # 关闭cuDNN自动优化,保证确定性
    torch.backends.cudnn.deterministic = True  # 设置cuDNN确定性算法
    random.seed(seed)  # 设置Python随机种子
    np.random.seed(seed)  # 设置NumPy随机种子
    os.environ['PYTHONHASHSEED'] = str(seed)  # 设置Python哈希种子

# 执行种子设置,固定seed=0,确保每次运行结果一致
seed_everything(0)

# 定义图像统一尺寸(224x224,适配VGG等预训练模型)
HW = 224

# 定义训练集数据增强变换
train_transform = transforms.Compose([
    transforms.ToPILImage(),  # 将numpy数组转换为PIL图像(因为读取的是numpy格式)
    transforms.RandomResizedCrop(224),  # 随机裁剪并缩放到224x224,增加数据多样性
    transforms.RandomRotation(50),  # 随机旋转(0-50度),增强鲁棒性
    transforms.ToTensor()  # 转换为PyTorch张量,格式从(H,W,C)转为(C,H,W),并归一化到[0,1],把像素值归一化到 0-1,核心是为了让神经网络训练更稳定、收敛更快、效果更好
])

# 定义验证集数据变换(仅做必要转换,不做数据增强)
val_transform = transforms.Compose([
    transforms.ToPILImage(),  # numpy转PIL图像
    transforms.ToTensor()  # 转张量
])

# 自定义食品分类数据集类,继承自PyTorch的Dataset
class food_Dataset(Dataset):
    def __init__(self, path, mode="train"):     #mode="train"是默认参数,不是 “固定参数”—— 只有在调用这个类时不指定mode,它才会等于 train;如果指定了,就会用你指定的值
        """
        初始化数据集
        :param path: 数据文件夹路径
        :param mode: 数据集模式:train(有标签训练)/val(有标签验证)/semi(无标签半监督)
        """
        self.mode = mode
        if mode == "semi":  # 半监督模式:只有图像,无标签
            self.X = self.read_file(path)
        else:  # 有标签模式:读取图像和对应标签
            self.X, self.Y = self.read_file(path)
            self.Y = torch.LongTensor(self.Y)  # 将标签转为LongTensor(分类任务标签格式)

        # 根据模式选择数据变换方式
        if mode == "train":
            self.transform = train_transform
        else:
            self.transform = val_transform

    def read_file(self, path):
        """
        读取文件夹中的图像数据
        :param path: 数据路径
        :return: 图像数组(+标签数组,非semi模式)
        """
        if self.mode == "semi":  # 处理无标签数据
            file_list = os.listdir(path)  # 列出文件夹下所有图像文件
            # 初始化存储数组:(样本数, 高度, 宽度, 通道数),uint8节省内存
            xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
            for j, img_name in enumerate(file_list):
                # 遍历所有文件读取图像
                # enumerate(file_list):把列表变成 “索引 + 元素” 的配对,比如:
                # 第 1 次循环:j=0(索引),img_name='img0.jpg'(元素);
                # 第 2 次循环:j=1,img_name='img1.png';
                # 第 3 次循环:j=2,img_name='img2.jpg';
                # 没有enumerate()的话,你只能拿到img_name,但拿不到j(索引),而后续赋值需要j来指定数组位置
                img_path = os.path.join(path, img_name)  # 拼接完整图像路径
                img = Image.open(img_path)  # 打开图像
                img = img.resize((HW, HW))  # 缩放到统一尺寸
                xi[j, ...] = img  # 存入数组,xi[j, ...]取第 j 个样本,保留剩余所有维度(H/W/C)    shape=(224, 224, 3)    ✅ 匹配 img 维度,而xi[j]不会保留维度
            print("读到了%d个无标签数据" % len(xi))
            return xi
        else:  # 处理有标签数据(0-10共11类)
            # 遍历11个类别文件夹(00-10)
            for i in tqdm(range(11), desc="读取有标签数据"):
                file_dir = path + "/%02d" % i  # 拼接类别文件夹路径(00,01,...,10)
                file_list = os.listdir(file_dir)  # 列出该类别下所有图像

                # 初始化当前类别的图像和标签数组
                xi = np.zeros((len(file_list), HW, HW, 3), dtype=np.uint8)
                yi = np.zeros(len(file_list), dtype=np.uint8)  # 标签统一为当前类别i

                # 读取当前类别所有图像
                for j, img_name in enumerate(file_list):
                    img_path = os.path.join(file_dir, img_name)  # 拼接完整图像路径
                    img = Image.open(img_path)  # 打开图像(PIL库)
                    img = img.resize((HW, HW))  # 统一图像尺寸(保证所有图像维度一致)
                    xi[j, ...] = img  # 把图像数据存入xi的第j个位置(...等价于:, :, :)
                    yi[j] = i  # 给第j张图像打标签:当前类别i

                # 合并所有类别的数据
                if i == 0:  # 第一类初始化总数组
                    X = xi
                    Y = yi
                else:  # 后续类别拼接数组(axis=0表示按样本数维度拼接)
                    X = np.concatenate((X, xi), axis=0)
                    Y = np.concatenate((Y, yi), axis=0)
            print("读到了%d个有标签数据" % len(Y))
            return X, Y

    def __getitem__(self, item):
        """
        重写Dataset的核心方法:获取指定索引的样本
        :param item: 样本索引
        :return: 变换后的图像 + 标签(或原始图像,semi模式)
        """
        if self.mode == "semi":
            # 半监督模式返回:变换后的图像(用于预测) + 原始图像(用于后续打标签)
            return self.transform(self.X[item]), self.X[item]
        else:
            # 有标签模式返回:变换后的图像 + 标签
            return self.transform(self.X[item]), self.Y[item]

    def __len__(self):
        """重写Dataset的核心方法:返回数据集总样本数"""
        return len(self.X)

# 半监督数据集类:为无标签数据生成伪标签
class semiDataset(Dataset):
    def __init__(self, no_label_loader, model, device, thres=0.99):
        """
        初始化半监督数据集
        :param no_label_loader: 无标签数据加载器
        :param model: 训练好的模型(用于预测伪标签)
        :param device: 训练设备(cpu/gpu)
        :param thres: 置信度阈值(仅保留置信度>thres的伪标签)
        """
        # 用模型为无标签数据生成伪标签
        x, y = self.get_label(no_label_loader, model, device, thres)
        # 判断是否有符合阈值的伪标签数据
        if len(x) == 0:
            self.flag = False  # 无有效伪标签
        else:
            self.flag = True  # 有有效伪标签
            self.X = np.array(x)  # 伪标签图像数组
            self.Y = torch.LongTensor(y)  # 伪标签(LongTensor格式)
            self.transform = train_transform  # 用训练集的增强变换

    def get_label(self, no_label_loader, model, device, thres):
        """
        为无标签数据生成伪标签
        :return: 符合阈值的图像列表 + 对应伪标签列表
        """
        model = model.to(device)  # 模型移到指定设备
        pred_prob = []  # 存储预测置信度
        labels = []  # 存储预测标签
        x = []  # 存储符合阈值的图像
        y = []  # 存储符合阈值的伪标签
        soft = nn.Softmax(dim=1)  # Softmax层(dim=1:对每个样本的类别维度做归一化)

        # 无梯度计算(仅预测,不训练),节省内存并加速,PyTorch 的梯度追踪是 “宁肯备着不用,也不肯用时没有”,而我们预测时明确知道 “绝对用不上”,所以主动关掉,避免浪费。
        with torch.no_grad():
            for bat_x, _ in no_label_loader:  # 遍历无标签数据批次
                bat_x = bat_x.to(device)  # 数据移到指定设备
                pred = model(bat_x)  # 模型预测(原始logits)
                pred_soft = soft(pred)  # 转为概率分布
                # 获取每个样本的最大概率和对应标签
                pred_max, pred_value = pred_soft.max(1)
                # 转换为numpy并加入列表
                pred_prob.extend(pred_max.cpu().numpy().tolist())
                labels.extend(pred_value.cpu().numpy().tolist())

        # 筛选置信度>阈值的样本
        for index, prob in enumerate(pred_prob):
            if prob > thres:
                # 取原始图像(无增强)用于后续训练
                x.append(no_label_loader.dataset[index][1])
                y.append(labels[index])
        return x, y

    def __getitem__(self, item):
        """获取指定索引的伪标签样本"""
        return self.transform(self.X[item]), self.Y[item]

    def __len__(self):
        """返回伪标签数据集样本数"""
        return len(self.X)

# 生成半监督数据加载器
def get_semi_loader(no_label_loader, model, device, thres):
    """
    创建半监督数据加载器
    :return: 伪标签数据加载器(无有效数据则返回None)
    """
    # 步骤1:实例化自定义的semiDataset类,生成伪标签数据集
    # 核心:semiDataset内部会用model对no_label_loader中的无标签图像做预测,
    # 筛选出置信度≥thres的样本,生成伪标签,并标记flag(是否有有效样本)
    semiset = semiDataset(no_label_loader, model, device, thres)
    # 步骤2:判断是否有有效伪标签样本
    if semiset.flag == False:
        return None
    else:
        # 步骤3:创建伪标签数据的DataLoader
        # batch_size=16:批次大小(可根据显存调整)
        # shuffle=False:伪标签样本无需打乱(半监督中伪标签是“辅助数据”,打乱与否不影响)
        # 构建DataLoader:批次大小16,不打乱(伪标签数据无需打乱)
        semi_loader = DataLoader(semiset, batch_size=16, shuffle=False)
        return semi_loader

# 自定义CNN模型(简化版VGG结构)
class myModel(nn.Module):
    def __init__(self, num_class):
        """
        初始化自定义模型
        :param num_class: 分类类别数(这里是11类食品)
        """
        super(myModel, self).__init__()
        # 卷积层1:3通道输入→64通道输出,3x3卷积核,步长1,填充1(保持尺寸)
        self.conv1 = nn.Conv2d(3, 64, 3, 1, 1)  # 输出:64x224x224
        self.bn1 = nn.BatchNorm2d(64)  # 批量归一化,加速训练
        self.relu = nn.ReLU()  # 激活函数
        self.pool1 = nn.MaxPool2d(2)  # 最大池化,步长2→输出:64x112x112

        # 卷积层组1:64→128通道,池化后→128x56x56
        self.layer1 = nn.Sequential(
            nn.Conv2d(64, 128, 3, 1, 1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        # 卷积层组2:128→256通道,池化后→256x28x28
        self.layer2 = nn.Sequential(
            nn.Conv2d(128, 256, 3, 1, 1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        # 卷积层组3:256→512通道,池化后→512x14x14
        self.layer3 = nn.Sequential(
            nn.Conv2d(256, 512, 3, 1, 1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        self.pool2 = nn.MaxPool2d(2)  # 再次池化→512x7x7
        # 全连接层1:512*7*7=25088 → 1000维
        self.fc1 = nn.Linear(25088, 1000)
        self.relu2 = nn.ReLU()
        # 全连接层2:1000 → 最终分类数
        self.fc2 = nn.Linear(1000, num_class)

    def forward(self, x):
        """
        前向传播
        :param x: 输入张量 (batch_size, 3, 224, 224)
        :return: 分类预测logits
        """
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.pool1(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.pool2(x)
        x = x.view(x.size()[0], -1)  # 展平:(batch_size, 512*7*7) → (batch_size, 25088)
        x = self.fc1(x)
        x = self.relu2(x)
        x = self.fc2(x)
        return x

# 训练+验证主函数(含半监督训练)
def train_val(model, train_loader, val_loader, no_label_loader, device, epochs, optimizer, loss, thres, save_path):
    """
    模型训练与验证
    :param model: 待训练模型
    :param train_loader: 有标签训练数据加载器
    :param val_loader: 验证数据加载器
    :param no_label_loader: 无标签数据加载器
    :param device: 训练设备
    :param epochs: 训练轮数
    :param optimizer: 优化器
    :param loss: 损失函数
    :param thres: 半监督伪标签置信度阈值
    :param save_path: 最优模型保存路径
    """
    model = model.to(device)  # 模型移到指定设备
    semi_loader = None  # 初始化半监督数据加载器
    # 初始化可视化用的列表
    plt_train_loss = []  # 训练损失
    plt_val_loss = []    # 验证损失
    plt_train_acc = []   # 训练准确率
    plt_val_acc = []     # 验证准确率
    max_acc = 0.0        # 记录最高验证准确率

    # 遍历每一轮训练
    for epoch in range(epochs):
        # 初始化本轮损失和准确率
        train_loss = 0.0
        val_loss = 0.0
        train_acc = 0.0
        val_acc = 0.0
        semi_loss = 0.0
        semi_acc = 0.0

        start_time = time.time()  # 记录本轮开始时间

        # ========== 训练阶段 ==========
        model.train()  # 模型设为训练模式(启用Dropout/BatchNorm训练模式)
        # 训练有标签数据
        for batch_x, batch_y in train_loader:
            x, target = batch_x.to(device), batch_y.to(device)  # 数据移到设备
            pred = model(x)  # 模型预测
            train_bat_loss = loss(pred, target)  # 计算批次损失
            train_bat_loss.backward()  # 反向传播计算梯度
            optimizer.step()  # 更新模型参数
            optimizer.zero_grad()  # 梯度清零(避免累积)
            # 累加损失和准确率
            train_loss += train_bat_loss.cpu().item()  # 转为标量累加
            # 计算批次准确率:预测类别与真实标签匹配的数量
            train_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
        # 记录本轮训练损失和准确率(平均)
        plt_train_loss.append(train_loss / len(train_loader))
        plt_train_acc.append(train_acc / len(train_loader.dataset))

        # 训练半监督伪标签数据(如果有)
        if semi_loader is not None:
            for batch_x, batch_y in semi_loader:
                x, target = batch_x.to(device), batch_y.to(device)
                pred = model(x)
                semi_bat_loss = loss(pred, target)
                semi_bat_loss.backward()
                optimizer.step()
                optimizer.zero_grad()
                semi_loss += semi_bat_loss.cpu().item()  # 修正:原代码误加了train_bat_loss
                semi_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
            # 打印半监督训练准确率(修正:分母改为半监督数据集长度)
            print(f"半监督数据集的训练准确率为 {semi_acc / len(semi_loader.dataset):.4f}")

        # ========== 验证阶段 ==========
        model.eval()  # 模型设为评估模式(关闭Dropout/BatchNorm训练模式)
        with torch.no_grad():  # 无梯度计算,节省内存
            for batch_x, batch_y in val_loader:
                x, target = batch_x.to(device), batch_y.to(device)
                pred = model(x)
                val_bat_loss = loss(pred, target)  # 计算验证批次损失
                val_loss += val_bat_loss.cpu().item()
                # 累加验证准确率
                val_acc += np.sum(np.argmax(pred.detach().cpu().numpy(), axis=1) == target.cpu().numpy())
        # 记录本轮验证损失和准确率
        plt_val_loss.append(val_loss / len(val_loader))
        plt_val_acc.append(val_acc / len(val_loader.dataset))

        # 每3轮且验证准确率>0.6时,更新半监督数据(生成新的伪标签)
        if epoch % 3 == 0 and plt_val_acc[-1] > 0.6:
            semi_loader = get_semi_loader(no_label_loader, model, device, thres)

        # 保存最优模型(验证准确率更高时)
        if val_acc > max_acc:
            torch.save(model, save_path)
            max_acc = val_acc
            print(f"保存最优模型,当前最高验证准确率:{max_acc / len(val_loader.dataset):.4f}")

        # 打印本轮训练结果
        print(
            '[%03d/%03d] %2.2f sec(s) | TrainLoss: %.6f | ValLoss: %.6f | TrainAcc: %.6f | ValAcc: %.6f' %
            (epoch + 1, epochs, time.time() - start_time,
             plt_train_loss[-1], plt_val_loss[-1],
             plt_train_acc[-1], plt_val_acc[-1])
        )

    # ========== 训练完成后可视化 ==========
    # 绘制损失曲线
    plt.plot(plt_train_loss, label="train")
    plt.plot(plt_val_loss, label="val")
    plt.title("Loss Curve")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

    # 绘制准确率曲线
    plt.plot(plt_train_acc, label="train")
    plt.plot(plt_val_acc, label="val")
    plt.title("Accuracy Curve")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.show()

# ===================== 主程序入口 =====================
# 数据路径配置(根据你的本地路径修改)
# train_path = r"F:\pycharm\beike\classification\food_classification\food-11\training\labeled"
# val_path = r"F:\pycharm\beike\classification\food_classification\food-11\validation"
train_path = r"E:\深度学习\第五节_分类代码\food_classification\food-11\training\labeled"
val_path = r"E:\深度学习\第五节_分类代码\food_classification\food-11\validation"
no_label_path = r"E:\深度学习\第五节_分类代码\food_classification\food-11\training\unlabeled\00"

# 构建数据集实例
train_set = food_Dataset(train_path, "train")  # 有标签训练集
val_set = food_Dataset(val_path, "val")        # 有标签验证集
no_label_set = food_Dataset(no_label_path, "semi")  # 无标签半监督集

# 构建数据加载器(批次大小16,训练集打乱,验证/无标签集不打乱)
train_loader = DataLoader(train_set, batch_size=16, shuffle=True)
val_loader = DataLoader(val_set, batch_size=16, shuffle=False)  # 修正:验证集无需打乱
no_label_loader = DataLoader(no_label_set, batch_size=16, shuffle=False)

# 初始化模型(二选一:自定义模型 / 预训练VGG)
# model = myModel(11)  # 自定义模型
model, _ = initialize_model("vgg", 11, use_pretrained=True)  # 预训练VGG模型

# 训练超参数配置
lr = 0.001  # 学习率
loss = nn.CrossEntropyLoss()  # 交叉熵损失(分类任务)
# 优化器:AdamW(带权重衰减的Adam,防止过拟合)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
device = "cuda" if torch.cuda.is_available() else "cpu"  # 自动选择GPU/CPU
save_path = "model_save/best_model.pth"  # 最优模型保存路径
epochs = 15  # 训练轮数
thres = 0.99  # 半监督伪标签置信度阈值

# 创建模型保存目录(避免路径不存在报错)
os.makedirs(os.path.dirname(save_path), exist_ok=True)

# 启动训练和验证
train_val(model, train_loader, val_loader, no_label_loader, device, epochs, optimizer, loss, thres, save_path)
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐