##### Copyright 2020 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

分布式输入#

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

tf.distribute API 为用户提供了一种简单的方法,可将训练范围从一台计算机扩展到多台计算机。扩展模型时,用户还必须将其输入分布到多个设备上。tf.distribute 提供了相应的 API,您可以利用这些 API 在设备之间自动分布输入。

本指南将展示使用 tf.distribute API 创建分布式数据集和迭代器的不同方法。此外,还将涵盖以下主题:

  • 使用 tf.distribute.Strategy.experimental_distribute_datasettf.distribute.Strategy.distribute_datasets_from_function 时的用法、分片和批处理选项。

  • 遍历分布式数据集的不同方式。

  • tf.distribute.Strategy.experimental_distribute_dataset/tf.distribute.Strategy.distribute_datasets_from_function API 与 tf.data API 之间的区别以及用户在使用中可能遇到的任何限制。

本指南不介绍如何将分布式输入与 Keras API 一起使用。

分布式数据集#

要使用 tf.distribute API 扩缩,请使用 tf.data.Dataset 表示其输入。tf.distribute 可以与 tf.data.Dataset 高效地协同工作(例如,通过自动预提取到每个加速器设备和定期性能更新)。如果您有使用除 tf.data.Dataset 以外的其他 API 的用例,请参阅本指南中的张量输入部分。在非分布式训练循环中,首先创建一个 tf.data.Dataset 实例,然后迭代各个元素。例如:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
# Simulate multiple CPUs with virtual devices
N_VIRTUAL_DEVICES = 2
physical_devices = tf.config.list_physical_devices("CPU")
tf.config.set_logical_device_configuration(
    physical_devices[0], [tf.config.LogicalDeviceConfiguration() for _ in range(N_VIRTUAL_DEVICES)])
print("Available devices:")
for i, device in enumerate(tf.config.list_logical_devices()):
  print("%d) %s" % (i, device))
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))

为了在尽可能不更改用户现有代码的情况下使用户能够使用 tf.distribute 策略,我们引入了两个 API,它们将分配 tf.data.Dataset 实例并返回一个分布式数据集对象。随后,用户可以遍历此分布式数据集实例并像以前一样训练自己的模型。现在让我们更详细地看一下这两个 API - tf.distribute.Strategy.experimental_distribute_datasettf.distribute.Strategy.distribute_datasets_from_function

tf.distribute.Strategy.experimental_distribute_dataset#

用法#

此 API 将 tf.data.Dataset 实例作为输入,并返回 tf.distribute.DistributedDataset 实例。您应当使用等于全局批次大小的值对输入数据集进行批处理。此全局批次大小是您要在所有设备中一步处理的样本数。您可以用 Python 样式迭代此分布式数据集,或者使用 iter 创建一个迭代器。返回的对象不是 tf.data.Dataset 实例,并且不支持以任何方式转换或检查数据集的任何其他 API。如果您没有特定的方式将输入分片到不同副本中,则建议使用此 API。

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))

属性#

批处理#

tf.distribute 使用新的批次大小(等于全局批次大小除以同步副本数)对输入 tf.data.Dataset 实例进行重新批处理。同步副本数等于训练期间参与梯度全归约的设备数。当用户在分布式迭代器上调用 next 时,将在每个副本上返回数据的每个副本批次大小。经过重新批处理的数据集基数将始终为副本数的倍数。下面是一些示例:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • 无分布:

      • 批次 1:[0, 1, 2, 3]

      • 批次 2:[4, 5]

    • 分布在 2 个副本上。最后一个批次 ([4, 5]) 被拆分到 2 个副本中。

    • 批次 1:

      • 副本 1:[0, 1]

      • 副本 2:[2, 3]

    • 批次 2:

      • 副本 1:[4]

      • 副本 2:[5]

  • tf.data.Dataset.range(4).batch(4)

    • 无分布:

      • 批次 1:[0, 1, 2, 3]

    • 分布在 5 个副本上:

      • 批次 1:

        • 副本 1:[0]

        • 副本 2:[1]

        • 副本 3:[2]

        • 副本 4:[3]

        • 副本 5:[]

  • tf.data.Dataset.range(8).batch(4)

    • 无分布:

      • 批次 1:[0, 1, 2, 3]

      • 批次 2:[4, 5, 6, 7]

    • 分布在 3 个副本上:

      • 批次 1:

        • 副本 1:[0, 1]

        • 副本 2:[2, 3]

        • 副本 3:[]

      • 批次 2:

        • 副本 1:[4, 5]

        • 副本 2:[6, 7]

        • 副本 3:[]

注:上面的示例仅说明了如何在不同副本上拆分全局批次。建议不要依赖于可能最终出现在每个副本上的实际值,因为它可能会根据实现而改变。

对数据集进行重新批处理的空间复杂度随副本数量线性增加。对于多工作器训练用例,这意味着输入流水线可能会遇到 OOM 错误。

分片#

tf.distribute 还使用 MultiWorkerMirroredStrategyTPUStrategy 在多工作进程训练中自动分片输入数据集。每个数据集都是在工作进程的 CPU 设备上创建的。在一组工作进程上自动分片数据集意味着每个工作进程都被分配了整个数据集的一个子集(如果设置了正确的 tf.data.experimental.AutoShardPolicy)。这是为了确保在每个步骤中,每个工作进程都将处理非重叠数据集元素的全局批次大小。自动分片有几个不同的选项,可以使用 tf.data.experimental.DistributeOptions 来指定。请注意,使用 ParameterServerStrategy 的多工作进程训练中没有自动分片,有关使用此策略创建数据集的更多信息,请参阅参数服务器策略教程

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

您可以为 tf.data.experimental.AutoShardPolicy 设置三个不同的选项:

  • AUTO:这是默认选项,意味着将尝试按 FILE 分片。如果未检测到基于文件的数据集,则按 FILE 分片的尝试失败。随后,tf.distribute 将退回到按 DATA 分片。请注意,如果输入数据集基于文件,但文件数小于工作进程数,则会引发错误。

  • FILE:如果您想将输入文件分片到所有工作进程上,则可以使用此选项。如果输入文件的数量远大于工作进程的数量并且文件中的数据均匀分布,则应使用此选项。如果文件中的数据分布不均匀,则此选项的缺点是有空闲的工作进程。如果文件数量小于工作进程数量,则会引发 InvalidArgumentError。如果发生这种情况,请将策略显式设置为 AutoShardPolicy.DATA。例如,我们将 2 个文件分布在 2 个工作进程上,每个工作进程有 1 个副本。文件 1 包含 [0, 1, 2, 3, 4, 5],文件 2 包含 [6, 7, 8, 9, 10, 11]。假设同步的副本总数为 2,全局批次大小为 4。

    • 工作进程 0:

      • 批次 1 = 副本 1:[0, 1]

      • 批次 2 = 副本 1:[2, 3]

      • 批次 3 = 副本 1:[4]

      • 批次 4 = 副本 1:[5]

    • 工作进程 1:

      • 批次 1 = 副本 2:[6, 7]

      • 批次 2 = 副本 2:[8, 9]

      • 批次 3 = 副本 2:[10]

      • 批次 4 = 副本 2:[11]

  • DATA:这将在所有工作进程中对元素自动分片。每个工作进程都会读取整个数据集,并且仅处理分配给它的分片。所有其他分片将被丢弃。如果输入文件数小于工作进程数,并且您希望跨所有工作进程对数据更好地分片,通常使用此方法。这种方法的缺点是,将在每个工作进程上读取整个数据集。例如,假设我们将 1 个文件分布到 2 个工作进程中。文件 1 包含 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]。假设同步副本总数为 2。

    • 工作进程 0:

      • 批次 1 = 副本 1:[0, 1]

      • 批次 2 = 副本 1:[4, 5]

      • 批次 3 = 副本 1:[8, 9]

    • 工作进程 1:

      • 批次 1 = 副本 2:[2, 3]

      • 批次 2 = 副本 2:[6, 7]

      • 批次 3 = 副本 2:[10, 11]

  • OFF:如果关闭自动分片,则每个工作进程都将处理所有数据。例如,假设我们将 1 个文件分布到 2 个工作进程中。文件 1 包含 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]。假设同步副本总数为 2。那么每个工作器的分布如下:

    • 工作进程 0:

      • 批次 1 = 副本 1:[0, 1]

      • 批次 2 = 副本 1:[2, 3]

      • 批次 3 = 副本 1:[4, 5]

      • 批次 4 = 副本 1:[6, 7]

      • 批次 5 = 副本 1:[8, 9]

      • 批次 6 = 副本 1:[10, 11]

    • 工作进程 1:

      • 批次 1 = 副本 2:[0, 1]

      • 批次 2 = 副本 2:[2, 3]

      • 批次 3 = 副本 2:[4, 5]

      • 批次 4 = 副本 2:[6, 7]

      • 批次 5 = 副本 2:[8, 9]

      • 批次 6 = 副本 2:[10, 11]

预提取#

默认情况下,tf.distribute 会向用户提供的 tf.data.Dataset 实例末尾添加预提取转换。预提取转换的参数 buffer_size 等于同步副本数。

tf.distribute.Strategy.distribute_datasets_from_function#

用法#

此 API 使用输入函数并返回 tf.distribute.DistributedDataset 实例。用户传入的输入函数具有 tf.distribute.InputContext 参数,并且应返回 tf.data.Dataset 实例。使用此 API,tf.distribute 不会对从输入函数返回的用户 tf.data.Dataset 实例进行任何进一步的更改。用户负责对数据集进行批处理和分片。tf.distribute 调用每个工作器的 CPU 设备上的输入函数。除了允许用户指定自己的批处理和分片逻辑外,当此 API 用于多工作器训练时,还表现出比 tf.distribute.Strategy.experimental_distribute_dataset 更出色的可扩展性和性能。

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16)
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)  # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)

属性#

批处理#

应当使用每个副本的批次大小对作为输入函数返回值的 tf.data.Dataset 实例进行批处理。每个副本的批次大小等于全局批次大小除以参与同步训练的副本数。这是因为 tf.distribute 会在每个工作进程的 CPU 设备上调用输入函数。在给定工作进程上创建的数据集应准备好供该工作进程上的所有副本使用。

分片#

tf.distribute.InputContext 对象由 tf.distribute 在后台创建,它作为参数隐式传递到用户的输入函数。它包含有关工作器数、当前工作器 ID 等方面的信息。此输入函数可以根据用户使用这些属性(属于 tf.distribute.InputContext 对象的一部分)设置的策略来处理分片。

预提取#

tf.distribute 不会在用户提供的输入函数所返回的 tf.data.Dataset 的末尾添加预提取转换,因此您需要在上例中显式调用 Dataset.prefetch

注:tf.distribute.Strategy.experimental_distribute_datasettf.distribute.Strategy.distribute_datasets_from_function 都会返回不属于 tf.data.Dataset 类型的 tf.distribute.DistributedDataset 实例。您可以对这些实例进行迭代(如分布式迭代器部分中所示)并使用 element_spec 属性。

分布式迭代器#

与非分布式 tf.data.Dataset 实例类似,您将需要在 tf.distribute.DistributedDataset 实例上创建一个迭代器以对其进行迭代,并访问 tf.distribute.DistributedDataset 中的元素。下面是创建 tf.distribute.DistributedIterator 并将其用于训练模型的方法:

用法#

使用 Python 式 for 循环结构#

您可以使用用户友好的 Python 式循环对 tf.distribute.DistributedDataset 进行迭代。从 tf.distribute.DistributedIterator 返回的元素可以是单个 tf.Tensor 或包含每个副本的值的 tf.distribute.DistributedValues。将循环放置在 tf.function 内有助于提高性能。但是,目前不支持对放置在 tf.function 内的 tf.distribute.DistributedDataset 的循环使用 breakreturn

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)

使用 iter 创建显式迭代器#

要迭代 tf.distribute.DistributedDataset 实例中的元素,您可以在该实例上使用 iter API 创建一个 tf.distribute.DistributedIterator。使用显式迭代器,您可以迭代固定数量的步骤。为了从 tf.distribute.DistributedIterator 实例 dist_iterator 获取下一个元素,您可以调用 next(dist_iterator)dist_iterator.get_next()dist_iterator.get_next_as_optional()

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)

使用 next()tf.distribute.DistributedIterator.get_next 时,如果 tf.distribute.DistributedIterator 已到达末尾,将引发 OutOfRange 错误。客户端可以在 Python 端捕获该错误,并继续执行其他工作,例如设置检查点和评估。但是,如果您使用的是主机训练循环(即,每个 tf.function 运行多个步骤),这种方式将不会奏效,如下所示:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn 通过将步骤主体封装在 tf.range 中来包含多个步骤。在这种情况下,循环中没有依赖项的不同迭代可以并行开始,因此会在先前迭代的计算完成之前在后续的迭代中触发 OutOfRange 错误。一旦抛出 OutOfRange 错误,函数中的所有运算都会立即终止。如果您想要避免这种情况,则不抛出 OutOfRange 错误的替代方案为 tf.distribute.DistributedIterator.get_next_as_optionalget_next_as_optional 返回 tf.experimental.Optional,其中包含下一个元素或者不包含任何值(如果 tf.distribute.DistributedIterator 已到达末尾)。

# You can break the loop with `get_next_as_optional` by checking if the `Optional` contains a value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x: x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)

使用 element_spec 属性#

如果将分布式数据集的元素传递给 tf.function 并且需要 tf.TypeSpec 保证,则可以指定 tf.functioninput_signature 参数。分布式数据集的输出为 tf.distribute.DistributedValues,它可以表示单个设备或多个设备的输入。要获取与此分布式值相对应的 tf.TypeSpec,可以使用 tf.distribute.DistributedDataset.element_spectf.distribute.DistributedIterator.element_spec

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)

数据预处理#

目前为止,您已经学习了如何分布 tf.data.Dataset。但在数据准备好用于模型之前,还需要对其进行预处理,例如对数据进行清理、转换和扩充。以下是两套方便的预处理工具:

  • Keras 预处理层:一组可供开发者构建 Keras 原生输入处理流水线的 Keras 层。 一些 Keras 预处理层包含不可训练的状态,可以在初始化时设置或进行 adapt(请参阅 Keras 预处理层指南adapt 部分)。在分布有状态预处理层时,应将状态复制到所有工作进程。要使用这些层,您可以使其成为模型的一部分或将其应用于数据集。

  • TensorFlow Transform (tf.Transform):可供您通过数据预处理流水线定义实例级和全通数据转换的 TensorFlow 库。TensorFlow Transform 包含两个阶段。第一个阶段为分析阶段,该阶段会在全通进程中分析原始训练数据,以计算转换所需的统计数据,并会生成转换逻辑作为实例级运算。第二个阶段为转换阶段,该阶段会在实例级进程中转换原始训练数据。

Keras 预处理层与 TensorFlow Transform#

TensorFlow Transform 和 Keras 预处理层均支持在训练期间拆分预处理,并在推断期间将预处理与模型捆绑在一起,从而降低训练/应用偏差。

TensorFlow Transform 已与 TFX 深度集成,提供了一项可扩缩的映射-归约解决方案,可在与训练流水线分开的作业中分析和转换任何大小的数据集。如果您需要运行的数据集分析不适合在单台机器上进行,则 TensorFlow Transform 应是您的首选。

Keras 预处理层则更适于首先从磁盘读取数据,然后在训练期间应用的预处理。它们能够无缝适配 Keras 库中的模型开发。它们支持通过 adapt 来分析较小的数据集,并支持诸如图像数据扩充等用例,在图像数据扩充中,每次传递输入数据集都会产生不同的训练样本。

这两个库也可以混合使用,其中 TensorFlow Transform 用于输入数据分析和静态转换,Keras 预处理层用于训练时转换(例如,独热编码或数据扩充)。

tf.distribute 最佳做法#

使用这两种工具都需要初始化应用于数据的转换逻辑,这可能会创建 TensorFlow 资源。这些资源或状态应复制到所有工作进程,以节省工作进程间或工作进程-协调器间的通信。为此,建议您在 tf.distribute.Strategy.scope 下创建 Keras 预处理层 tft.TFTransformOutput.transform_features_layertft.TransformFeaturesLayer,就像创建任何其他 Keras 层一样。

以下示例分别演示了 tf.distribute.Strategy API 与高级 Keras Model.fit API 以及与自定义训练循环配合使用的用法。

针对 Keras 预处理层用户的额外说明:#

预处理层和大型词汇表

在多工作进程环境(例如,tf.distribute.MultiWorkerMirroredStrategytf.distribute.experimental.ParameterServerStrategytf.distribute.TPUStrategy)中处理大型词汇表(超过 1 GB)时,建议将词汇表保存至所有工作进程均可访问的静态文件中(例如,使用 Cloud Storage)。这将减少在训练期间向所有工作进程复制词汇表所花费的时间。

tf.data 流水线中的预处理与模型中的预处理

Keras 预处理层既可以作为模型的一部分应用,也可以直接应用于 tf.data.Dataset,但每种选项各具优势:

  • 在模型中应用预处理层可以使您的模型具备可移植性,并有助于减少训练/应用偏差。(有关详情,请参阅使用预处理层指南中的推断时在模型内部进行预处理的好处部分)

  • tf.data 流水线中应用预处理可以预提取或卸载至 CPU,这通常可以在使用加速器时提高性能。

在一个或多个 TPU 上运行时,用户几乎应始终将 Keras 预处理层置于 tf.data 流水线内,因为并非所有层都支持 TPU,并且无法在 TPU 上执行字符串运算。(tf.keras.layers.Normalizationtf.keras.layers.Rescaling 是两个例外,它们在 TPU 上运行良好,并且常被用作图像模型中的第一层。)

使用 Model.fit 进行预处理#

使用 Keras Model.fit 时,您不需要使用 tf.distribute.Strategy.experimental_distribute_datasettf.distribute.Strategy.distribute_datasets_from_function 自行分布数据。请参阅使用预处理层指南和使用 Keras 进行分布式训练指南以了解详情。一个简短的示例如下所示:

strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
  # Create the layer(s) under scope.
  integer_preprocessing_layer = tf.keras.layers.IntegerLookup(vocabulary=FILE_PATH)
  model = ...
  model.compile(...)
dataset = dataset.map(lambda x, y: (integer_preprocessing_layer(x), y))
model.fit(dataset)

使用 tf.distribute.experimental.ParameterServerStrategyModel.fit API 的用户需要使用 tf.keras.utils.experimental.DatasetCreator 作为输入。(请参阅参数服务器训练指南以了解详情。)

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)

with strategy.scope():
  preprocessing_layer = tf.keras.layers.StringLookup(vocabulary=FILE_PATH)
  model = ...
  model.compile(...)

def dataset_fn(input_context):
  ...
  dataset = dataset.map(preprocessing_layer)
  ...
  return dataset

dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
model.fit(dataset_creator, epochs=5, steps_per_epoch=20, callbacks=callbacks)

使用自定义训练循环进行预处理#

编写自定义训练循环时,您将使用 tf.distribute.Strategy.experimental_distribute_dataset API 或 tf.distribute.Strategy.distribute_datasets_from_function API 分布数据。如果您通过 tf.distribute.Strategy.experimental_distribute_dataset 分布数据集,则在数据流水线中应用这些预处理 API 将导致资源自动与数据流水线归于同一位置,以避免远程资源访问。因此,这里的示例都将使用 tf.distribute.Strategy.distribute_datasets_from_function,在这种情况下,必须在 strategy.scope() 下放置这些 API 的初始化以提高效率:

strategy = tf.distribute.MirroredStrategy()
vocab = ["a", "b", "c", "d", "f"]

with strategy.scope():
  # Create the layer(s) under scope.
  layer = tf.keras.layers.StringLookup(vocabulary=vocab)

def dataset_fn(input_context):
  # a tf.data.Dataset
  dataset = tf.data.Dataset.from_tensor_slices(["a", "c", "e"]).repeat()

  # Custom your batching, sharding, prefetching, etc.
  global_batch_size = 4
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = dataset.batch(batch_size)
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)

  # Apply the preprocessing layer(s) to the tf.data.Dataset
  def preprocess_with_kpl(input):
    return layer(input)

  processed_ds = dataset.map(preprocess_with_kpl)
  return processed_ds

distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)

# Print out a few example batches.
distributed_dataset_iterator = iter(distributed_dataset)
for _ in range(3):
  print(next(distributed_dataset_iterator))

请注意,如果您使用 tf.distribute.experimental.ParameterServerStrategy 进行训练,那么您还将调用 tf.distribute.experimental.coordinator.ClusterCoordinator.create_per_worker_dataset

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)

对于 TensorFlow Transform,如上所述,分析阶段会与训练分开完成,因此在此省略。有关详细的操作方法,请参阅教程。通常,此阶段包括创建 tf.Transform 预处理函数,以及使用此预处理函数转换 Apache Beam 流水线中的数据。在分析阶段结束时,可以将输出导出为 TensorFlow 计算图,进而用于训练和应用。我们的示例仅涵盖了训练流水线部分:

with strategy.scope():
  # working_dir contains the tf.Transform output.
  tf_transform_output = tft.TFTransformOutput(working_dir)
  # Loading from working_dir to create a Keras layer for applying the tf.Transform output to data
  tft_layer = tf_transform_output.transform_features_layer()
  ...

def dataset_fn(input_context):
  ...
  dataset.map(tft_layer, num_parallel_calls=tf.data.AUTOTUNE)
  ...
  return dataset

distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)

部分批次#

当 1) 用户创建的 tf.data.Dataset 实例包含的批次大小不能被副本数整除,或者 2) 数据集实例的基数不能被批次大小整除时,将遇到部分批次。这意味着,当数据集分布在多个副本上时,某些迭代器上的 next 调用将导致 tf.errors.OutOfRangeError。要处理此用例,tf.distribute 会在没有更多数据要处理的副本上返回批次大小为 0 的虚拟批次。

对于单工作进程情况,如果迭代器上的 next 调用未返回数据,则会创建批次大小为 0 的虚拟批次,并将其与数据集中的实际数据一起使用。在部分批次的情况下,数据的最后一个全局批次将包含实际数据以及虚拟数据批次。现在,用于处理数据的停止条件会检查是否有任何副本具有数据。如果任何副本上都没有数据,则会出现 tf.errors.OutOfRangeError 错误。

对于多工作进程情况,使用跨副本通信聚合表示每个工作进程上数据存在的布尔值,该布尔值用于标识所有工作进程是否已完成对分布式数据集的处理。由于这涉及跨工作进程通信,因此会涉及一些性能损失。

警告#

  • tf.distribute.Strategy.experimental_distribute_dataset API 与多工作进程环境结合使用时,您会传递从文件读取的 tf.data.Dataset。如果 tf.data.experimental.AutoShardPolicy 设置为 AUTOFILE,则实际的每步批次大小可能会小于您为全局批次大小定义的值。当文件中的剩余元素小于全局批次大小时,可能会发生这种情况。您可以在不依赖于运行步数的情况下耗尽数据集,也可以通过将 tf.data.experimental.AutoShardPolicy 设置为 DATA 来解决。

  • tf.distribute 当前不支持有状态数据集转换,并且当前将忽略数据集可能具有的任何有状态运算。例如,如果您的数据集包含使用 tf.random.uniform 来旋转图像的 map_fn,您的数据集计算图将依赖于执行 Python 进程的本地机器上的状态(即,随机种子)。

  • 默认停用的实验性 tf.data.experimental.OptimizationOptions 在某些上下文中(例如与 tf.distribute 一起使用时)可能会导致性能下降。只有在分布设置中验证它们有利于您的工作负载性能后,才应将其启用。

  • 请参阅这篇指南,了解如何使用 tf.data 优化您的输入流水线。一些附加提示:

    • 如果您有多个工作进程并且正在使用 tf.data.Dataset.list_files 从匹配一个或多个 glob 模式的所有文件创建数据集,请记住设置 seed 参数或设置 shuffle=False,这样每个工作进程才能一致地分片文件。

  • 如果您的输入流水线包括在记录级别上打乱数据的顺序和解析数据,除非未解析的数据明显大于已解析的数据(通常不是这种情况),否则请先打乱数据,然后再解析,如下面的示例中所示。这样做对内存使用率和性能有利。

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) 维持 buffer_size 元素的内部缓冲区,因此减小 buffer_size 可以缓解 OOM 问题。

  • 使用 tf.distribute.experimental_distribute_datasettf.distribute.distribute_datasets_from_function 时,工作进程处理数据的顺序无法得到保证。如果您使用 tf.distribute 来扩展预测,这通常是必需的。但是,您可以为批次中的每个元素插入索引并相应地对输出进行排序。以下代码段是如何对输出进行排序的示例。

注:为方便起见,此处使用 tf.distribute.MirroredStrategy。仅当您使用多工作进程,但将 tf.distribute.MirroredStrategy 用于在单工作进程上分布训练时,才需要对输入重新排序。

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)

## 张量输入而非 tf.data

有时用户无法使用 tf.data.Dataset 表示其输入,随后也无法使用上述 API 将数据集分布到多个设备。在这种情况下,您可以使用原始张量或来自生成器的输入。

将 experimental_distribute_values_from_function 用于任意张量输入#

strategy.run 接受 tf.distribute.DistributedValues,它是 next(iterator) 的输出。要传递张量值,请使用 tf.distribute.Strategy.experimental_distribute_values_from_function 从原始张量构造 tf.distribute.DistributedValues。用户必须使用此选项在输入函数中指定自己的批处理和分片逻辑,这可以使用 tf.distribute.experimental.ValueContext 输入对象来完成。

mirrored_strategy = tf.distribute.MirroredStrategy()

def value_fn(ctx):
  return tf.constant(ctx.replica_id_in_sync_group)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x: x, args=(distributed_values,))
  print(result)

如果您的输入来自生成器,则使用 tf.data.Dataset.from_generator#

如果您具有要使用的生成器函数,则可以使用 from_generator API 创建一个 tf.data.Dataset 实例。

注:tf.distribute.TPUStrategy 当前不支持此功能。

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  result = mirrored_strategy.run(lambda x: x, args=(next(iterator),))
  print(result)