##### Copyright 2018 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
%cd ..
from set_env import temp_dir
/media/pc/data/lxw/ai/d2py/doc/libs/tf-chaos/guide

使用 TensorFlow 进行分布式训练#

在 TensorFlow.org 上查看 在 Google Colab 运行 在 Github 上查看源代码 下载笔记本

文本特征向量#

tf.distribute.Strategy 是一个可在多个 GPU、多台机器或 TPU 上进行分布式训练的 TensorFlow API。使用此 API,您只需改动较少代码就能分布现有模型和训练代码。

tf.distribute.Strategy 旨在实现以下目标:

  • 易于使用,支持多种用户(包括研究人员和机器学习工程师等)。

  • 提供开箱即用的良好性能。

  • 轻松切换策略。

您可以将 tf.distribute.Strategy 与 Keras Model.fit 之类的高级 API 以及自定义训练循环(通常使用 TensorFlow 来进行计算)结合使用来分布训练。

在 TensorFlow 2.x 中,您可以在 Eager 模式下执行程序,也可以使用 tf.function 在计算图中执行。虽然 tf.distribute.Strategy 对两种执行模式都支持,但使用 tf.function 效果最佳。建议仅将 Eager 模式用于调试,而 TPUStrategy 不支持此模式。尽管本指南大部分时间在讨论训练,但此 API 也可用于在不同平台上分布评估和预测。

您在使用 tf.distribute.Strategy 时只需改动少量代码,因为我们修改了 TensorFlow 的底层组件,使其可感知策略。这些组件包括变量、层、优化器、指标、摘要和检查点。

在本指南中,您将了解各种类型的策略以及如何在不同情况下使用它们。要了解如何调试性能问题,请参阅优化 TensorFlow GPU 性能指南。

注:要更深入地了解这些概念,请观看深入演示 Inside TensorFlow:tf.distribute.Strategy。如果您打算编写自己的训练循环,则特别推荐这样做。

设置 TensorFlow#

import tensorflow as tf

策略类型#

tf.distribute.Strategy 打算涵盖不同轴上的许多用例。目前已支持其中的部分组合,将来还会添加其他组合。其中一些轴包括:

  • 同步和异步训练:这是通过数据并行进行分布式训练的两种常用方法。在同步训练中,所有工作进程都同步地对输入数据的不同片段进行训练,并且会在每一步中聚合梯度。在异步训练中,所有工作进程都独立训练输入数据并异步更新变量。通常情况下,同步训练通过全归约实现,而异步训练通过参数服务器架构实现。

  • 硬件平台:您可能需要将训练扩展到一台机器上的多个 GPU 或一个网络中的多台机器(每台机器拥有 0 个或多个 GPU),或扩展到 Cloud TPU 上。

为了支持这些用例,TensorFlow 提供了 MirroredStrategyTPUStrategyMultiWorkerMirroredStrategyParameterServerStrategyCentralStorageStrategy,以及其他可用策略。下一部分将具体说明 TensorFlow 中的哪些场景支持上述策略。以下是快速概览:

训练 API

MirroredStrategy

TPUStrategy

MultiWorkerMirroredStrategy

CentralStorageStrategy

ParameterServerStrategy

Keras Model.fit

支持

支持

支持

实验性支持

实验性支持

自定义训练循环

支持

支持

支持

实验性的支持

实验性的支持

Estimator API

有限支持

不支持

有限支持

有限支持

有限支持

注:实验性支持是指兼容性保证不涵盖这些 API。

警告:Estimator 为有限支持。基本训练和评估为实验性支持,而基架等高级功能未得到实现。如果未涵盖用例,则应使用 Keras 或自定义训练循环。不建议将 Estimator 用于新代码。Estimator 运行 v1.Session 风格的代码,此类代码更加难以正确编写,并且可能会出现意外行为,尤其是与 TF 2 代码结合使用时。Estimator 确实在我们的兼容性保证范围内,但除了安全漏洞之外不会得到任何修复。请转到迁移指南了解详情。

MirroredStrategy#

tf.distribute.MirroredStrategy 支持在一台机器的多个 GPU 上进行同步分布式训练。该策略会为每个 GPU 设备创建一个副本。模型中的每个变量都会在所有副本之间进行镜像。这些变量将共同形成一个名为 MirroredVariable 的单个概念变量。这些变量会通过应用相同的更新彼此保持同步。

高效的全归约算法用于跨设备传达变量更新。全归约通过将所有设备中的张量相加来聚合它们,并使它们在每个设备上都可用。这是一种极其高效的融合算法,可以显著降低同步产生的开销。有许多全归约算法和实现,具体取决于设备之间可用的通信类型。默认情况下,它使用 NVIDIA Collective Communication Library (NCCL) 作为全归约实现。您可以从其他几个选项中进行选择或者编写自己的选项。

以下是创建 MirroredStrategy 的最简单方式:

mirrored_strategy = tf.distribute.MirroredStrategy()

这会创建一个 MirroredStrategy 实例,该实例使用所有对 TensorFlow 可见的 GPU,并使用 NCCL 进行跨设备通信。

如果您只想使用机器上的部分 GPU,您可以这样做:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

如果您想重写跨设备通信,可以通过提供 tf.distribute.CrossDeviceOps 的实例,使用 cross_device_ops 参数来实现。目前,除了默认选项 tf.distribute.NcclAllReduce 外,还有 tf.distribute.HierarchicalCopyAllReducetf.distribute.ReductionToOneDevice 两个选项。

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

TPUStrategy#

您可以使用 tf.distribute.experimental.TPUStrategy 在张量处理单元 (TPU) 上运行 TensorFlow 训练。TPU 是 Google 的专用 ASIC,旨在显著加速机器学习工作负载。您可以通过 Google Colab、TensorFlow Research CloudCloud TPU 平台进行使用。

就分布式训练架构而言,TPUStrategyMirroredStrategy 相同,即实现同步分布式训练。TPU 会在多个 TPU 核心之间实现高效的全归约和其他集合运算,并将其用于 TPUStrategy

下面演示了如何将 TPUStrategy 实例化:

注:要在 Colab 中运行任何 TPU 代码,应将 TPU 作为 Colab 运行时。请参阅使用 TPU 指南获得完整示例。

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

TPUClusterResolver 实例可帮助定位 TPU。在 Colab 中,您无需为其指定任何参数。

如果您想要将其用于 Cloud TPU,则必须执行以下操作:

  • tpu 参数中指定 TPU 资源的名称。

  • 在程序开始时显式地初始化 TPU 系统。这是使用 TPU 进行计算前的必需步骤。初始化 TPU 系统还会清除 TPU 内存,所以为了避免丢失状态,请务必先完成此步骤。

MultiWorkerMirroredStrategy#

tf.distribute.MultiWorkerMirroredStrategyMirroredStrategy 非常相似。它实现了跨多个工作进程的同步分布式训练,而每个工作进程可能有多个 GPU。与 MirroredStrategy 类似,它也会跨所有工作进程在每个设备的模型中创建所有变量的副本。

以下是创建 MultiWorkerMirroredStrategy 的最简单方式:

strategy = tf.distribute.MultiWorkerMirroredStrategy()

MultiWorkerMirroredStrategy 有两种用于跨设备通信的实现。CommunicationImplementation.RING 基于 RPC,同时支持 CPU 和 GPU。CommunicationImplementation.NCCL 使用 NCCL 并在 GPU 上提供最先进的性能,但它不支持 CPU。CollectiveCommunication.AUTO 将选择权交给 Tensorflow。您可以通过以下方式指定它们:

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)

与多 GPU 训练相比,多工作进程训练的一个主要差异是多工作进程的设置。'TF_CONFIG' 环境变量是在 TensorFlow 中为作为集群一部分的每个工作进程指定集群配置的标准方式。请在本文档的设置 TF_CONFIG 部分了解详细信息。

有关 MultiWorkerMirroredStrategy 的详细信息,请参阅以下教程:

ParameterServerStrategy#

参数服务器训练是一种常见的数据并行方法,用于在多台机器上扩展模型训练。参数服务器训练集群由工作进程和参数服务器组成。变量在参数服务器上创建,并在每个步骤中由工作进程读取和更新。请查阅参数服务器培训教程以了解详情。

在 TensorFlow 2 中,参数服务器训练通过 tf.distribute.experimental.coordinator.Cluster Coordinator 类使用基于中央协调器的架构。

在此实现中,workerparameter server 任务运行侦听来自协调器的任务的 tf.distribute.Server。协调器创建资源、调度训练任务、编写检查点并处理任务失败。

在协调器上运行的编程中,您将使用 ParameterServerStrategy 对象定义训练步骤,并使用 ClusterCoordinator 将训练步骤分派给远程工作进程。这是创建它们的最简单方式:

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

要详细了解 ParameterServerStrategy,请参阅使用 Keras Model.fit 和自定义训练循环进行参数服务器训练教程。

注:如果使用 TFConfigClusterResolver,则需要配置 'TF_CONFIG' 环境变量。它类似于 MultiWorkerMirroredStrategy 中的 'TF_CONFIG',但具有额外的注意事项。

在 TensorFlow 1 中,ParameterServerStrategy只能通过 tf.compat.v1.distribute.experimental.ParameterServerStrategy 符号在 Estimator 中使用。

注:此策略是 experimental,因为它目前正在进行积极开发。

CentralStorageStrategy#

tf.distribute.experimental.CentralStorageStrategy 也执行同步训练。变量不会被镜像,而是放在 CPU 上,且运算会复制到所有本地 GPU 。如果只有一个 GPU,则所有变量和运算都将被放在该 GPU 上。

请通过以下代码创建 CentralStorageStrategy 实例:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()

这会创建一个 CentralStorageStrategy 实例,该实例将使用所有可见的 GPU 和 CPU。在副本上对变量的更新将先进行聚合,然后再应用于变量。

注:此策略是 experimental,因为它目前正在进行开发。

其他策略#

除上述策略外,还有其他两种策略可能对使用 tf.distribute API 进行原型设计和调试有所帮助。

默认策略#

默认策略是一种分布策略,当作用域内没有显式分布策略时就会出现。此策略会实现 tf.distribute.Strategy 接口,但只具有传递功能,不提供实际分布。例如,Strategy.run(fn) 只会调用 fn。使用该策略编写的代码与未使用任何策略编写的代码完全一样。您可以将其视为“无运算”策略。

默认策略是一种单一实例,无法创建它的更多实例。可以在任何显式策略范围之外使用 tf.distribute.get_strategy 来获取它(可用于在显式策略范围内获取当前策略的相同 API)。

default_strategy = tf.distribute.get_strategy()

此策略有两个主要用途:

  • 它允许无条件地编写可感知分布的库代码。例如,在 tf.keras.optimizers 中,您可以使用 tf.distribute.get_strategy,并用此策略来降低梯度 – 它将始终返回一个策略对象,您可以在该对象上调用 Strategy.reduce API。

# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • 与库代码类似,它可用于编写最终用户的程序以便使用或不使用分布策略,而无需条件逻辑。下面是一个说明了这一点的示例代码段:

if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>,
  1: <tf.Variable 'Variable/replica_1:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceStrategy#

tf.distribute.OneDeviceStrategy 是一种会将所有变量和计算放在单个指定设备上的策略。

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

此策略在许多方面与默认策略不同。在默认策略中,与在未使用任何分布策略的情况下运行 TensorFlow 相比,变量布局逻辑保持不变。但是,在使用 OneDeviceStrategy 时,在其范围内创建的所有变量都会显式放置在指定的设备上。此外,通过 OneDeviceStrategy.run 调用的任何函数也将放置在指定的设备上。

通过此策略分布的输入将被预获取到指定设备。在默认策略中,没有输入分布。

与默认策略类似,在切换到实际分布到多个设备/机器的其他策略之前,也可以使用此策略来测试代码。这将比默认策略更多地使用分布策略机制,但不能像使用 MirroredStrategyTPUStrategy 等策略那样充分发挥其作用。如果您想让代码表现地像没有策略,请使用默认策略。

到目前为止,您已经看到了不同策略以及如何将它们实例化。接下来的几个部分将介绍可以使用它们来分布训练的不同方式。

在 Keras Model.fit 中使用 tf.distribute.Strategy#

tf.distribute.Strategy 已集成到 tf.keras 中,后者是 TensorFlow 对 Keras API 规范的实现。tf.keras 是用于构建和训练模型的高级 API。通过集成到 tf.keras 后端,您可以无缝使用 Model.fit 来分布以 Keras 训练框架编写的训练。

您需要对代码进行以下更改:

  1. 创建一个合适的 tf.distribute.Strategy 实例。

  2. 将 Keras 模型、优化器和指标的创建移到 strategy.scope 中。因此,模型的 call()train_step()test_step() 方法中的代码都将在加速器上分布和执行。

TensorFlow 分布策略支持所有类型的 Keras 模型 - 序贯函数式子类化

下面是一段代码,执行该代码会创建一个非常简单的带有一个 Dense 层的 Keras 模型:

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(1, input_shape=(1,),
                            kernel_regularizer=tf.keras.regularizers.L2(1e-4))])
  model.compile(loss='mse', optimizer='sgd')
/media/pc/data/lxw/envs/anaconda3x/envs/xxx/lib/python3.12/site-packages/keras/src/layers/core/dense.py:87: UserWarning: Do not pass an `input_shape`/`input_dim` argument to a layer. When using Sequential models, prefer using an `Input(shape)` object as the first layer in the model instead.
  super().__init__(activity_regularizer=activity_regularizer, **kwargs)

此示例使用 MirroredStrategy,因此您可以在具有多个 GPU 的计算机上运行。strategy.scope() 指示 Keras 使用哪种策略来分布训练。通过在此作用域内创建模型/优化器/指标,您可以创建分布式变量而不是常规变量。设置完成后,您可以像往常一样拟合模型。MirroredStrategy 负责在可用 GPU 上复制模型的训练、聚合梯度等。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
10/10 ━━━━━━━━━━━━━━━━━━━━ 2s 5ms/step - loss: 0.2835
Epoch 2/2
10/10 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.0536  
10/10 ━━━━━━━━━━━━━━━━━━━━ 1s 4ms/step - loss: 0.0143
0.014308060519397259

我们在这里使用了 tf.data.Dataset 来提供训练和评估输入。您还可以使用 Numpy 数组:

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.0103  
Epoch 2/2
10/10 ━━━━━━━━━━━━━━━━━━━━ 0s 2ms/step - loss: 0.0021
<keras.src.callbacks.history.History at 0x7f189c188bc0>

在上述两个示例(Dataset 或 Numpy)中,给定输入的每个批次都被平均分到了多个副本中。例如,如果对 2 个 GPU 使用 MirroredStrategy,大小为 10 的每个批次将被均分到 2 个 GPU,每个 GPU 会在每步接收 5 个输入样本。如果添加更多 GPU,则每个周期的训练速度会更快。通常,您希望在添加更多加速器时增加批次大小,以便有效利用额外的计算能力。您还需要根据模型重新调整您的学习率。您可以使用 strategy.num_replicas_in_sync 获得副本数量。

mirrored_strategy.num_replicas_in_sync
2
# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15, 20:0.175}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

目前支持的策略#

训练 API

MirroredStrategy

TPUStrategy

MultiWorkerMirroredStrategy

ParameterServerStrategy

CentralStorageStrategy

Keras Model.fit

支持

支持

支持

实验性支持

实验性支持

示例和教程#

下面列出了说明上述与 Keras Model.fit 端到端集成的教程和示例:

  1. 教程:使用 Model.fitMirroredStrategy 进行训练。

  2. 教程:使用 Model.fitMultiWorkerMirroredStrategy 进行训练。

  3. 指南:包含使用Model.fitTPUStrategy 的示例。

  4. 教程:使用Model.fitParameterServerStrategy 进行参数服务器训练。

  5. 教程:使用 Model.fitTPUStrategy 微调 GLUE 基准测试中的许多任务的 BERT。

  6. 包含使用各种策略实现的最先进模型集合的 TensorFlow Model Garden 仓库

在自定义训练循环中使用 tf.distribute.Strategy#

如上所述,在 Keras Model.fit 中使用 tf.distribute.Strategy 只需改动几行代码。再多花点功夫,您还可以在自定义训练循环中使用 tf.distribute.Strategy

如果您需要更多相对于使用 Estimator 或 Keras 时的灵活性和对训练循环的控制权,您可以编写自定义训练循环。例如,在使用 GAN 时,您可能会希望每轮使用不同数量的生成器或判别器步骤。同样,高级框架也不太适合强化学习训练。

tf.distribute.Strategy 类提供了一组核心方法来支持自定义训练循环。使用这些方法时,最初可能需要对代码进行少量重构,但是一旦完成,您便能够通过更改策略实例在 GPU、TPU 和多台计算机之间切换。

下面是用来说明此用例的一个简短的代码段,其中的简单训练样本使用与之前相同的 Keras 模型。

首先,在该策略的作用域内创建模型和优化器。这样可以确保使用模型和优化器创建的任何变量都是镜像变量。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(1, input_shape=(1,),
                            kernel_regularizer=tf.keras.regularizers.L2(1e-4))])
  optimizer = tf.keras.optimizers.SGD()

接下来,创建输入数据集并调用 tf.distribute.Strategy.experimental_distribute_dataset 以根据策略分布数据集。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

然后,定义一个训练步骤。使用 tf.GradientTape 计算梯度并使用优化器应用这些梯度来更新模型的变量。要分布此训练步骤,将其放入函数 train_step 中,然后将其与您从之前创建的 dist_dataset 中获得的数据集输入一起传递给 tf.distribute.Strategy.run

# Sets `reduction=NONE` to leave it to tf.nn.compute_average_loss() below.
loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    per_example_loss = loss_object(labels, predictions)
    loss = tf.nn.compute_average_loss(per_example_loss)
    model_losses = model.losses
    if model_losses:
      loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

以上代码还需注意以下几点:

  1. 您使用了 tf.nn.compute_average_loss 将每个样本的预测损失减少到标量。tf.nn.compute_average_loss 将每个样本的损失相加,然后将总和除以全局批次大小。这很重要,因为稍后在每个副本上计算出梯度后,会通过对它们求和使其在副本中聚合。

默认情况下,全局批次大小为 tf.get_strategy().num_replicas_in_sync * tf.shape(per_example_loss)[0]。也可以将其显式指定为关键字参数 global_batch_size=。如果没有短批次,默认值相当于 tf.nn.compute_average_loss(..., global_batch_size=global_batch_size) 和上面定义的 global_batch_size。(有关短批次以及如何避免或处理它们的更多信息,请参阅自定义训练教程。)

  1. 您还使用 tf.nn.scale_regularization_loss 将通过 Model 对象注册的正则化损失(如果有)缩放 1/num_replicas_in_sync。对于那些依赖于输入的正则化损失,它取决于建模代码,而不是自定义训练循环,来对每个副本(!)批次大小求平均值;这样,建模代码就可以保持与复制无关,而训练循环仍然与如何计算正则化损失无关。

  2. 当您在一个分布策略作用域内调用 apply_gradients 时,它的行为会被修改。具体来说,同步训练期间,在将梯度应用于每个并行实例之前,它会对梯度的所有副本求和。

  3. 您还使用了 tf.distribute.Strategy.reduce API 来聚合 tf.distribute.Strategy.run 返回的结果以进行报告。tf.distribute.Strategy.run 会从策略中的每个本地副本返回结果,您可以通过多种方式使用此结果。可以 reduce 它们以获得聚合值。还可以通过执行 tf.distribute.Strategy.experimental_local_results 获得包含在结果中的值的列表,每个本地副本一个列表。

最后,当我们定义完训练步骤后,就可以迭代 dist_dataset,并在循环中运行训练:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.39740226, shape=(), dtype=float32)
tf.Tensor(0.3952575, shape=(), dtype=float32)
tf.Tensor(0.39313167, shape=(), dtype=float32)
tf.Tensor(0.39102438, shape=(), dtype=float32)
tf.Tensor(0.38893557, shape=(), dtype=float32)
tf.Tensor(0.38686493, shape=(), dtype=float32)
tf.Tensor(0.38481238, shape=(), dtype=float32)
tf.Tensor(0.38277763, shape=(), dtype=float32)
tf.Tensor(0.3807605, shape=(), dtype=float32)
tf.Tensor(0.37876078, shape=(), dtype=float32)
tf.Tensor(0.37677836, shape=(), dtype=float32)
tf.Tensor(0.37481293, shape=(), dtype=float32)
tf.Tensor(0.37286443, shape=(), dtype=float32)
tf.Tensor(0.37093258, shape=(), dtype=float32)
tf.Tensor(0.36901724, shape=(), dtype=float32)
tf.Tensor(0.3671182, shape=(), dtype=float32)
tf.Tensor(0.36523533, shape=(), dtype=float32)
tf.Tensor(0.36336842, shape=(), dtype=float32)
tf.Tensor(0.36151734, shape=(), dtype=float32)
tf.Tensor(0.35968187, shape=(), dtype=float32)
tf.Tensor(0.35786188, shape=(), dtype=float32)
tf.Tensor(0.35605708, shape=(), dtype=float32)
tf.Tensor(0.3542675, shape=(), dtype=float32)
tf.Tensor(0.3524929, shape=(), dtype=float32)
tf.Tensor(0.35073304, shape=(), dtype=float32)
tf.Tensor(0.34898782, shape=(), dtype=float32)
tf.Tensor(0.34725717, shape=(), dtype=float32)
tf.Tensor(0.3455408, shape=(), dtype=float32)
tf.Tensor(0.34383863, shape=(), dtype=float32)
tf.Tensor(0.34215042, shape=(), dtype=float32)
tf.Tensor(0.34047616, shape=(), dtype=float32)
tf.Tensor(0.3388156, shape=(), dtype=float32)
tf.Tensor(0.33716866, shape=(), dtype=float32)
tf.Tensor(0.3355351, shape=(), dtype=float32)
tf.Tensor(0.33391494, shape=(), dtype=float32)
tf.Tensor(0.33230788, shape=(), dtype=float32)
tf.Tensor(0.33071378, shape=(), dtype=float32)
tf.Tensor(0.32913265, shape=(), dtype=float32)
tf.Tensor(0.32756418, shape=(), dtype=float32)
tf.Tensor(0.32600844, shape=(), dtype=float32)
tf.Tensor(0.32446513, shape=(), dtype=float32)
tf.Tensor(0.32293412, shape=(), dtype=float32)
tf.Tensor(0.32141536, shape=(), dtype=float32)
tf.Tensor(0.31990868, shape=(), dtype=float32)
tf.Tensor(0.318414, shape=(), dtype=float32)
tf.Tensor(0.3169312, shape=(), dtype=float32)
tf.Tensor(0.31546003, shape=(), dtype=float32)
tf.Tensor(0.31400055, shape=(), dtype=float32)
tf.Tensor(0.31255245, shape=(), dtype=float32)
tf.Tensor(0.3111158, shape=(), dtype=float32)
tf.Tensor(0.30969033, shape=(), dtype=float32)
tf.Tensor(0.30827606, shape=(), dtype=float32)
tf.Tensor(0.30687276, shape=(), dtype=float32)
tf.Tensor(0.30548036, shape=(), dtype=float32)
tf.Tensor(0.30409873, shape=(), dtype=float32)
tf.Tensor(0.30272776, shape=(), dtype=float32)
tf.Tensor(0.3013674, shape=(), dtype=float32)
tf.Tensor(0.3000175, shape=(), dtype=float32)
tf.Tensor(0.29867795, shape=(), dtype=float32)
tf.Tensor(0.29734865, shape=(), dtype=float32)
tf.Tensor(0.2960295, shape=(), dtype=float32)
tf.Tensor(0.2947204, shape=(), dtype=float32)
tf.Tensor(0.29342124, shape=(), dtype=float32)
tf.Tensor(0.2921319, shape=(), dtype=float32)
tf.Tensor(0.29085237, shape=(), dtype=float32)
tf.Tensor(0.28958243, shape=(), dtype=float32)
tf.Tensor(0.28832203, shape=(), dtype=float32)
tf.Tensor(0.2870711, shape=(), dtype=float32)
tf.Tensor(0.28582957, shape=(), dtype=float32)
tf.Tensor(0.28459728, shape=(), dtype=float32)
tf.Tensor(0.2833741, shape=(), dtype=float32)
tf.Tensor(0.28216007, shape=(), dtype=float32)
tf.Tensor(0.28095505, shape=(), dtype=float32)
tf.Tensor(0.27975893, shape=(), dtype=float32)
tf.Tensor(0.27857158, shape=(), dtype=float32)
tf.Tensor(0.277393, shape=(), dtype=float32)
tf.Tensor(0.276223, shape=(), dtype=float32)
tf.Tensor(0.27506164, shape=(), dtype=float32)
tf.Tensor(0.2739087, shape=(), dtype=float32)
tf.Tensor(0.27276418, shape=(), dtype=float32)
tf.Tensor(0.27162796, shape=(), dtype=float32)
tf.Tensor(0.2705, shape=(), dtype=float32)
tf.Tensor(0.26938015, shape=(), dtype=float32)
tf.Tensor(0.26826838, shape=(), dtype=float32)
tf.Tensor(0.26716462, shape=(), dtype=float32)
tf.Tensor(0.26606876, shape=(), dtype=float32)
tf.Tensor(0.26498076, shape=(), dtype=float32)
tf.Tensor(0.26390052, shape=(), dtype=float32)
tf.Tensor(0.26282793, shape=(), dtype=float32)
tf.Tensor(0.26176298, shape=(), dtype=float32)
tf.Tensor(0.26070562, shape=(), dtype=float32)
tf.Tensor(0.25965565, shape=(), dtype=float32)
tf.Tensor(0.25861314, shape=(), dtype=float32)
tf.Tensor(0.2575779, shape=(), dtype=float32)
tf.Tensor(0.25654998, shape=(), dtype=float32)
tf.Tensor(0.25552922, shape=(), dtype=float32)
tf.Tensor(0.25451562, shape=(), dtype=float32)
tf.Tensor(0.253509, shape=(), dtype=float32)
tf.Tensor(0.25250944, shape=(), dtype=float32)
tf.Tensor(0.2515168, shape=(), dtype=float32)

在上面的示例中,我们通过迭代 dist_dataset 为训练提供输入。我们还提供 tf.distribute.Strategy.make_experimental_numpy_dataset 以支持 Numpy 输入。您可以在调用 tf.distribute.Strategy.experimental_distribute_dataset 之前使用此 API 来创建数据集。

迭代数据的另一种方式是显式地使用迭代器。当您希望运行给定数量的步骤而非迭代整个数据集时,可能会用到此方式。现在可以将上面的迭代修改为:先创建迭代器,然后在迭代器上显式地调用 next 以获得输入数据。

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.25053102, shape=(), dtype=float32)
tf.Tensor(0.24955198, shape=(), dtype=float32)
tf.Tensor(0.24857968, shape=(), dtype=float32)
tf.Tensor(0.2476141, shape=(), dtype=float32)
tf.Tensor(0.24665508, shape=(), dtype=float32)
tf.Tensor(0.24570267, shape=(), dtype=float32)
tf.Tensor(0.24475667, shape=(), dtype=float32)
tf.Tensor(0.24381712, shape=(), dtype=float32)
tf.Tensor(0.24288395, shape=(), dtype=float32)
tf.Tensor(0.24195705, shape=(), dtype=float32)

这涵盖了使用 tf.distribute.Strategy API 分布自定义训练循环的最简单情况。

目前支持的策略#

训练 API

MirroredStrategy

TPUStrategy

MultiWorkerMirroredStrategy

ParameterServerStrategy

CentralStorageStrategy

自定义训练循环

支持

支持

支持

实验性支持

实验性支持

示例和教程#

以下是一些利用自定义训练循环使用分布策略的示例:

  1. 教程:使用自定义训练循环和 MirroredStrategy 进行训练。

  2. 教程:使用自定义训练循环和 MultiWorkerMirroredStrategy 进行训练。

  3. 指南:包含使用 TPUStrategy 的自定义训练循环的示例。

  4. 教程:使用自定义训练循环和 ParameterServerStrategy 进行参数服务器训练。

  5. 包含使用各种策略实现的最先进模型集合的 TensorFlow Model Garden 仓库

其他主题#

本部分涵盖与多个用例相关的一些主题。

设置 TF_CONFIG 环境变量#

对于多工作进程训练,如前所述,您需要为集群中运行的每个二进制文件设置 'TF_CONFIG' 环境变量。'TF_CONFIG' 环境变量是一个 JSON 字符串,它指定了哪些任务构成集群、任务的地址以及每个任务在集群中的角色。tensorflow/ecosystem 仓库提供了一个 Kubernetes 模板,该模板会为您的训练任务设置 'TF_CONFIG'

'TF_CONFIG' 有两个组件:集群和任务。

  • 集群提供有关训练集群的信息,后者是一个由不同类型的作业(例如工作进程)组成的字典。在多工作进程训练中,除了常规工作进程执行的作业之外,通常会有一个工作进程承担更多职责,例如保存检查点和为 TensorBoard 编写摘要文件。这种工作进程被称为“首席”工作进程,习惯上将索引为 0 的工作进程指定为首席工作进程(实际上这就是 tf.distribute.Strategy 的实现方式)。

  • 另一方面,任务提供有关当前任务的信息。第一个组件集群对所有工作进程都相同,第二个组件任务在每个工作进程上都不同,并指定该工作进程的类型和索引。

'TF_CONFIG' 的示例如下:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

'TF_CONFIG' 指定 "cluster" 中有三个工作进程和两个 "ps" 任务以及它们的主机和端口。"task" 部分指定当前任务在 "cluster" 中的角色:工作进程 1(第二个工作进程)。集群中的有效角色是 "chief""worker""ps""evaluator"。除了使用 tf.distribute.experimental.ParameterServerStrategy 时,不应当存在 "ps" 作业。

后续步骤#

我们正在积极开发 tf.distribute.Strategy。欢迎试用,并通过 GitHub 议题提供反馈。