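[Shandong University School of Software Innovation Project Training] (6) Fine-Tuning a Diffusion Model to Generate Story Scene Images
Although we have already fine-tuned DeepSeek to generate stories that are logical and coherent, story text alone is not enough to hold users' attention. We therefore want to use the image generation model Stable Diffusion to produce images that match the current plot, based on the story content.
Background reading: "Stable Diffusion: An In-Depth Guide to the Stable Diffusion Model" (Zhihu)
We chose the open-source model stablediffusionapi/cyberrealistic-41 (hosted on Hugging Face) for fine-tuning.
The hosted model can be called as follows: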
import requests
import json
url = "https://stablediffusionapi.com/api/v4/dreambooth"
payload = json.dumps({
    "key": "your_api_key",
    "model_id": "cyberrealistic-41",
    "prompt": "ultra realistic close up portrait ((beautiful pale cyberpunk female with heavy black eyeliner)), blue eyes, shaved side haircut, hyper detail, cinematic lighting, magic neon, dark red city, Canon EOS R3, nikon, f/1.4, ISO 200, 1/160s, 8K, RAW, unedited, symmetrical balance, in-frame, 8K",
    "negative_prompt": "painting, extra fingers, mutated hands, poorly drawn hands, poorly drawn face, deformed, ugly, blurry, bad anatomy, bad proportions, extra limbs, cloned face, skinny, glitchy, double torso, extra arms, extra hands, mangled fingers, missing lips, ugly face, distorted face, extra legs, anime",
    "width": "512",
    "height": "512",
    "samples": "1",
    "num_inference_steps": "30",
    "safety_checker": "no",
    "enhance_prompt": "yes",
    "seed": None,
    "guidance_scale": 7.5,
    "multi_lingual": "no",
    "panorama": "no",
    "self_attention": "no",
    "upscale": "no",
    "embeddings": "embeddings_model_id",
    "lora": "lora_model_id",
    "webhook": None,
    "track_id": None
})
headers = {
    'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
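Generated image:

However, the images produced by the diffusion model are very unstable: style and quality vary widely, and for complex scenes the output is often blurry or completely chaotic:
prompt: As the velvet curtains trembled, the buzz of the audience gradually faded. Twelve-year-old Mia held a rubber chicken behind her back, her palms sweating. This was not just any school talent show: it was the debut of her comedy duo, and the entire sixth grade had packed the auditorium. Her best friend Jack stood beside her, stiff as a statue. Their sketch, "The Banana Bandit", depended on one key prop: a banana that would "accidentally" slip during the robbery scene and make the audience roar with laughter. But when Mia peeked through the curtain, she saw their teacher, Mr. Collins, frantically mouthing: "Where is the banana?!"
image:

We therefore want to fine-tune the model so that it generates images in a specified style while keeping image quality high.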
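We need a stylistically consistent dataset to train the model. Training Stable Diffusion requires every image to come with a matching text caption. After searching online, we found the Pokémon dataset from FastGAN: it contains roughly a thousand high-resolution images in a very consistent style, but it has no text descriptions. Because captioning the images by hand would be a huge amount of work, we used the open-source BLIP model to caption them. The captions are not perfect, but they are reasonably accurate and sufficient for our purposes.
For example:
text:
a drawing of a green pokemon with red eyes
image:

The complete dataset contains 833 high-quality images with their corresponding captions.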
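A minimal sketch of how automatic captioning of this kind can be done with BLIP through the `transformers` library. This is an illustration under assumptions, not the script used to build the dataset: the checkpoint name `Salesforce/blip-image-captioning-base`, the helper names `caption_images` and `write_metadata`, and the `metadata.jsonl` layout (`file_name` / `text` fields) are all assumptions.

```python
import json
import os


def caption_images(image_paths, model_name="Salesforce/blip-image-captioning-base"):
    """Return {path: caption} for each image, using a BLIP captioning checkpoint."""
    # Heavy imports stay local so the JSONL helper below works without them.
    from PIL import Image
    from transformers import BlipForConditionalGeneration, BlipProcessor

    processor = BlipProcessor.from_pretrained(model_name)
    model = BlipForConditionalGeneration.from_pretrained(model_name)
    captions = {}
    for path in image_paths:
        image = Image.open(path).convert("RGB")
        inputs = processor(images=image, return_tensors="pt")
        output_ids = model.generate(**inputs, max_new_tokens=30)
        captions[path] = processor.decode(output_ids[0], skip_special_tokens=True)
    return captions


def write_metadata(captions, out_path):
    """Write one JSON object per line: {"file_name": ..., "text": ...}."""
    with open(out_path, "w", encoding="utf-8") as f:
        for path, text in captions.items():
            record = {"file_name": os.path.basename(path), "text": text}
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
```

Calling `write_metadata(caption_images(paths), "metadata.jsonl")` would then pair each image file with its generated caption, which is the image-plus-text form that Stable Diffusion training needs.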

Fine-tuning code:
First we import the libraries for OS access, math, file handling, deep learning, data processing, image processing, and progress bars. They are the base dependencies for all the modules that follow.
# ========== Standard library ==========
import os
import math
import glob
import shutil
import subprocess
# ========== Third-party libraries ==========
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from tqdm.auto import tqdm
# ========== Deep learning libraries ==========
from torchvision import transforms
# Transformers (Hugging Face)
from transformers import CLIPTextModel, CLIPTokenizer, CLIPModel, CLIPProcessor
# Diffusers (Hugging Face)
from diffusers import (
    AutoencoderKL,
    DDPMScheduler,
    UNet2DConditionModel,
    DiffusionPipeline
)
from diffusers.optimization import get_scheduler
from diffusers.training_utils import compute_snr
# ========== LoRA library ==========
from peft import LoraConfig, get_peft_model, PeftModel
# ========== Face detection libraries ==========
from deepface import DeepFace
import cv2
# ========== Dataset loading ==========
from datasets import load_dataset
Next we define a PyTorch dataset class that turns each image into a tensor and each caption into token IDs the model can consume.
To improve results and effectively enlarge the dataset, we apply data augmentation: `transforms` is used to resize, crop, and randomly flip the images.
IMAGE_EXTENSIONS = [".png", ".jpg", ".jpeg", ".webp", ".bmp", ".PNG", ".JPG", ".JPEG", ".WEBP", ".BMP"]
class Text2ImageDataset(torch.utils.data.Dataset):
    """
    Dataset for fine-tuning a text-to-image model.
    """
    def __init__(self, ds, transform, tokenizer, text_column="zh"):
        self.ds = ds
        self.transform = transform
        self.tokenizer = tokenizer
        self.text_column = text_column
    def __getitem__(self, idx):
        item = self.ds[idx]
        image = item["image"]
        caption = item[self.text_column]
        image = image.convert("RGB")
        tensor = self.transform(image)
        # Pad or truncate every caption to the tokenizer's maximum length
        inputs = self.tokenizer(
            caption,
            max_length=self.tokenizer.model_max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        input_id = inputs.input_ids[0]
        return tensor, input_id
    def __len__(self):
        return len(self.ds)
# Resolution of the training images
resolution = 512
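# Data augmentation
train_transform = transforms.Compose([
    transforms.Resize(resolution, interpolation=transforms.InterpolationMode.BILINEAR),
    transforms.CenterCrop(resolution),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    # Map pixel values from [0, 1] to [-1, 1], the range the Stable Diffusion VAE expects
    transforms.Normalize([0.5], [0.5]),
])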
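The DataLoader set up later is given a `collate_fn` that is not listed in this post, and the training loop reads `batch["pixel_values"]` and `batch["input_ids"]`, while `Text2ImageDataset.__getitem__` returns a plain `(tensor, input_id)` tuple. A minimal sketch of a collate function that bridges the two, assuming it does nothing more than stack the samples:

```python
import torch


def collate_fn(examples):
    """Stack (image_tensor, input_ids) pairs into the dict the training loop reads."""
    pixel_values = torch.stack([image for image, _ in examples])
    # Keep a contiguous float tensor; the loop casts it to the training dtype later.
    pixel_values = pixel_values.to(memory_format=torch.contiguous_format).float()
    input_ids = torch.stack([ids for _, ids in examples])
    return {"pixel_values": pixel_values, "input_ids": input_ids}
```

Stacking works here because the transform crops every image to the same resolution and the tokenizer pads every caption to the same length.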
The next function loads the full set of Stable Diffusion components (VAE, UNet, text encoder, and so on) and, depending on its arguments, applies LoRA, merges the LoRA weights, or resumes from a checkpoint.
def prepare_lora_model(lora_config, pretrained_model_name_or_path, model_path=None, resume=False, merge_lora=False):
    """
    Load the complete Stable Diffusion model and apply or merge LoRA weights as needed.
    """
    noise_scheduler = DDPMScheduler.from_pretrained(pretrained_model_name_or_path, subfolder="scheduler")
    tokenizer = CLIPTokenizer.from_pretrained(pretrained_model_name_or_path, subfolder="tokenizer")
    text_encoder = CLIPTextModel.from_pretrained(pretrained_model_name_or_path, torch_dtype=weight_dtype, subfolder="text_encoder")
    vae = AutoencoderKL.from_pretrained(pretrained_model_name_or_path, subfolder="vae")
    unet = UNet2DConditionModel.from_pretrained(pretrained_model_name_or_path, torch_dtype=weight_dtype, subfolder="unet")
    if resume:
        if model_path is None or not os.path.exists(model_path):
            raise ValueError("A valid model_path must be provided when resume is True")
        # Restore the previously saved LoRA adapters
        text_encoder = PeftModel.from_pretrained(text_encoder, os.path.join(model_path, "text_encoder"))
        unet = PeftModel.from_pretrained(unet, os.path.join(model_path, "unet"))
        # Re-enable gradients on every parameter that was loaded frozen
        for param in unet.parameters():
            if param.requires_grad is False:
                param.requires_grad = True
        for param in text_encoder.parameters():
            if param.requires_grad is False:
                param.requires_grad = True
        print(f"✅ Restored model weights from {model_path}")
    else:
        # Wrap both models with fresh LoRA adapters
        text_encoder = get_peft_model(text_encoder, lora_config)
        unet = get_peft_model(unet, lora_config)
        print("📊 Text Encoder trainable parameters:")
        text_encoder.print_trainable_parameters()
        print("📊 UNet trainable parameters:")
        unet.print_trainable_parameters()
    if merge_lora:
        # Fold the LoRA weights into the base model (inference only)
        text_encoder = text_encoder.merge_and_unload()
        unet = unet.merge_and_unload()
        text_encoder.eval()
        unet.eval()
    # The VAE stays frozen
    vae.requires_grad_(False)
    unet.to(DEVICE, dtype=weight_dtype)
    vae.to(DEVICE, dtype=weight_dtype)
    text_encoder.to(DEVICE, dtype=weight_dtype)
    return tokenizer, noise_scheduler, unet, vae, text_encoder
We then set up an optimizer for the UNet and the text encoder, with a separate learning rate for each.
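The setup code further down calls `prepare_optimizer(unet, text_encoder, unet_learning_rate=..., text_encoder_learning_rate=...)`, but that function's body is not listed in this post. A minimal sketch that matches the call signature, assuming AdamW and one parameter group per model (both the optimizer choice and the grouping are assumptions):

```python
import torch


def prepare_optimizer(unet, text_encoder, unet_learning_rate=1e-4, text_encoder_learning_rate=1e-4):
    """AdamW over the trainable (LoRA) parameters, one parameter group per model."""
    unet_params = [p for p in unet.parameters() if p.requires_grad]
    text_encoder_params = [p for p in text_encoder.parameters() if p.requires_grad]
    return torch.optim.AdamW([
        {"params": unet_params, "lr": unet_learning_rate},
        {"params": text_encoder_params, "lr": text_encoder_learning_rate},
    ])
```

Filtering on `requires_grad` keeps frozen base weights out of the optimizer state, and separate parameter groups are what allow the two learning rates to differ.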
Next we set the training parameters (batch size, data type, random seed, learning rates, and so on) and the device used for training.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"🖥 Current device: {DEVICE}")
train_batch_size = 2
weight_dtype = torch.bfloat16
snr_gamma = 5
seed = 1126
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)
unet_learning_rate = 1e-4
text_encoder_learning_rate = 1e-4
lr_scheduler_name = "cosine_with_restarts"
lr_warmup_steps = 100
max_train_steps = 2000
num_cycles = 3
pretrained_model_name_or_path = "stablediffusionapi/cyberrealistic-41"
lora_config = LoraConfig(
    r=32,
    lora_alpha=16,
    # Attention projections in the CLIP text encoder (first row) and the UNet (second row)
    target_modules=[
        "q_proj", "v_proj", "k_proj", "out_proj",
        "to_k", "to_q", "to_v", "to_out.0"
    ],
    lora_dropout=0
)
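To give a feel for what `r=32` and `lora_alpha=16` mean: in standard LoRA the low-rank update is scaled by `lora_alpha / r`, and each adapted linear layer gains `r * (d_in + d_out)` extra parameters. The short sketch below works this out; the helper names and the layer width of 768 are illustrative assumptions, not values read from the model.

```python
def lora_scaling(r, lora_alpha):
    """Scale applied to the low-rank update B @ A in standard LoRA."""
    return lora_alpha / r


def lora_extra_params(d_in, d_out, r):
    """Parameters added to one linear layer: A is (r, d_in), B is (d_out, r)."""
    return r * d_in + d_out * r


scale = lora_scaling(r=32, lora_alpha=16)
extra = lora_extra_params(d_in=768, d_out=768, r=32)  # 768 is an illustrative width
full = 768 * 768  # weights in the frozen base layer, for comparison
print(scale, extra, full)
```

With these settings the update is scaled by 0.5, and for a square layer of that width the adapter adds well under a tenth of the base layer's weight count.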
We then initialize the tokenizer, dataset, and dataloader, and prepare the model, optimizer, and learning rate scheduler.
tokenizer = CLIPTokenizer.from_pretrained(pretrained_model_name_or_path, subfolder="tokenizer")
# `ds` is the captioned dataset, loaded beforehand (loading code not shown)
train_dataset = Text2ImageDataset(
    ds=ds["train"],
    transform=train_transform,
    tokenizer=tokenizer,
    text_column="zh_text"
)
train_dataloader = torch.utils.data.DataLoader(
    train_dataset,
    shuffle=True,
    collate_fn=collate_fn,
    batch_size=train_batch_size,
    num_workers=8,
)
print("✅ Dataset ready!")
model_path = None
output_folder = "./output"
os.makedirs(output_folder, exist_ok=True)
tokenizer, noise_scheduler, unet, vae, text_encoder = prepare_lora_model(
    lora_config,
    pretrained_model_name_or_path,
    model_path,
    resume=False,
    merge_lora=False
)
optimizer = prepare_optimizer(
    unet,
    text_encoder,
    unet_learning_rate=unet_learning_rate,
    text_encoder_learning_rate=text_encoder_learning_rate
)
lr_scheduler = get_scheduler(
    lr_scheduler_name,
    optimizer=optimizer,
    num_warmup_steps=lr_warmup_steps,
    num_training_steps=max_train_steps,
    num_cycles=num_cycles
)
print("✅ Model and optimizer ready! Training can start.")
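To make the `cosine_with_restarts` setting concrete, the sketch below reimplements the learning rate multiplier in plain Python: a linear warmup, followed by `num_cycles` cosine decays that each restart from the full learning rate. It is a standalone illustration of the schedule's shape using the values from this post (100 warmup steps, 2000 total steps, 3 cycles); the function name `lr_multiplier` is hypothetical, and the training code itself relies on `get_scheduler`.

```python
import math


def lr_multiplier(step, warmup_steps=100, total_steps=2000, num_cycles=3):
    """Factor applied to the base learning rate at a given optimizer step."""
    if step < warmup_steps:
        # Linear warmup from 0 to 1
        return step / max(1, warmup_steps)
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    if progress >= 1.0:
        return 0.0
    # Cosine decay within the current cycle; the modulo produces the hard restarts
    return max(0.0, 0.5 * (1.0 + math.cos(math.pi * ((num_cycles * progress) % 1.0))))


base_lr = 1e-4
for step in (0, 50, 100, 733, 734, 2000):
    print(step, base_lr * lr_multiplier(step))
```

With these values each cycle lasts about 633 steps, so the learning rate falls close to zero around step 733 and jumps back to its full value on the next step.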
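The core training loop consists of:
- Encoding images into the latent space
- Adding noise
- Encoding the text
- UNet prediction
- SNR-weighted loss computation
- Backpropagation and optimization
- Periodic model checkpointing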
os.environ["TOKENIZERS_PARALLELISM"] = "false"
global_step = 0
best_face_score = float("inf")
progress_bar = tqdm(range(max_train_steps), desc="Training steps")
for epoch in range(math.ceil(max_train_steps / len(train_dataloader))):
    unet.train()
    text_encoder.train()
    for step, batch in enumerate(train_dataloader):
        if global_step >= max_train_steps:
            break
        # Encode the images into latents and apply the VAE scaling factor
        latents = vae.encode(batch["pixel_values"].to(DEVICE, dtype=weight_dtype)).latent_dist.sample()
        latents = latents * vae.config.scaling_factor
        # Sample noise and a random timestep per image, then noise the latents
        noise = torch.randn_like(latents)
        timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (latents.shape[0],), device=DEVICE).long()
        noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
        # Text conditioning
        encoder_hidden_states = text_encoder(batch["input_ids"].to(DEVICE))[0]
        # The training target depends on the scheduler's prediction type
        if noise_scheduler.config.prediction_type == "epsilon":
            target = noise
        elif noise_scheduler.config.prediction_type == "v_prediction":
            target = noise_scheduler.get_velocity(latents, noise, timesteps)
        else:
            raise ValueError(f"Unsupported prediction type: {noise_scheduler.config.prediction_type}")
        model_pred = unet(noisy_latents, timesteps, encoder_hidden_states)[0]
        if not snr_gamma:
            loss = F.mse_loss(model_pred.float(), target.float(), reduction="mean")
        else:
            # Min-SNR weighting: clip the SNR at snr_gamma, then normalize per prediction type
            snr = compute_snr(noise_scheduler, timesteps)
            mse_loss_weights = torch.stack([snr, snr_gamma * torch.ones_like(timesteps)], dim=1).min(dim=1)[0]
            if noise_scheduler.config.prediction_type == "epsilon":
                mse_loss_weights = mse_loss_weights / snr
            elif noise_scheduler.config.prediction_type == "v_prediction":
                mse_loss_weights = mse_loss_weights / (snr + 1)
            loss = F.mse_loss(model_pred.float(), target.float(), reduction="none")
            loss = loss.mean(dim=list(range(1, len(loss.shape)))) * mse_loss_weights
            loss = loss.mean()
        loss.backward()
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar.update(1)
        global_step += 1
        if global_step % 100 == 0 or global_step == max_train_steps:
            print(f"🔥 Step {global_step}, loss: {loss.item()}")
        if global_step % 500 == 0:
            save_path = os.path.join(output_folder, f"checkpoint-{global_step}")
            os.makedirs(save_path, exist_ok=True)
            unet.save_pretrained(os.path.join(save_path, "unet"))
            text_encoder.save_pretrained(os.path.join(save_path, "text_encoder"))
            print(f"💾 Saved intermediate checkpoint to {save_path}")
# Save the final adapters once training has finished
save_path = os.path.join(output_folder, "checkpoint-last")
os.makedirs(save_path, exist_ok=True)
unet.save_pretrained(os.path.join(save_path, "unet"))
text_encoder.save_pretrained(os.path.join(save_path, "text_encoder"))
print(f"💾 Saved final model to {save_path}")
print("🎉 Fine-tuning complete!")
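The SNR weighting in the loop can be checked on its own. The sketch below recomputes, in plain Python, the per-timestep weight `min(SNR, snr_gamma) / SNR` for epsilon prediction, or `min(SNR, snr_gamma) / (SNR + 1)` for v-prediction, with `snr_gamma = 5`. The beta schedule values (`scaled_linear`, 0.00085 to 0.012, 1000 steps) are the usual Stable Diffusion v1 defaults and are an assumption here; the real values come from the checkpoint's scheduler config. The helper names are hypothetical.

```python
def scaled_linear_alphas_cumprod(num_steps=1000, beta_start=0.00085, beta_end=0.012):
    """Cumulative product of (1 - beta_t) for a 'scaled_linear' beta schedule."""
    alphas_cumprod, running = [], 1.0
    for i in range(num_steps):
        frac = i / (num_steps - 1)
        # Betas are linear in sqrt-space, then squared
        beta = (beta_start ** 0.5 + frac * (beta_end ** 0.5 - beta_start ** 0.5)) ** 2
        running *= 1.0 - beta
        alphas_cumprod.append(running)
    return alphas_cumprod


def min_snr_weight(alpha_bar, gamma=5.0, prediction_type="epsilon"):
    """Loss weight for one timestep, mirroring the branch logic in the training loop."""
    snr = alpha_bar / (1.0 - alpha_bar)
    clipped = min(snr, gamma)
    if prediction_type == "epsilon":
        return clipped / snr
    if prediction_type == "v_prediction":
        return clipped / (snr + 1.0)
    raise ValueError(f"Unsupported prediction type: {prediction_type}")


alphas_cumprod = scaled_linear_alphas_cumprod()
for t in (0, 250, 500, 999):
    print(t, min_snr_weight(alphas_cumprod[t]))
```

Low-noise timesteps (small `t`, very high SNR) get a small weight, while high-noise timesteps whose SNR is already below `snr_gamma` keep a weight of 1, so the nearly clean steps do not dominate the loss.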
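The parameters we set manually are:
| Hyperparameter | Value / description |
|---|---|
| Image resolution | 512 |
| Batch size | 2 |
| Random seed | 1126 |
| Weight data type | bfloat16 |
| LoRA rank (r) | 32 |
| LoRA scaling coefficient (alpha) | 16 |
| LoRA target modules | q_proj, v_proj, k_proj, out_proj, ... |
| LoRA dropout | 0 |
| UNet learning rate | 1e-4 |
| Text encoder learning rate | 1e-4 |
| Learning rate scheduler | cosine_with_restarts |
| Learning rate warmup steps | 100 |
| Total training steps | 2000 |
| Scheduler cycles (num_cycles) | 3 |
| SNR loss parameter (snr_gamma) | 5 |
| DataLoader worker processes (num_workers) | 8 |
| Checkpoint interval | 500 steps |
| Logging interval | 100 steps |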
We wrote a short test script that takes a prompt and saves the generated images:
import os
import torch
from diffusers import DiffusionPipeline
from peft import PeftModel
from PIL import Image
# Base model and LoRA path configuration
pretrained_model_name_or_path = "stablediffusionapi/cyberrealistic-41"  # make sure this model is in diffusers format
finetuned_checkpoint_path = "./pokemon/output/checkpoint-2000"  # LoRA weights directory
output_dir = "./gen_images"  # output image directory
os.makedirs(output_dir, exist_ok=True)
prompt = "一只可爱的皮卡丘在森林里玩耍,阳光明媚,画风可爱"  # custom prompt: "a cute Pikachu playing in a forest, bright sunshine, cute art style"
num_images = 1  # number of images to generate
num_inference_steps = 50  # number of inference steps
guidance_scale = 7.5  # text guidance scale
seed = 42  # random seed
# 1. Load the base model
print("🔄 Loading the pretrained model...")
pipeline = DiffusionPipeline.from_pretrained(pretrained_model_name_or_path, torch_dtype=torch.float16)
pipeline.to("cuda")
# 2. Load the LoRA weights (diffusers native method, pointing at the unet subdirectory)
pipeline.load_lora_weights(os.path.join(finetuned_checkpoint_path, "unet"), local_files_only=True)
print("✅ LoRA weights loaded (diffusers native, UNet only)")
# 3. Fix the random seed
generator = torch.Generator("cuda").manual_seed(seed)
# 4. Generate and save the images
for idx in range(num_images):
    print(f"🎨 Generating image {idx+1} ...")
    result = pipeline(prompt, num_inference_steps=num_inference_steps, guidance_scale=guidance_scale, generator=generator)
    image = result.images[0]
    save_path = os.path.join(output_dir, f"gen_{idx+1}.png")
    image.save(save_path)
    print(f"✅ Saved: {save_path}")
print("🎉 All images generated and saved!")
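The training loop saves PEFT adapters for both the UNet and the text encoder, while the test script above loads only the UNet folder. As a hedged alternative, the sketch below mirrors the `resume` and `merge_lora` branches of `prepare_lora_model`: it attaches both adapters with `PeftModel.from_pretrained` and then merges them into the pipeline's models. The helper name `load_peft_adapters` is hypothetical, and the checkpoint directory is assumed to contain the `unet` and `text_encoder` subfolders written during training.

```python
import os


def load_peft_adapters(pipeline, checkpoint_dir):
    """Attach the saved UNet and text-encoder adapters, then merge them for inference."""
    from peft import PeftModel  # imported lazily; requires the peft package

    unet = PeftModel.from_pretrained(pipeline.unet, os.path.join(checkpoint_dir, "unet"))
    text_encoder = PeftModel.from_pretrained(
        pipeline.text_encoder, os.path.join(checkpoint_dir, "text_encoder")
    )
    # merge_and_unload folds the LoRA deltas into the base weights and returns the plain model
    pipeline.unet = unet.merge_and_unload()
    pipeline.text_encoder = text_encoder.merge_and_unload()
    return pipeline
```

Under these assumptions, `pipeline = load_peft_adapters(pipeline, finetuned_checkpoint_path)` would take the place of step 2 in the script, and the remaining steps stay unchanged.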