【DL】第13章 使用 TensorFlow加载和预处理数据
🔎大家好,我是Sonhhxg_柒,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流🔎
📝个人主页-Sonhhxg_柒的博客_CSDN博客 📃
🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝
📣系列专栏 - 机器学习【ML】 自然语言处理【NLP】 深度学习【DL】
🖍foreword
✔说明⇢本人讲解主要包括Python、机器学习(ML)、深度学习(DL)、自然语言处理(NLP)等内容。
如果你对这个系列感兴趣的话,可以关注订阅哟👋
所以到目前为止,我们只使用了适合内存的数据集,但深度学习系统通常在不适合 RAM 的非常大的数据集上进行训练。使用其他深度学习库实现大型数据集并对其进行有效预处理可能会很棘手,但 TensorFlow 借助Data API让这一切变得简单:您只需创建一个数据集对象,并告诉它从哪里获取数据以及如何转换它. TensorFlow 负责所有实现细节,例如多线程、队列、批处理和预取。此外,Data API 与 tf.keras 无缝协作!
现成的数据 API 可以读取文本文件(例如 CSV 文件)、具有固定大小记录的二进制文件以及使用 TensorFlow 的 TFRecord 格式的二进制文件,该格式支持不同大小的记录。TFRecord 是一种灵活高效的二进制格式,通常包含协议缓冲区(一种开源二进制格式)。Data API 还支持从 SQL 数据库中读取数据。此外,许多开源扩展都可用于从各种数据源中读取数据,例如 Google 的 BigQuery 服务。
有效地读取庞大的数据集并不是唯一的困难:数据还需要进行预处理,通常是标准化的。此外,它并不总是严格由方便的数字字段组成:可能有文本特征、分类特征等。这些需要进行编码,例如使用 one-hot 编码、词袋编码或 嵌入(正如我们将看到的,嵌入是表示类别或标记的可训练密集向量)。处理所有这些预处理的一种选择是编写您自己的自定义预处理层。另一种是使用 Keras 提供的标准预处理层。
在本章中,我们将介绍 Data API、TFRecord 格式,以及如何创建自定义预处理层和使用标准 Keras 层。我们还将快速浏览 TensorFlow 生态系统中的一些相关项目:
TF 变换 ( tf.Transform )
使可以编写一个预处理函数,在训练之前(以加快速度)在整个训练集上以批处理模式运行,然后导出到 TF 函数并合并到训练好的模型中,这样一旦部署在生产它可以处理动态预处理新实例。
TF 数据集 (TFDS)
提供一个方便的功能,可以下载许多常见的各种数据集,包括像 ImageNet 这样的大型数据集,以及使用 Data API 操作它们的方便的数据集对象。
所以让我们开始吧!
数据 API
这整个数据 API 围绕数据集的概念展开:您可能会怀疑,这表示数据项的序列。通常你会使用逐渐从磁盘读取数据的数据集,但为简单起见,让我们使用以下命令完全在 RAM 中创建数据集tf.data.Dataset.from_tensor_slices()
:
>>> X = tf.range(10) # any data tensor
>>> dataset = tf.data.Dataset.from_tensor_slices(X)
>>> dataset
<TensorSliceDataset shapes: (), types: tf.int32>
该from_tensor_slices()
函数接受一个张量并创建一个tf.data.Dataset
其元素是X
(沿第一维)的所有切片的 a,因此该数据集包含 10 个项目:张量 0、1、2、...、9。在这种情况下,我们将获得相同的数据集如果我们使用过tf.data.Dataset.range(10)
.
您可以像这样简单地迭代数据集的项目:
>>> for item in dataset:
... print(item)
...
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
[...]
tf.Tensor(9, shape=(), dtype=int32)
链接转换
一次您有一个数据集,您可以通过调用其转换方法对其应用各种转换。每个方法都返回一个新的数据集,所以你可以像这样链接转换(这个链如图 13-1 所示):
>>> dataset = dataset.repeat(3).batch(7)
>>> for item in dataset:
... print(item)
...
tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)
图 13-1。链接数据集转换
在本例中,我们首先在原始数据集上调用该repeat()
方法,它返回一个新数据集,该数据集将重复原始数据集的项目 3 次。当然,这不会将内存中的数据全部复制三遍!(如果你在没有参数的情况下调用这个方法,新数据集将永远重复源数据集,所以迭代数据集的代码必须决定何时停止。)然后我们batch()
在这个新数据集上调用该方法,再一次创建一个新的数据集。这会将前一个数据集的项目分组为七个项目。最后,我们迭代这个最终数据集的项目。如您所见,该batch()
方法必须输出最后一批大小为 2 而不是 7 的批次,但您可以使用drop_remainder=True
如果您希望它删除最后一批,以便所有批次具有完全相同的大小。
警告
数据集方法不会修改数据集,它们会创建新数据集,因此请确保保留对这些新数据集的引用(例如,使用
dataset = ...
),否则不会发生任何事情。
您还可以通过调用该map()
方法来转换项目。例如,这会创建一个所有项目都翻倍的新数据集:
>>> dataset = dataset.map(lambda x: x * 2) # Items: [0,2,4,6,8,10,12]
此函数是您将调用以对数据应用任何预处理的函数。有时这将包括可能非常密集的计算,例如重塑或旋转图像,因此您通常希望生成多个线程以加快速度:就像设置num_parallel_calls
参数一样简单。请注意,您传递给map()
方法的函数必须可转换为 TF 函数(参见第 12 章)。
虽然该map()
方法将转换应用于每个项目,但该apply()
方法将转换应用于整个数据集。例如,以下代码将该unbatch()
函数应用于数据集1。新数据集中的每个项目都将是一个单整数张量,而不是一组七个整数:
>>> dataset = dataset.apply(tf.data.experimental.unbatch()) # Items: 0,2,4,...
也可以使用以下filter()
方法简单地过滤数据集:
>>> dataset = dataset.filter(lambda x: x < 10) # Items: 0 2 4 6 8 0 2 4 6...
您通常只想查看数据集中的几个项目。您可以使用以下take()
方法:
>>> for item in dataset.take(3):
... print(item)
...
tf.Tensor(0, shape=(), dtype=int64)
tf.Tensor(2, shape=(), dtype=int64)
tf.Tensor(4, shape=(), dtype=int64)
洗牌数据(Shuffling the Data)
作为你知道,当训练集中的实例独立且同分布时,梯度下降效果最好(参见第 4 章)。确保这一点的一种简单方法是使用该shuffle()
方法对实例进行洗牌。它将创建一个新数据集,该数据集将首先用源数据集的第一项填充缓冲区。然后,每当它被要求提供一个项目时,它都会从缓冲区中随机抽取一个,并用源数据集中的一个新项目替换它,直到它完全遍历源数据集。此时它会继续从缓冲区中随机拉出项目,直到它为空。您必须指定缓冲区大小,并且使其足够大很重要,否则洗牌不会很有效。2只是不要超过你拥有的 RAM 数量,即使你有很多,也没有必要超出数据集的大小。如果每次运行程序时都想要相同的随机顺序,可以提供随机种子。例如,以下代码创建并显示一个包含整数 0 到 9 的数据集,重复 3 次,使用大小为 5 的缓冲区和随机种子 42 进行混洗,并使用批量大小为 7 进行批处理:
>>> dataset = tf.data.Dataset.range(10).repeat(3) # 0 to 9, three times
>>> dataset = dataset.shuffle(buffer_size=5, seed=42).batch(7)
>>> for item in dataset:
... print(item)
...
tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64)
tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64)
tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64)
tf.Tensor([5 4 2 7 8 9 9], shape=(7,), dtype=int64)
tf.Tensor([3 6], shape=(2,), dtype=int64)
小费
如果你调用
repeat()
一个打乱的数据集,默认情况下它会在每次迭代时生成一个新的顺序。这通常是一个好主意,但如果您希望在每次迭代中重用相同的顺序(例如,用于测试或调试),您可以设置reshuffle_each_iteration=False
.
为了对于一个不适合内存的大型数据集,这种简单的洗牌缓冲区方法可能还不够,因为与数据集相比,缓冲区会很小。一种解决方案是打乱源数据本身(例如,在 Linux 上,您可以使用shuf
命令)。这肯定会改善洗牌很多!即使源数据被打乱了,您通常也希望将其打乱一些,否则在每个时期都会重复相同的顺序,并且模型最终可能会出现偏差(例如,由于偶然出现的一些虚假模式源数据的顺序)。为了进一步打乱实例,一种常见的方法是将源数据拆分为多个文件,然后在训练期间以随机顺序读取它们。但是,位于同一文件中的实例仍然会彼此靠近。为避免这种情况,您可以随机选择多个文件并同时读取它们,交错它们的记录。然后最重要的是,您可以使用shuffle()
方法。如果所有这些听起来都需要大量工作,请不要担心:Data API 只需几行代码即可完成所有这些工作。让我们看看如何做到这一点。
交叉来自多个文件的行
首先,假设您已经加载了加利福尼亚住房数据集,对其进行了混洗(除非它已经混洗),并将其拆分为训练集、验证集和测试集。然后将每个集合拆分为多个 CSV 文件,每个文件如下所示(每行包含八个输入特征加上目标房屋价值中值):
MedInc,HouseAge,AveRooms,AveBedrms,Popul,AveOccup,Lat,Long,MedianHouseValue 3.5214,15.0,3.0499,1.1065,1447.0,1.6059,37.63,-122.43,1.442 5.3275,5.0,6.4900,0.9910,3464.0,3.4433,33.69,-117.39,1.687 3.1,29.0,7.5423,1.5915,1328.0,2.2508,38.44,-122.98,1.621 [...]
我们还假设train_filepaths
包含训练文件路径的列表(并且您也有valid_filepaths
和test_filepaths
):
>>> train_filepaths
['datasets/housing/my_train_00.csv', 'datasets/housing/my_train_01.csv',...]
或者,您可以使用文件模式;例如,train_filepaths = "datasets/housing/my_train_*.csv"
。现在让我们创建一个仅包含这些文件路径的数据集:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)
默认情况下,该list_files()
函数返回一个对文件路径进行打乱的数据集。一般来说,这是一件好事,但shuffle=False
如果出于某种原因不想这样做,您可以进行设置。
接下来,您可以调用该interleave()
方法一次读取五个文件并交错它们的行(使用该skip()
方法跳过每个文件的第一行,即标题行):
n_readers = 5
dataset = filepath_dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers)
该interleave()
方法将创建一个数据集,该数据集将从 中提取五个文件路径filepath_dataset
,并且对于每个路径,它将调用您提供给它的函数(在本例中为 lambda)以创建新数据集(在本例中为 a TextLineDataset
)。需要明确的是,在这个阶段总共会有七个数据集:文件路径数据集、交错数据集和交错数据集TextLineDataset
内部创建的五个。当我们遍历 interleave 数据集时,它将在这五个TextLineDataset
s 中循环,从每个中一次读取一行,直到所有数据集都没有项目。然后它将从中获取接下来的五个文件路径filepath_dataset
并以相同的方式将它们交错,依此类推,直到用完文件路径。
小费
为使交错发挥最佳效果,最好有相同长度的文件;否则最长文件的末端不会被交错。
默认情况下,interleave()
不使用并行;它只是按顺序从每个文件中一次读取一行。如果您希望它实际并行读取文件,您可以将num_parallel_calls
参数设置为您想要的线程数(注意该map()
方法也有此参数)。您甚至可以将其设置tf.data.experimental.AUTOTUNE
为使 TensorFlow 根据可用 CPU 动态选择正确的线程数(不过,目前这是一个实验性功能)。让我们看一下数据集现在包含的内容:
>>> for line in dataset.take(5):
... print(line.numpy())
...
b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782'
b'4.1812,52.0,5.7013,0.9965,692.0,2.4027,33.73,-118.31,3.215'
b'3.6875,44.0,4.5244,0.9930,457.0,3.1958,34.04,-118.15,1.625'
b'3.3456,37.0,4.5140,0.9084,458.0,3.2253,36.67,-121.7,2.526'
b'3.5214,15.0,3.0499,1.1065,1447.0,1.6059,37.63,-122.43,1.442'
这些是随机选择的五个 CSV 文件的第一行(忽略标题行)。看起来不错!但正如你所见,这些只是字节串;我们需要解析它们并缩放数据。
预处理数据
X_mean, X_std = [...] # mean and scale of each feature in the training set
n_inputs = 8
def preprocess(line):
defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
fields = tf.io.decode_csv(line, record_defaults=defs)
x = tf.stack(fields[:-1])
y = tf.stack(fields[-1:])
return (x - X_mean) / X_std, y
让我们看一下这段代码:
-
首先,代码假设我们已经预先计算了训练集中每个特征的均值和标准差。
X_mean
并且X_std
只是包含八个浮点数的一维张量(或 NumPy 数组),每个输入特征一个。 -
该
preprocess()
函数采用一个 CSV 行并从解析它开始。为此,它使用该tf.io.decode_csv()
函数,该函数接受两个参数:第一个是要解析的行,第二个是包含 CSV 文件中每一列的默认值的数组。这个数组不仅告诉 TensorFlow 每一列的默认值,还告诉 TensorFlow 的列数和它们的类型。在这个例子中,我们告诉它所有特征列都是浮点数并且缺失值应该默认为 0,但是我们提供了一个空类型的数组tf.float32
作为最后一列(目标)的默认值:该数组告诉 TensorFlow 该列包含浮点数,但没有默认值,因此如果遇到缺失值,它将引发异常。 -
该
decode_csv()
函数返回一个标量张量列表(每列一个),但我们需要返回一维张量数组。所以我们调用tf.stack()
除了最后一个(目标)之外的所有张量:这会将这些张量堆叠成一维数组。然后我们对目标值做同样的事情(这使它成为一个具有单个值的一维张量数组,而不是一个标量张量)。 -
最后,我们通过减去特征均值然后除以特征标准差来缩放输入特征,我们返回一个包含缩放特征和目标的元组。
让我们测试一下这个预处理功能:
>>> preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')
(<tf.Tensor: id=6227, shape=(8,), dtype=float32, numpy=
array([ 0.16579159, 1.216324 , -0.05204564, -0.39215982, -0.5277444 ,
-0.2633488 , 0.8543046 , -1.3072058 ], dtype=float32)>,
<tf.Tensor: [...], numpy=array([2.782], dtype=float32)>)
看起来不错!我们现在可以将该函数应用于数据集。
把所有东西放在一起
至使代码可重用,让我们将到目前为止讨论的所有内容放在一个小型辅助函数中:它将创建并返回一个数据集,该数据集将从多个 CSV 文件中有效加载加州住房数据,对其进行预处理、随机播放、选择性地重复它,以及批处理它(见图 13-2):
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
n_read_threads=None, shuffle_buffer_size=10000,
n_parse_threads=5, batch_size=32):
dataset = tf.data.Dataset.list_files(filepaths)
dataset = dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers, num_parallel_calls=n_read_threads)
dataset = dataset.shuffle(shuffle_buffer_size).repeat(repeat)
dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
return dataset.batch(batch_size).prefetch(1)
这段代码中的所有内容都应该是有意义的,除了最后一行 ( prefetch(1)
),这对性能很重要。
图 13-2。从多个 CSV 文件加载和预处理数据
预取
经过最后调用prefetch(1)
,我们正在创建一个数据集,该数据集将尽最大努力始终领先一批。3换句话说,当我们的训练算法在一个批次上工作时,数据集已经在并行工作以准备下一个批次(例如,从磁盘读取数据并对其进行预处理)。这可以显着提高性能,如图 13-3 所示。如果我们还确保加载和预处理是多线程的(通过num_parallel_calls
在调用interleave()
和时设置map()
),我们可以利用 CPU 上的多个内核,并希望使准备一批数据比在 GPU 上运行训练步骤更短:这样 GPU 将几乎 100% 被利用(除了从 CPU 到 GPU 的数据传输时间4),训练会运行得更快。
图 13-3。通过预取,CPU 和 GPU 并行工作:由于 GPU 在一个批次上工作,CPU 在下一个批次上工作
小费
如果你打算购买一张 GPU 卡,它的处理能力和内存大小当然非常重要(尤其是大量的 RAM 对计算机视觉至关重要)。获得良好性能同样重要的是它的内存带宽;这是每秒可以进出 RAM 的千兆字节数据数。
如果数据集足够小以适合内存,则可以通过使用数据集的cache()
方法将其内容缓存到 RAM 来显着加快训练速度。您通常应该在加载和预处理数据之后,但在改组、重复、批处理和预取之前执行此操作。这样,每个实例只会被读取和预处理一次(而不是每个 epoch 一次),但数据在每个 epoch 仍然会以不同的方式洗牌,并且仍然会提前准备下一批。
您现在知道如何构建高效的输入管道来加载和预处理来自多个文本文件的数据。我们已经讨论了最常见的数据集方法,但您可能还想了解更多:concatenate()
、zip()
、window()
、reduce()
、shard()
、flat_map()
和padded_batch()
。还有几个类方法:from_generator()
和from_tensors()
,它们分别从 Python 生成器或张量列表创建新数据集。请查看 API 文档以获取更多详细信息。另请注意,在 中提供了一些实验性功能tf.data.experimental
,其中许多可能会在未来的版本中成为核心 API(例如,检查CsvDataset
类和make_csv_dataset()
方法,该方法负责推断每列的类型)。
将数据集与 tf.keras 一起使用
现在我们可以使用该csv_reader_dataset()
函数为训练集创建数据集。请注意,我们不需要重复它,因为这将由 tf.keras 处理。我们还为验证集和测试集创建数据集:
train_set = csv_reader_dataset(train_filepaths)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)
现在我们可以使用这些数据集简单地构建和训练 Keras 模型。5我们需要做的就是将训练和验证数据集传递给fit()
方法,而不是X_train, y_train
、X_valid
和y_valid
:6
model = keras.models.Sequential([...])
model.compile([...])
model.fit(train_set, epochs=10, validation_data=valid_set)
同样,我们可以将数据集传递给evaluate()
andpredict()
方法:
model.evaluate(test_set)
new_set = test_set.take(3).map(lambda X, y: X) # pretend we have 3 new instances
model.predict(new_set) # a dataset containing new instances
与其他集合不同,new_set
通常不包含标签(如果包含,Keras 将忽略它们)。请注意,在所有这些情况下,如果需要,您仍然可以使用 NumPy 数组而不是数据集(当然,它们需要先被加载和预处理)。
如果你想构建自己的自定义训练循环(如第 12 章所述),你可以很自然地迭代训练集:
for X_batch, y_batch in train_set:
[...] # perform one Gradient Descent step
事实上,甚至可以创建一个执行整个训练循环的 TF 函数(参见第 12 章):
@tf.function
def train(model, optimizer, loss_fn, n_epochs, [...]):
train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, [...])
for X_batch, y_batch in train_set:
with tf.GradientTape() as tape:
y_pred = model(X_batch)
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
loss = tf.add_n([main_loss] + model.losses)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
恭喜,您现在知道如何构建强大的使用数据 API 的输入管道!但是,到目前为止,我们使用的 CSV 文件很常见、简单、方便,但效率并不高,而且不能很好地支持大型或复杂的数据结构(例如图像或音频)。所以让我们看看如何改用 TFRecords。
TFRecord 格式
这TFRecord 格式是 TensorFlow 存储大量数据并高效读取数据的首选格式。它是一种非常简单的二进制格式,只包含一系列不同大小的二进制记录(每条记录由一个长度、一个用于检查长度是否未损坏的 CRC 校验和、然后是实际数据,最后是一个 CRC 校验和组成数据)。您可以使用该类轻松创建 TFRecord 文件tf.io.TFRecordWriter
:
with tf.io.TFRecordWriter("my_data.tfrecord") as f:
f.write(b"This is the first record")
f.write(b"And this is the second record")
然后您可以使用 atf.data.TFRecordDataset
来读取一个或多个 TFRecord 文件:
filepaths = ["my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
print(item)
这将输出:
tf.Tensor(b'这是第一条记录', shape=(), dtype=string) tf.Tensor(b'这是第二条记录', shape=(), dtype=string)
小费
默认情况下,a
TFRecordDataset
会逐个读取文件,但您可以通过设置使其并行读取多个文件并交错记录num_parallel_reads
。或者,您可以通过使用list_files()
和获得相同的结果,interleave()
就像我们之前所做的那样读取多个 CSV 文件。
压缩的 TFRecord 文件
它有时对压缩 TFRecord 文件很有用,尤其是当它们需要通过网络连接加载时。您可以通过设置参数来创建压缩的 TFRecord 文件options
:
options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter("my_compressed.tfrecord", options) as f:
[...]
读取压缩的 TFRecord 文件时,需要指定压缩类型:
dataset = tf.data.TFRecordDataset(["my_compressed.tfrecord"],
compression_type="GZIP")
协议缓冲区简介
甚至尽管每条记录都可以使用您想要的任何二进制格式,但 TFRecord 文件通常包含序列化的协议缓冲区(也称为protobufs)。这是一种可移植的、可扩展的、高效的二进制格式,早在 2001 年由 Google 开发,并于 2008 年开源;protobufs 现在被广泛使用,特别是在gRPC中,谷歌的远程过程调用系统。它们是使用如下所示的简单语言定义的:
syntax = "proto3";
message Person {
string name = 1;
int32 id = 2;
repeated string email = 3;
}
这个定义说我们使用的是 protobuf 格式的版本 3,它指定每个Person
对象7可以(可选地)有一个name
type string
,一个id
type int32
,以及零个或多个email
字段,每个 type string
。数字1
、2
和3
是字段标识符:它们将用于每条记录的二进制表示。一旦你在.proto文件中有一个定义,你就可以编译它。这需要protoc
protobuf 编译器以 Python(或其他语言)生成访问类。请注意,我们将使用的 protobuf 定义已经为您编译好了,它们的 Python 类是 TensorFlow 的一部分,因此您不需要使用protoc
. 你只需要知道如何在 Python 中使用 protobuf 访问类。为了说明基础知识,让我们看一个使用为Person
protobuf 生成的访问类的简单示例(代码在注释中解释):
>>> from person_pb2 import Person # import the generated access class
>>> person = Person(name="Al", id=123, email=["a@b.com"]) # create a Person
>>> print(person) # display the Person
name: "Al"
id: 123
email: "a@b.com"
>>> person.name # read a field
"Al"
>>> person.name = "Alice" # modify a field
>>> person.email[0] # repeated fields can be accessed like arrays
"a@b.com"
>>> person.email.append("c@d.com") # add an email address
>>> s = person.SerializeToString() # serialize the object to a byte string
>>> s
b'\n\x05Alice\x10{\x1a\x07a@b.com\x1a\x07c@d.com'
>>> person2 = Person() # create a new Person
>>> person2.ParseFromString(s) # parse the byte string (27 bytes long)
27
>>> person == person2 # now they are equal
True
简而言之,我们导入由Person
生成的类protoc
,我们创建一个实例并使用它,将其可视化并读写一些字段,然后使用该SerializeToString()
方法对其进行序列化。这是准备好通过网络保存或传输的二进制数据。当读取或接收这个二进制数据时,我们可以使用该ParseFromString()
方法对其进行解析,得到一个被序列化的对象的副本。8
我们可以将序列化的Person
对象保存到 TFRecord 文件中,然后我们可以加载并解析它:一切都会正常工作。但是,SerializeToString()
并且ParseFromString()
不是 TensorFlow 操作(此代码中的其他操作也不是),因此它们不能包含在 TensorFlow 函数中(除非将它们包装在一个tf.py_function()
操作中,这会使代码更慢且更不便携,正如我们所见在第 12 章中)。幸运的是,TensorFlow 确实包含了特殊的 protobuf 定义,它为其提供了解析操作。
TensorFlow Protobufs
这TFRecord 文件中通常使用的主要 protobuf 是Example
protobuf,它表示数据集中的一个实例。它包含一个命名特征列表,其中每个特征可以是字节字符串列表、浮点列表或整数列表。这是 protobuf 的定义:
syntax = "proto3";
message BytesList { repeated bytes value = 1; }
message FloatList { repeated float value = 1 [packed = true]; }
message Int64List { repeated int64 value = 1 [packed = true]; }
message Feature {
oneof kind {
BytesList bytes_list = 1;
FloatList float_list = 2;
Int64List int64_list = 3;
}
};
message Features { map<string, Feature> feature = 1; };
message Example { Features features = 1; };
BytesList
、FloatList
和的定义Int64List
很简单。请注意,[packed = true]
它用于重复的数字字段,以实现更有效的编码。A Feature
包含 a BytesList
、 aFloatList
或 an Int64List
。A Features
(with an s
) 包含一个将特征名称映射到相应特征值的字典。最后,anExample
只包含一个Features
对象。9以下是如何创建一个tf.train.Example
代表与之前相同的人并将其写入 TFRecord 文件的方法:
from tensorflow.train import BytesList, FloatList, Int64List
from tensorflow.train import Feature, Features, Example
person_example = Example(
features=Features(
feature={
"name": Feature(bytes_list=BytesList(value=[b"Alice"])),
"id": Feature(int64_list=Int64List(value=[123])),
"emails": Feature(bytes_list=BytesList(value=[b"a@b.com",
b"c@d.com"]))
}))
代码有点冗长和重复,但它相当简单(你可以很容易地将它包装在一个小的帮助函数中)。现在我们有了一个Example
protobuf,我们可以通过调用它的SerializeToString()
方法来序列化它,然后将结果数据写入一个 TFRecord 文件:
with tf.io.TFRecordWriter("my_contacts.tfrecord") as f:
f.write(person_example.SerializeToString())
通常你会写不止一个Example
!通常,您将创建一个从当前格式(例如 CSV 文件)读取的转换脚本,Example
为每个实例创建一个 protobuf,将它们序列化,并将它们保存到多个 TFRecord 文件中,理想情况下在此过程中将它们打乱。这需要一些工作,所以再次确保它确实是必要的(也许您的管道可以正常处理 CSV 文件)。
现在我们有了一个包含 serialized 的漂亮 TFRecord 文件Example
,让我们尝试加载它。
加载和解析示例
至加载序列化的Example
protobufs,我们将tf.data.TFRecordDataset
再次使用 a,我们将Example
使用tf.io.parse_single_example()
. 这是一个 TensorFlow 操作,因此它可以包含在 TF 函数中。它至少需要两个参数:一个包含序列化数据的字符串标量张量,以及每个特征的描述。描述是一个字典,它将每个特征名称映射到tf.io.FixedLenFeature
指示特征的形状、类型和默认值的描述符,或tf.io.VarLenFeature
仅指示类型的描述符(如果特征列表的长度可能不同,例如"emails"
特征)。
下面的代码定义了一个描述字典,然后它遍历TFRecordDataset
并解析了Example
这个数据集包含的序列化 protobuf:
feature_description = {
"name": tf.io.FixedLenFeature([], tf.string, default_value=""),
"id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
"emails": tf.io.VarLenFeature(tf.string),
}
for serialized_example in tf.data.TFRecordDataset(["my_contacts.tfrecord"]):
parsed_example = tf.io.parse_single_example(serialized_example,
feature_description)
固定长度的特征被解析为常规张量,而可变长度的特征被解析为稀疏张量。您可以使用 将稀疏张量转换为密集张量tf.sparse.to_dense()
,但在这种情况下,仅访问其值会更简单:
>>> tf.sparse.to_dense(parsed_example["emails"], default_value=b"")
<tf.Tensor: [...] dtype=string, numpy=array([b'a@b.com', b'c@d.com'], [...])>
>>> parsed_example["emails"].values
<tf.Tensor: [...] dtype=string, numpy=array([b'a@b.com', b'c@d.com'], [...])>
ABytesList
可以包含您想要的任何二进制数据,包括任何序列化对象。例如,您可以使用tf.io.encode_jpeg()
JPEG 格式对图像进行编码,并将此二进制数据放入BytesList
. 稍后,当您的代码读取 TFRecord 时,它会从解析 开始Example
,然后需要调用tf.io.decode_jpeg()
解析数据并获取原始图像(或者您可以使用tf.io.decode_image()
,它可以解码任何 BMP、GIF、JPEG 或 PNG 图像)。您还可以BytesList
通过使用序列化张量tf.io.serialize_tensor()
然后将生成的字节字符串放入BytesList
特征中来存储您想要的任何张量。稍后,当您解析 TFRecord 时,您可以使用tf.io.parse_tensor()
.
tf.io.parse_single_example()
您可能希望使用 逐批解析它们,而不是使用 逐个解析示例tf.io.parse_example()
:
dataset = tf.data.TFRecordDataset(["my_contacts.tfrecord"]).batch(10)
for serialized_examples in dataset:
parsed_examples = tf.io.parse_example(serialized_examples,
feature_description)
如您所见,Example
protobuf 可能足以满足大多数用例。但是,在处理列表列表时使用它可能有点麻烦。例如,假设您要对文本文档进行分类。每个文档可以表示为句子列表,其中每个句子表示为单词列表。也许每个文档也有一个评论列表,其中每个评论都表示为一个单词列表。可能还有一些上下文数据,例如文档的作者、标题和发布日期。TensorFlow 的SequenceExample
protobuf 就是为此类用例而设计的。
使用 SequenceExample Protobuf 处理列表的列表
这里是SequenceExample
protobuf的定义:
message FeatureList { repeated Feature feature = 1; };
message FeatureLists { map<string, FeatureList> feature_list = 1; };
message SequenceExample {
Features context = 1;
FeatureLists feature_lists = 2;
};
ASequenceExample
包含一个Features
用于上下文数据的FeatureLists
对象和一个包含一个或多个命名FeatureList
对象(例如,一个FeatureList
命名对象"content"
和另一个命名对象"comments"
)的对象。每个都FeatureList
包含一个Feature
对象列表,每个对象可能是一个字节字符串列表、一个 64 位整数列表或一个浮点数列表(在这个例子中,每个Feature
都代表一个句子或评论,可能形式为单词标识符列表)。构建SequenceExample
、序列化和解析它与构建、序列化和解析 an 类似Example
,但您必须使用它tf.io.parse_single_sequence_example()
来解析单个SequenceExample
或tf.io.parse_sequence_example()
解析一批。这两个函数都返回一个包含上下文特征(作为字典)和特征列表(也作为字典)的元组。如果特征列表包含不同大小的序列(如前面的示例中所示),您可能希望使用以下方法将它们转换为不规则张量tf.RaggedTensor.from_sparse()
(请参阅笔记本以获取完整代码):
parsed_context, parsed_feature_lists = tf.io.parse_single_sequence_example(
serialized_sequence_example, context_feature_descriptions,
sequence_feature_descriptions)
parsed_content = tf.RaggedTensor.from_sparse(parsed_feature_lists["content"])
预处理输入特征
准备中您的神经网络数据需要将所有特征转换为数字特征,通常对它们进行归一化等等。特别是,如果您的数据包含分类特征或文本特征,则需要将它们转换为数字。这可以在准备数据文件时使用您喜欢的任何工具(例如,NumPy、pandas 或 Scikit-Learn)提前完成。或者,您可以在使用数据 API 加载数据时动态预处理数据(例如,使用数据集的map()
方法,如前所述),或者您可以直接在模型中包含预处理层。现在让我们看看最后一个选项。
例如,以下是如何使用层实现标准化Lambda
层。对于每个特征,它减去平均值并除以其标准差(加上一个微小的平滑项以避免被零除):
means = np.mean(X_train, axis=0, keepdims=True)
stds = np.std(X_train, axis=0, keepdims=True)
eps = keras.backend.epsilon()
model = keras.models.Sequential([
keras.layers.Lambda(lambda inputs: (inputs - means) / (stds + eps)),
[...] # other layers
])
这不是太难!但是,您可能更喜欢使用一个很好的自包含自定义层(很像 Scikit-Learn's StandardScaler
),而不是使用全局变量,means
例如stds
:
class Standardization(keras.layers.Layer):
def adapt(self, data_sample):
self.means_ = np.mean(data_sample, axis=0, keepdims=True)
self.stds_ = np.std(data_sample, axis=0, keepdims=True)
def call(self, inputs):
return (inputs - self.means_) / (self.stds_ + keras.backend.epsilon())
adapt()
在您可以使用此标准化层之前,您需要通过调用该方法并将其传递给数据样本来使其适应您的数据集。这将允许它为每个特征使用适当的均值和标准差:
std_layer = Standardization()
std_layer.adapt(data_sample)
该样本必须足够大以代表您的数据集,但它不必是完整的训练集:通常,几百个随机选择的实例就足够了(但是,这取决于您的任务)。接下来,您可以像使用普通层一样使用此预处理层:
model = keras.Sequential()
model.add(std_layer)
[...] # create the rest of the model
model.compile([...])
model.fit([...])
如果您认为 Keras 应该包含一个像这样的标准化层,那么这里有一些好消息:当您阅读本文时,该keras.layers.Normalization
层可能已经可用。它将与我们的自定义图层非常相似Standardization
:首先,创建图层,然后通过将数据样本传递给方法使其适应您的数据集adapt()
,最后正常使用图层。
现在让我们看看分类特征。我们将从将它们编码为 one-hot向量开始。
使用 One-Hot 向量对分类特征进行编码
考虑我们在第 2 章ocean_proximity
探讨的加利福尼亚住房数据集中的特征:它是一个具有五个可能值的分类特征:、、、和。我们需要先对这个特征进行编码,然后再将其输入神经网络。由于类别很少,我们可以使用 one-hot 编码。为此,我们首先需要将每个类别映射到其索引(0 到 4),这可以使用查找表来完成:"<1H OCEAN"
"INLAND"
"NEAR OCEAN"
"NEAR BAY"
"ISLAND"
vocab = ["<1H OCEAN", "INLAND", "NEAR OCEAN", "NEAR BAY", "ISLAND"]
indices = tf.range(len(vocab), dtype=tf.int64)
table_init = tf.lookup.KeyValueTensorInitializer(vocab, indices)
num_oov_buckets = 2
table = tf.lookup.StaticVocabularyTable(table_init, num_oov_buckets)
让我们看一下这段代码:
-
然后我们创建一个具有相应索引(0 到 4)的张量。
-
接下来,我们为查找表创建一个初始化程序,将类别列表及其对应的索引传递给它。在这个例子中,我们已经有了这些数据,所以我们使用
KeyValueTensorInitializer
; 但如果类别在文本文件中列出(每行一个类别),我们将使用 aTextFileInitializer
代替。 -
在最后两行我们创建查找表,为其提供初始化程序并指定词汇表外(oov)桶的数量。如果我们查找词汇表中不存在的类别,则查找表将计算该类别的哈希并使用它来将未知类别分配给 oov 桶之一。它们的索引在已知类别之后开始,因此在此示例中,两个 oov 存储桶的索引为 5 和 6。
为什么使用 oov 存储桶?好吧,如果类别的数量很大(例如,邮政编码、城市、单词、产品或用户)并且数据集也很大,或者它一直在变化,那么获取完整的类别列表可能并不方便。一种解决方案是基于数据样本(而不是整个训练集)定义词汇表,并为不在数据样本中的其他类别添加一些 oov 桶。您希望在训练期间找到的未知类别越多,您应该使用的 oov 存储桶就越多。确实,如果没有足够的oov桶,就会发生碰撞:不同的类别最终会在同一个桶中,因此神经网络将无法区分它们(至少不能基于这个特征)。
现在让我们使用查找表将一小批分类特征编码为 one-hot 向量:
>>> categories = tf.constant(["NEAR BAY", "DESERT", "INLAND", "INLAND"])
>>> cat_indices = table.lookup(categories)
>>> cat_indices
<tf.Tensor: id=514, shape=(4,), dtype=int64, numpy=array([3, 5, 1, 1])>
>>> cat_one_hot = tf.one_hot(cat_indices, depth=len(vocab) + num_oov_buckets)
>>> cat_one_hot
<tf.Tensor: id=524, shape=(4, 7), dtype=float32, numpy=
array([[0., 0., 0., 1., 0., 0., 0.],
[0., 0., 0., 0., 0., 1., 0.],
[0., 1., 0., 0., 0., 0., 0.],
[0., 1., 0., 0., 0., 0., 0.]], dtype=float32)>
如您所见,"NEAR BAY"
被映射到索引 3,未知类别"DESERT"
被映射到两个 oov 存储桶之一(在索引 5 处),"INLAND"
并被映射到索引 1,两次。然后我们习惯于tf.one_hot()
对这些索引进行 one-hot 编码。请注意,我们必须告诉这个函数索引的总数,它等于词汇表大小加上 oov 桶的数量。现在您知道如何使用 TensorFlow 将分类特征编码为 one-hot 向量!
就像之前一样,将所有这些逻辑捆绑到一个不错的自包含类中并不难。它的adapt()
方法将获取数据样本并提取其中包含的所有不同类别。它将创建一个查找表以将每个类别映射到其索引(包括使用 oov 存储桶的未知类别)。然后它的call()
方法将使用查找表将输入类别映射到它们的索引。好吧,还有更多好消息:当您阅读本文时,Keras 可能会包含一个keras.layers.TextVectorization
名为它在词汇表中的索引。您可以在模型的开头添加此层,然后添加adapt()
call()
Lambda
tf.one_hot()
如果要将这些索引转换为单热向量,则将应用该函数的层。
不过,这可能不是最好的解决方案。每个 one-hot 向量的大小是词汇长度加上 oov 桶的数量。当只有几个可能的类别时这很好,但如果词汇量很大,编码它们的效率要高得多改为使用嵌入。
小费
根据经验,如果类别的数量低于 10,那么 one-hot 编码通常是可行的方法(但您的里程可能会有所不同!)。如果类别的数量大于 50(使用哈希桶时通常会出现这种情况),那么嵌入通常是更可取的。在 10 到 50 个类别中,您可能希望对这两个选项进行试验,看看哪一个最适合您的用例。
使用嵌入对分类特征进行编码
一个嵌入是表示一个类别的可训练密集向量。默认情况下,嵌入是随机初始化的,例如,"NEAR BAY"
类别最初可以由一个随机向量表示,例如[0.131, 0.890]
,而"NEAR OCEAN"
类别可能由另一个随机向量表示,例如[0.631, 0.791]
。在此示例中,我们使用 2D 嵌入,但维数是您可以调整的超参数。由于这些嵌入是可训练的,它们会在训练过程中逐渐提高;并且由于它们代表了相当相似的类别,梯度下降最终肯定会将它们推得更近,同时它会倾向于将它们从"INLAND"
类别的嵌入中移开(见图 13-4)。事实上,表示越好,神经网络就越容易做出准确的预测,因此训练倾向于使嵌入成为类别的有用表示。这个称为表示学习(我们将在第 17 章看到其他类型的表示学习)。
图 13-4。Embeddings 会在训练过程中逐渐改善
词嵌入
不是只有嵌入通常是手头任务的有用表示,但通常这些相同的嵌入可以成功地用于其他任务。最常见的例子是词嵌入(即单个词的嵌入):当你在处理自然语言处理任务时,你通常最好重用预训练的词嵌入,而不是自己训练。
使用向量来表示单词的想法可以追溯到 1960 年代,并且已经使用了许多复杂的技术来生成有用的向量,包括使用神经网络。但事情在 2013 年真正起飞,当时 Tomáš Mikolov 和其他谷歌研究人员发表了一篇论文10,描述了一种使用神经网络学习词嵌入的有效技术,显着优于以前的尝试。这使他们能够在非常大的文本语料库上学习嵌入:他们训练了一个神经网络来预测任何给定单词附近的单词,并获得了惊人的单词嵌入。例如,同义词具有非常接近的嵌入,并且语义相关的单词(例如法国、西班牙和意大利)最终聚集在一起。
然而,这不仅仅是接近性:词嵌入也沿着嵌入空间中有意义的轴组织。这是一个著名的例子:如果你计算 King – Man + Woman(加减这些词的嵌入向量),那么结果将非常接近单词 Queen 的嵌入(见图 13-5)。换句话说,词嵌入编码了性别的概念!同样,你可以计算马德里 - 西班牙 + 法国,结果接近巴黎,这似乎表明首都的概念也被编码在嵌入中。
图 13-5。相似词的词嵌入往往很接近,一些轴似乎编码了有意义的概念
不幸的是,词嵌入有时会捕捉到我们最糟糕的偏见。例如,尽管他们正确地知道男人对国王就像女人对王后一样,但他们似乎也知道男人对医生就像女人对护士一样:相当性别歧视!公平地说,这个特定的例子可能被夸大了,正如Malvina Nissim 等人在2019 年的一篇论文11中指出的那样。然而,确保深度学习算法的公平性是一个重要且活跃的研究课题。
让我们看看我们如何手动实现嵌入,以了解它们是如何工作的(然后我们将使用一个简单的 Keras 层代替)。首先,我们需要创建一个包含每个类别嵌入的嵌入矩阵,随机初始化;每个类别和每个 oov 存储桶将有一行,每个嵌入维度有一列:
embedding_dim = 2
embed_init = tf.random.uniform([len(vocab) + num_oov_buckets, embedding_dim])
embedding_matrix = tf.Variable(embed_init)
在此示例中,我们使用 2D 嵌入,但根据经验,嵌入通常具有 10 到 300 个维度,具体取决于任务和词汇量(您必须调整此超参数)。
这个嵌入矩阵是一个随机的 6 × 2 矩阵,存储在一个变量中(因此可以在训练期间通过梯度下降对其进行调整):
>>> embedding_matrix
<tf.Variable 'Variable:0' shape=(6, 2) dtype=float32, numpy=
array([[0.6645621 , 0.44100678],
[0.3528825 , 0.46448255],
[0.03366041, 0.68467236],
[0.74011743, 0.8724445 ],
[0.22632635, 0.22319686],
[0.3103881 , 0.7223358 ]], dtype=float32)>
现在让我们对与之前相同的一批分类特征进行编码,但这次使用这些嵌入:
>>> categories = tf.constant(["NEAR BAY", "DESERT", "INLAND", "INLAND"])
>>> cat_indices = table.lookup(categories)
>>> cat_indices
<tf.Tensor: id=741, shape=(4,), dtype=int64, numpy=array([3, 5, 1, 1])>
>>> tf.nn.embedding_lookup(embedding_matrix, cat_indices)
<tf.Tensor: id=864, shape=(4, 2), dtype=float32, numpy=
array([[0.74011743, 0.8724445 ],
[0.3103881 , 0.7223358 ],
[0.3528825 , 0.46448255],
[0.3528825 , 0.46448255]], dtype=float32)>
该tf.nn.embedding_lookup()
函数在给定索引处查找嵌入矩阵中的行——这就是它所做的一切。例如,查找表表明"INLAND"
类别位于索引 1,因此该tf.nn.embedding_lookup()
函数返回嵌入矩阵中第 1 行的嵌入(两次)[0.3528825, 0.46448255]
:
Keras 提供了一个keras.layers.Embedding
层来处理嵌入矩阵(默认情况下可训练);创建层时,它会随机初始化嵌入矩阵,然后当使用某些类别索引调用它时,它会返回嵌入矩阵中这些索引处的行:
>>> embedding = keras.layers.Embedding(input_dim=len(vocab) + num_oov_buckets,
... output_dim=embedding_dim)
...
>>> embedding(cat_indices)
<tf.Tensor: id=814, shape=(4, 2), dtype=float32, numpy=
array([[ 0.02401174, 0.03724445],
[-0.01896119, 0.02223358],
[-0.01471175, -0.00355174],
[-0.01471175, -0.00355174]], dtype=float32)>
将所有内容放在一起,我们现在可以创建一个 Keras 模型,该模型可以处理分类特征(以及常规数值特征)并学习每个类别(以及每个 oov 桶)的嵌入:
regular_inputs = keras.layers.Input(shape=[8])
categories = keras.layers.Input(shape=[], dtype=tf.string)
cat_indices = keras.layers.Lambda(lambda cats: table.lookup(cats))(categories)
cat_embed = keras.layers.Embedding(input_dim=6, output_dim=2)(cat_indices)
encoded_inputs = keras.layers.concatenate([regular_inputs, cat_embed])
outputs = keras.layers.Dense(1)(encoded_inputs)
model = keras.models.Model(inputs=[regular_inputs, categories],
outputs=[outputs])
该模型有两个输入:一个常规输入,每个实例包含八个数字特征,加上一个分类输入(每个实例包含一个分类特征)。它使用一个Lambda
层来查找每个类别的索引,然后查找这些索引的嵌入。接下来,它将嵌入和常规输入连接起来,以提供编码输入,这些输入准备好馈送到神经网络。此时我们可以添加任何类型的神经网络,但我们只需添加一个密集输出层,然后创建 Keras 模型。
当keras.layers.TextVectorization
图层可用时,您可以调用其adapt()
方法以使其从数据样本中提取词汇表(它将负责为您创建查找表)。然后你可以将它添加到你的模型中,它会执行索引查找(替换Lambda
前面代码示例中的层)。
笔记
One-hot 编码后跟
Dense
一层(没有激活函数也没有偏差)相当于Embedding
一层。然而,该Embedding
层使用更少的计算(当嵌入矩阵的大小增加时,性能差异变得明显)。该Dense
层的权重矩阵起到嵌入矩阵的作用。例如,使用大小为 20 的 one-hot 向量和Dense
具有 10 个单元的层等效于使用具有和的Embedding
层。结果,使用比该层后面的层中的单元数更多的嵌入维度将是浪费的。input_dim=20
output_dim=10
Embedding
现在让我们更仔细地看一下 Keras 预处理层。
Keras 预处理层
这TensorFlow 团队正致力于提供一组标准的Keras 预处理层。当您阅读本文时,它们可能已经可用;但是,届时 API 可能会略有变化,因此如果有任何异常行为,请参阅本章的笔记本。这个新 API 可能会取代现有的 Feature Columns API,后者更难使用且不太直观(如果你想了解更多关于 Feature Columns API 的信息,请查看本章的笔记本)。
我们已经讨论了其中的两个层:keras.layers.Normalization
将执行特征标准化的层(它将等同于Standardization
我们之前定义的层),以及TextVectorization
能够将输入中的每个单词编码到词汇表索引中的层。在这两种情况下,您都会创建层,adapt()
使用数据样本调用其方法,然后在模型中正常使用层。其他预处理层将遵循相同的模式。
该 API 还将包括一个keras.layers.Discretization
层,该层将连续数据切分到不同的 bin 中,并将每个 bin 编码为 one-hot 向量。例如,您可以使用它将价格离散化为三个类别(低、中、高),这些类别将被编码为 [1, 0, 0]、[0, 1, 0] 和 [0, 0, 1 ], 分别。当然,这会丢失很多信息,但在某些情况下,它可以帮助模型检测在仅查看连续值时并不明显的模式。
警告
该
Discretization
层将不可区分,并且只能在模型开始时使用。确实,模型的预处理层在训练过程中会被冻结,因此它们的参数不会受到梯度下降的影响,因此它们不需要是可微的。这也意味着你不应该Embedding
直接在自定义预处理层中使用一个层,如果你希望它是可训练的:相反,它应该单独添加到你的模型中,就像前面的代码示例一样。
也可以使用PreprocessingStage
该类链接多个预处理层。例如,以下代码将创建一个预处理管道,该管道首先对输入进行规范化,然后将它们离散化(这可能会让您想起 Scikit-Learn 管道)。在将此管道调整为数据样本后,您可以像在模型中使用常规层一样使用它(但同样,仅在模型开始时,因为它包含不可微的预处理层):
normalization = keras.layers.Normalization()
discretization = keras.layers.Discretization([...])
pipeline = keras.layers.PreprocessingStage([normalization, discretization])
pipeline.adapt(data_sample)
该TextVectorization
层还可以选择输出字数向量而不是字索引。例如,如果词汇表包含三个单词 say ["and", "basketball", "more"]
,那么文本"more and more"
将映射到向量[1, 0, 2]
:单词"and"
出现一次,单词"basketball"
根本不出现,单词"more"
出现两次。这个文本表示被称为词袋,因为它完全失去了词的顺序。常见的词 like"and"
在大多数文本中具有很大的价值,即使它们通常是最不有趣的(例如,在文本"more and more basketball"
中,这个词"basketball"
显然是最重要的,正是因为它不是一个非常频繁的词)。因此,单词计数应该以一种降低频繁单词重要性的方式进行归一化。这通常使用一种称为 词频×逆文档频率(TF-IDF)。有许多变体,但一个常见的变体是计算出现单词的训练实例的比率,并将单词计数乘以该比率的倒数的对数。例如,假设单词"and"
、"basketball"
和"more"
分别出现在训练集中所有文本实例的 90%、10% 和 50% 中:在这种情况下,最终向量将是[1*log(1/0.9), 0*log(1/0.1), 2*log(1/0.5)]
,大约等于[0.1, 0.0, 1.4]
。该TextVectorization
层将具有执行 TF-IDF 的选项。
笔记
如果标准预处理层不足以完成您的任务,您仍然可以选择创建自己的自定义预处理层,就像我们之前在
Standardization
课堂上所做的那样。使用方法创建该类的子keras.layers.PreprocessingLayer
类,该adapt()
方法应采用data_sample
参数和可选的额外reset_state
参数:如果True
,则该adapt()
方法应在计算新状态之前重置任何现有状态;如果False
,它应该尝试更新现有状态。
如您所见,这些 Keras 预处理层将使预处理变得更加容易!现在,无论您选择编写自己的预处理层还是使用 Keras(甚至使用 Feature Columns API),所有的预处理都将即时完成。然而,在训练期间,最好提前执行预处理。让我们看看我们为什么要这样做以及我们将如何去做。
TF变换(Transform)
如果预处理在计算上是昂贵的,然后在训练之前而不是在运行中处理它可能会给您带来显着的加速:在训练之前每个实例只对数据进行一次预处理,而不是在训练期间每个实例和每个 epoch预处理一次。如前所述,如果数据集小到足以放入 RAM,则可以使用它的cache()
方法。但如果它太大,那么像 Apache Beam 或 Spark 这样的工具会有所帮助。它们让您可以在大量数据上运行高效的数据处理管道,甚至分布在多个服务器上,因此您可以使用它们在训练之前对所有训练数据进行预处理。
这很有效,并且确实可以加快训练速度,但有一个问题:一旦你的模型被训练,假设你想将它部署到移动应用程序中。在这种情况下,您需要在应用程序中编写一些代码,以便在将数据输入模型之前对数据进行预处理。假设您还想将模型部署到 TensorFlow.js 以便它在 Web 浏览器中运行?再一次,您将需要编写一些预处理代码。这可能成为维护的噩梦:每当您想更改预处理逻辑时,都需要更新 Apache Beam 代码、移动应用程序代码和 JavaScript 代码。这不仅耗时,而且容易出错:您最终可能会在训练之前执行的预处理操作与在您的应用程序或浏览器中执行的预处理操作之间存在细微差别。这个 培训/服务偏差将导致错误或性能下降。
一项改进是采用经过训练的模型(根据您的 Apache Beam 或 Spark 代码预处理的数据进行训练),然后在将其部署到您的应用程序或浏览器之前,添加额外的预处理层以进行动态预处理。这肯定更好,因为现在您只有两个版本的预处理代码:Apache Beam 或 Spark 代码,以及预处理层的代码。
但如果您可以只定义一次预处理操作会怎样?这就是 TF Transform 的设计目的。它是TensorFlow Extended (TFX)的一部分,这是一个用于生产 TensorFlow 模型的端到端平台。首先,要使用TF Transform等TFX组件,必须安装它;它没有与 TensorFlow 捆绑在一起。然后,您只需定义一次预处理函数(在 Python 中),使用 TF Transform 函数进行缩放、分桶等。您还可以使用所需的任何 TensorFlow 操作。如果我们只有两个功能,这个预处理函数可能看起来像这样:
import tensorflow_transform as tft
def preprocess(inputs): # inputs = a batch of input features
median_age = inputs["housing_median_age"]
ocean_proximity = inputs["ocean_proximity"]
standardized_age = tft.scale_to_z_score(median_age)
ocean_proximity_id = tft.compute_and_apply_vocabulary(ocean_proximity)
return {
"standardized_median_age": standardized_age,
"ocean_proximity_id": ocean_proximity_id
}
接下来,TF Transform 允许您使用 Apache Beam 将此preprocess()
函数应用于整个训练集(它提供了一个AnalyzeAndTransformDataset
类,您可以在 Apache Beam 管道中为此目的使用该类)。在此过程中,它还将计算整个训练集的所有必要统计信息:在此示例中,是特征的均值和标准差,以及housing_median_age
特征的词汇表ocean_proximity
。计算这些统计数据的组件称为分析器。
重要的是,TF Transform 还将生成一个等效的 TensorFlow 函数,您可以将其插入到您部署的模型中。此 TF 函数包含一些常数,这些常数对应于 Apache Beam 计算的所有必要统计数据(平均值、标准差和词汇表)。
借助 Data API、TFRecords、Keras 预处理层和 TF Transform,您可以构建高度可扩展的输入管道以进行训练,并从生产中的快速便携数据预处理中受益。
但是,如果您只想使用标准数据集怎么办?那么在这种情况下,事情就简单多了:只需使用 TFDS!
TensorFlow 数据集 (TFDS) 项目
这 TensorFlow Datasets项目使下载常见数据集变得非常容易,从 MNIST 或 Fashion MNIST 等小型数据集到 ImageNet 等大型数据集(您将需要相当多的磁盘空间!)。该列表包括图像数据集、文本数据集(包括翻译数据集)以及音频和视频数据集。您可以访问https://homl.info/tfds查看完整列表以及每个数据集的描述。
TFDS 未与 TensorFlow 捆绑,因此您需要安装该tensorflow-datasets
库(例如,使用 pip)。然后调用该tfds.load()
函数,它将下载您想要的数据(除非之前已经下载过)并将数据作为数据集字典返回(通常一个用于训练,一个用于测试,但这取决于您选择的数据集)。例如,让我们下载 MNIST:
import tensorflow_datasets as tfds
dataset = tfds.load(name="mnist")
mnist_train, mnist_test = dataset["train"], dataset["test"]
然后,您可以应用所需的任何转换(通常是混洗、批处理和预取),并准备好训练您的模型。这是一个简单的例子:
mnist_train = mnist_train.shuffle(10000).batch(32).prefetch(1)
for item in mnist_train:
images = item["image"]
labels = item["label"]
[...]
小费
该
load()
功能可以打乱它下载的文件:只需设置shuffle_files=True
. 但是,这可能还不够,所以最好多打乱训练数据。
请注意,数据集中的每个项目都是包含特征和标签的字典。但是 Keras 期望每个项目是一个包含两个元素(同样是特征和标签)的元组。您可以使用该map()
方法转换数据集,如下所示:
mnist_train = mnist_train.shuffle(10000).batch(32)
mnist_train = mnist_train.map(lambda items: (items["image"], items["label"]))
mnist_train = mnist_train.prefetch(1)
但是load()
通过设置要求函数为您执行此操作更简单as_supervised=True
(显然这仅适用于标记的数据集)。如果需要,您还可以指定批量大小。然后您可以将数据集直接传递给您的 tf.keras 模型:
dataset = tfds.load(name="mnist", batch_size=32, as_supervised=True)
mnist_train = dataset["train"].prefetch(1)
model = keras.models.Sequential([...])
model.compile(loss="sparse_categorical_crossentropy", optimizer="sgd")
model.fit(mnist_train, epochs=5)
这是一个技术性很强的章节,你可能会觉得它离神经网络的抽象美有点远,但事实是深度学习经常涉及大量数据,并且知道如何有效地加载、解析和预处理它是一项至关重要的技能。在下一章中,我们将研究卷积神经网络,它是用于图像处理和许多其他应用的最成功的神经网络架构之一。
练习
-
为什么要使用数据 API?
-
将大型数据集拆分为多个文件有什么好处?
-
在训练期间,您如何判断您的输入管道是瓶颈?你能做些什么来解决它?
-
您可以将任何二进制数据保存到 TFRecord 文件中,还是只保存序列化的协议缓冲区?
-
您为什么要费力地将所有数据转换为
Example
protobuf 格式?为什么不使用你自己的 protobuf 定义呢? -
使用 TFRecords 时,您希望何时激活压缩?为什么不系统地做呢?
-
数据可以在写入数据文件时直接进行预处理,或者在 tf.data 管道中,或者在模型中的预处理层中,或者使用 TF Transform。你能列出每个选项的一些优点和缺点吗?
-
列举一些可用于对分类特征进行编码的常用技术。文字呢?
-
加载 Fashion MNIST 数据集(第 10 章介绍);将其拆分为训练集、验证集和测试集;打乱训练集;并将每个数据集保存到多个 TFRecord 文件中。每条记录都应该是一个序列化的
Example
protobuf,具有两个特征:序列化图像(用于tf.io.serialize_tensor()
序列化每个图像)和标签。12然后使用 tf.data 为每个集合创建一个高效的数据集。最后,使用 Keras 模型来训练这些数据集,包括一个预处理层来标准化每个输入特征。尝试使输入管道尽可能高效,使用 TensorBoard 可视化分析数据。 -
在本练习中,您将下载一个数据集,对其进行拆分,创建一个
tf.data.Dataset
以加载它并有效地对其进行预处理,然后构建和训练一个包含Embedding
层的二元分类模型:-
下载大型电影评论数据集,其中包含来自Internet 电影数据库的 50,000 条电影评论。数据组织在两个目录中,train和test,每个目录都包含一个pos子目录,其中包含 12,500 条正面评论和一个neg子目录,其中包含 12,500 条负面评论。每个评论都存储在一个单独的文本文件中。还有其他文件和文件夹(包括预处理的词袋),但我们将在本练习中忽略它们。
-
将测试集拆分为验证集 (15,000) 和测试集 (10,000)。
-
使用 tf.data 为每个集合创建一个有效的数据集。
-
创建一个二元分类模型,使用一个
TextVectorization
层来预处理每个评论。如果该TextVectorization
层尚不可用(或者如果您喜欢挑战),请尝试创建自己的自定义预处理层:您可以使用tf.strings
包中的功能,例如lower()
将所有内容设为小写,regex_replace()
用空格替换标点符号,以及split()
拆分空格上的单词。您应该使用查找表来输出单词索引,该索引必须在adapt()
方法中准备好。 -
添加一个
Embedding
层并计算每个评论的平均嵌入,乘以单词数的平方根(参见第 16 章)。然后可以将重新缩放的平均嵌入传递给模型的其余部分。 -
训练模型并查看您获得的准确度。尝试优化您的管道以尽可能快地进行培训。
-
使用 TFDS 更轻松地加载相同的数据集:
tfds.load("imdb_reviews")
.
-
附录 A中提供了这些练习的解决方案。
1本书出版后,这种实验方法已被弃用。您现在可以更简单地使用dataset.unbatch()
.
2想象一下你左边的一副牌:假设你只拿最上面的三张牌并洗牌,然后随机挑选一张放在你的右边,另外两张在你手中。再取一张左手牌,将手中的三张牌洗混,随机抽取一张,放在右手边。当你完成了所有这样的牌,你会在你的右边有一副牌:你认为它会完美地洗牌吗?
3一般来说,只预取一批就可以了,但在某些情况下,您可能需要预取更多。或者,您可以让 TensorFlow 通过传递自动决定tf.data.experimental.AUTOTUNE
(目前这是一个实验性功能)。
4但是看看这个tf.data.experimental.prefetch_to_device()
函数,它可以直接将数据预取到 GPU。
5对数据集的支持特定于 tf.keras;这在 Keras API 的其他实现中不起作用。
6该fit()
方法将负责重复训练数据集。或者,您可以调用repeat()
训练数据集,使其永远重复,并steps_per_epoch
在调用fit()
方法时指定参数。这在一些极少数情况下可能很有用,例如,如果您想使用跨越 epoch 的 shuffle 缓冲区。
7由于 protobuf 对象旨在被序列化和传输,因此它们被称为消息。
8本章包含使用 TFRecords 所需了解的有关 protobufs 的最基本知识。要了解有关 protobuf 的更多信息,请访问https://homl.info/protobuf。
9为什么Example
还要定义,因为它只包含一个Features
对象?好吧,TensorFlow 的开发人员可能有一天会决定为其添加更多字段。只要新Example
定义仍然包含features
具有相同 ID 的字段,它将向后兼容。这种可扩展性是 protobuf 的一大特点。
10Tomas Mikolov 等人,“单词和短语的分布式表示及其组合性” ,第 26 届神经信息处理系统国际会议论文集2(2013 年):3111-3119。
11Malvina Nissim 等人,“公平胜于耸人听闻:男人对医生就像女人对医生一样”,arXiv 预印本 arXiv:1905.09866 (2019)。
12对于大图像,您可以tf.io.encode_jpeg()
改用。这将节省大量空间,但会损失一些图像质量。
更多推荐
所有评论(0)