根据Tensorflow图像分割教程https://www.tensorflow.org/tutorials/images/segmentation按步骤敲代码,可以很方便的生成一个unet范例,其中会自动从网上下载模型、样本。启动训练,得到模型,从网上下载宠物图片,进行识别,效果很不错。

  查看代码发现样本是经过tensorflow_datasets封装的,如何加载我们自己标注的图像进行训练识别呢?一种思路是抽取tensorflow_datasets中的代码,修改后重用。经过跟踪分析,发现内部代码经过重重封装,不容易提取。最直接的手段就是自己从磁盘中加载图像,组织DataSet,给模型进行训练。

  首先,在官方范例下载的过程中,提示了样本保存路径,打开目录,如下图:

在extracted目录下,可以找到元素的图像和标注png图像。将这两部分图像拷贝到自己项目中:

ann为标注图像目录,images为原始图像目录。

数据准备完毕,下面上代码,加载图像,组织训练集、验证集Dataset,训练图像:

import tensorflow as tf
from tensorflow_examples.models.pix2pix import pix2pix

import numpy as np
import os

from PIL import Image

from IPython.display import clear_output
import matplotlib.pyplot as plt

#从磁盘上加载原始图片
def load_tensor_from_file(img_file):
    img = Image.open(img_file)
    sample_image = np.array(img)
    #样本图片规格不一致,需要做通道转换,否则抛异常
    if len(sample_image.shape) != 3 or sample_image.shape[2] == 4:
        img = img.convert("RGB")
        sample_image = np.array(img)
    sample_image = tf.image.resize(sample_image,[128, 128])
    return sample_image

#从磁盘上加载标记图片
def load_ann_from_file(img_file):
    sample_image = tf.image.decode_image(tf.io.read_file(img_file))
    #样本图片规格不一致,需要做通道转换,否则抛异常
    if sample_image.shape[2] != 1:
        img = Image.open(img_file)
        img = img.convert("L")#转为灰度图
        sample_image = np.array(img)
    sample_image = tf.image.resize(sample_image,[128, 128])
    return sample_image

#加载图片并转换为训练集和验证集,同时输出训练集、验证集样本数量
def load(img_path):
    trainImageList = []
    valImageList = []
    path = img_path + "/images"
    files = os.listdir(path)
    cnt = 0
    for imgFile in files:
        if os.path.isdir(imgFile):
            continue
        file = path + "/" + imgFile
        print("load image ", file)
        cnt += 1
        img = load_tensor_from_file(file)
        img = tf.squeeze(img)
        #每8张图片中抽取一个样本作为验证集
        if cnt % 8 == 0:
            valImageList.append(img)
        else:
            trainImageList.append(img)
        #加载1000张样本,机器配置有限,样本过多,报00M错误
        if cnt > 1000:
            break

    trainAnnList = []
    valAnnList = []
    path = img_path + "/ann"
    files = os.listdir(path)
    cnt = 0
    for imgFile in files:
        if os.path.isdir(imgFile):
            continue
        file = path + "/" + imgFile
        print("load image ", file)
        img = load_ann_from_file(file)
        cnt+=1
        if cnt % 8 == 0:
            valAnnList.append(img)
        else:
            trainAnnList.append(img)
        #加载1000张样本,机器配置有限,样本过多,报00M错误
        if cnt > 1000:
            break
    train_num = len(trainImageList)
    val_num = len(valImageList)
    x = tf.convert_to_tensor(trainImageList, dtype=tf.float32)
    y = tf.convert_to_tensor(trainAnnList, dtype=tf.float32)
    dataset_train = tf.data.Dataset.from_tensor_slices((x, y))
    x_val = tf.convert_to_tensor(valImageList, dtype=tf.float32)
    y_val = tf.convert_to_tensor(valAnnList, dtype=tf.float32)
    dataset_val = tf.data.Dataset.from_tensor_slices((x_val, y_val))
    return dataset_train, train_num, dataset_val, val_num

train_dataset, train_num, val_dataset, val_num = load("./pet_images")

def normalize(input_image, input_mask):
    input_image = tf.cast(input_image, tf.float32) / 128.0 - 1
    #mask图像数据需根据标记数据做具体转换,使用labelme工具标记的图像,需要将图像颜色转换为类别标签索引,否则loss=nan
    input_mask -= 1
    return input_image, input_mask

@tf.function
def load_image_train(x, y):
    input_image = tf.image.resize(x, (128, 128))
    input_mask = tf.image.resize(y, (128,128))
    if tf.random.uniform(()) > 0.5:
        input_image = tf.image.flip_left_right(input_image)
        input_mask = tf.image.flip_left_right(input_mask)
    input_image, input_mask = normalize(input_image, input_mask)
    return input_image, input_mask

def load_image_test(x, y):
    input_image = tf.image.resize(x, (128, 128))
    input_mask = tf.image.resize(y, (128,128))
    input_image, input_mask = normalize(input_image, input_mask)
    return input_image, input_mask

TRAIN_LENGTH = train_num
#根据GPU性能调节BATCH_SIZE大小
BATCH_SIZE = 16#64
BUFFER_SIZE = 1000
STEPS_PER_EPOCH= TRAIN_LENGTH

train = train_dataset.map(load_image_train, num_parallel_calls=tf.data.experimental.AUTOTUNE)
val_dataset = val_dataset.map(load_image_test)

train_dataset = train.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()
train_dataset = train_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
val_dataset = val_dataset.batch(BATCH_SIZE)

def display(display_list):
    plt.figure(figsize=(15,15))
    title = ['Input Image', 'True Mask', 'Predicted Mask']
    for i in range(len(display_list)):
        plt.subplot(1, len(display_list), i+1)
        plt.title(title[i])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
        plt.axis('off')
    plt.show()

np.set_printoptions(threshold=128*128)
for image, mask in train.take(1):
    sample_image, sample_mask = image, mask
    print(tf.reduce_min(mask), tf.reduce_max(mask), tf.reduce_mean(mask))
    display([sample_image,sample_mask])

OUTPUT_CHANNELS = 3

base_model = tf.keras.applications.MobileNetV2(input_shape=[128,128,3], include_top=False)
layer_names = [
    'block_1_expand_relu', #64x64
    'block_3_expand_relu', #32x32
    'block_6_expand_relu', #16x16
    'block_13_expand_relu',#8x8
    'block_16_project',    #4x4
]

layers = [base_model.get_layer(name).output for name in layer_names]
#创建特征提取模型
down_stack = tf.keras.Model(inputs=base_model.input, outputs=layers)
down_stack.trainable = False

up_stack =[
    pix2pix.upsample(512, 3),#4x4 -> 8x8
    pix2pix.upsample(256, 3),#8x8 -> 16x16
    pix2pix.upsample(128, 3),#16x16 -> 32x32
    pix2pix.upsample(64, 3), #32x32 -> 64x64
]

def unet_model(output_channels):
    last = tf.keras.layers.Conv2DTranspose(output_channels, 3, strides=2,padding='same', activation='softmax')
    inputs = tf.keras.layers.Input(shape=[128,128,3])
    x = inputs

    #降频采样
    skips = down_stack(x)
    x = skips[-1]#取最后一次输出
    skips=reversed(skips[:-1])

    #升频采样
    for up, skip in zip(up_stack, skips):
        x = up(x)
        concat = tf.keras.layers.Concatenate()
        x = concat([x, skip])

    x = last(x)

    return tf.keras.Model(inputs=inputs, outputs=x)

model = unet_model(OUTPUT_CHANNELS)
adam = tf.keras.optimizers.Adam(lr=1e-3)
#optimizer = optimizers.Adam(lr=1e-3)
model.compile(adam, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

def create_mask(pred_mask):
    pred_mask = tf.argmax(pred_mask, axis=-1)
    pred_mask = pred_mask[..., tf.newaxis]
    return pred_mask[0]

def show_predictions(dataset=None, num=1):
    if dataset:
        for image, mask in dataset.take(num):
            pred_mask = model.predict(image)
            display([image[0], mask[0], create_mask(pred_mask)])
    else:
        display([sample_image, sample_mask, 
                 create_mask(model.predict(sample_image[tf.newaxis, ...]))])

show_predictions()

class DisplayCallback(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    clear_output(wait=True)
    show_predictions()
    print ('\nSample Prediction after epoch {}\n'.format(epoch+1))

EPOCHS = 10
VAL_SUBSPLITS = 5
VALIDATION_STEPS = val_num//BATCH_SIZE//VAL_SUBSPLITS

model_history = model.fit(train_dataset, epochs=EPOCHS,
                          steps_per_epoch=STEPS_PER_EPOCH,
                          validation_steps=VALIDATION_STEPS,
                          validation_data=val_dataset,
                          callbacks=[DisplayCallback()])

model.save("poker.h5")
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']
epochs = range(EPOCHS)
plt.figure()
plt.plot(epochs, loss, 'r', label='Training loss')
plt.plot(epochs, val_loss, 'bo', label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Value')
plt.ylim([0, 1])
plt.legend()
plt.show()

具体过程代码中交代的比较详细了,不在累述。由于pc资源限制,样本数量、epochs都没有调整到最优。效果图如下:

 

使用sklearn.train_test_split方法改进load方法:

#加载图片并转换为训练集和验证集,同时输出训练集、验证集样本数量
def load(img_path):
    trainImageList = []
    path = img_path + "/images"
    files = os.listdir(path)
    cnt = 0
    for imgFile in files:
        if os.path.isdir(imgFile):
            continue
        file = path + "/" + imgFile
        print("load image ", file)
        cnt += 1
        img = load_tensor_from_file(file)
        img = tf.squeeze(img)
        trainImageList.append(img)
        #加载1000张样本,机器配置有限,样本过多,报00M错误
        if cnt > 1000:
            break

    trainAnnList = []
    path = img_path + "/ann"
    files = os.listdir(path)
    cnt = 0
    for imgFile in files:
        if os.path.isdir(imgFile):
            continue
        file = path + "/" + imgFile
        print("load image ", file)
        img = load_ann_from_file(file)
        cnt+=1
        trainAnnList.append(img)
        #加载1000张样本,机器配置有限,样本过多,报00M错误
        if cnt > 1000:
            break
    train_x, val_x, train_y, val_y = train_test_split(trainImageList, trainAnnList, test_size=0.2, random_state=0)
    train_num = len(train_x)
    val_num = len(val_x)
    x = tf.convert_to_tensor(train_x, dtype=tf.float32)
    y = tf.convert_to_tensor(train_y, dtype=tf.float32)
    dataset_train = tf.data.Dataset.from_tensor_slices((x, y))
    x_val = tf.convert_to_tensor(val_x, dtype=tf.float32)
    y_val = tf.convert_to_tensor(val_y, dtype=tf.float32)
    dataset_val = tf.data.Dataset.from_tensor_slices((x_val, y_val))
    return dataset_train, train_num, dataset_val, val_num

 

GitHub 加速计划 / te / tensorflow
184.55 K
74.12 K
下载
一个面向所有人的开源机器学习框架
最近提交(Master分支:2 个月前 )
a49e66f2 PiperOrigin-RevId: 663726708 2 个月前
91dac11a This test overrides disabled_backends, dropping the default value in the process. PiperOrigin-RevId: 663711155 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐