##### Copyright 2019 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

使用 tf.distribute.Strategy 进行自定义训练#

在 TensorFlow.org 上查看 在 Google Colab 上运行 在 GitHub 上查看源代码 下载该 notebook

本教程演示了如何使用具有自定义训练循环的 TensorFlow API tf.distribute.Strategy,它提供了一种用于在多个处理单元(GPU、多台机器或 TPU)之间分配训练的抽象。在此示例中,将在 Fashion MNIST 数据集上训练一个简单的卷积神经网络,此数据集包含 70,000 个大小为 28 x 28 的图像。

自定义训练循环提供了灵活性并且能够更好地控制训练。此外,它们也让调试模型和训练循环更加容易。

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)

下载 Fashion MNIST 数据集#

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Add a dimension to the array -> new shape == (28, 28, 1)
# This is done because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Scale the images to the [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

创建分布变量和计算图的策略#

tf.distribute.MirroredStrategy 策略是如何运作的?

  • 所有变量和模型计算图都会在副本之间复制。

  • 输入会均匀分布在副本中。

  • 每个副本会计算收到的输入的损失和梯度。

  • 通过对梯度求和,可以在所有副本之间同步梯度。

  • 同步后,会对每个副本上的变量副本进行相同的更新。

注:您可以将下面的所有代码放在单个作用域内。出于说明目的,本示例将它分为几个代码单元。

# If the list of devices is not specified in
# `tf.distribute.MirroredStrategy` constructor, they will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

设置输入流水线#

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

创建并分布数据集:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

创建模型#

使用 tf.keras.Sequential 创建模型。也可以使用模型子类化 API函数式 API 来完成此操作。

def create_model():
  regularizer = tf.keras.regularizers.L2(1e-5)
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3,
                             activation='relu',
                             kernel_regularizer=regularizer),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(64, 3,
                             activation='relu',
                             kernel_regularizer=regularizer),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64,
                            activation='relu',
                            kernel_regularizer=regularizer),
      tf.keras.layers.Dense(10, kernel_regularizer=regularizer)
    ])

  return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

定义损失函数#

回想一下,损失函数由一个或两个部分组成:

  • 预测损失衡量模型的预测与一批训练样本的训练标签的偏差程度。它针对每个带标签的样本进行计算,然后通过计算平均值在整个批次中实现缩减。

  • 或者,可以将正则化损失项添加到预测损失中,以引导模型避免过拟合训练数据。常见的选择是 L2 正则化,它会添加所有模型权重平方和的小固定倍数,与样本数量无关。上面的模型使用 L2 正则化来演示其在下面的训练循环中的处理。

对于在具有单个 GPU/CPU 的单个计算机上进行的训练,运作方式如下:

  • 计算批次中每个样本的预测损失,在批次中进行求和,然后除以批次大小。

  • 将正则化损失添加到预测损失中。

  • 总损失的梯度是相对于每个模型权重计算的,优化器会根据相应的梯度更新每个模型权重。

使用 tf.distribute.Strategy,可在副本之间分割输入批次。例如,假设您有 4 个 GPU,每个 GPU 都有一个模型副本。一个包含 256 个输入样本的批次均匀分布在 4 个副本中,因此每个副本都会获得一个大小为 64 的批次:我们有 256 = 4*64,或者一般来说 GLOBAL_BATCH_SIZE = num_replicas_in_sync * BATCH_SIZE_PER_REPLICA

每个副本会根据它获得的训练样本计算损失,并计算损失相对于每个模型权重的梯度。优化器会确保在使用这些梯度来更新每个副本上的模型权重拷贝之前,在副本之间对这些梯度求和

那么,使用 tf.distribute.Strategy 时应如何计算损失?

  • 每个副本会计算分配给它的所有样本的预测损失,将结果相加,然后除以 num_replicas_in_sync * BATCH_SIZE_PER_REPLICA,或等效地除以 GLOBAL_BATCH_SIZE

  • 每个副本会计算正则化损失并将其除以 num_replicas_in_sync

与非分布式训练相比,所有按副本损失项都会按系数 1/num_replicas_in_sync 成比例缩小。另一方面,在优化器应用它们之前,所有损失项(或者更确切地说,它们的梯度)都会在该数量的副本上求和。实际上,每个副本上的优化器都会使用相同的梯度,就好像发生了 GLOBAL_BATCH_SIZE 的非分布式计算一样。这与 Keras Model.fit 的分布式和非分布式行为一致。要了解更大的全局批次大小如何能够提高学习率,请参阅使用 Keras 进行分布式训练教程。

如何在 TensorFlow 中执行此操作?

  • 损失缩减和缩放在 Keras {code 0}Model.compile{/code 0} 和 {code 1}Model.fit{/code 1} 中自动完成

  • 如果您正在编写自定义训练循环(如本教程中所述),则应使用 tf.nn.compute_average_loss 将每个样本的损失相加,然后将总和除以全局批次大小,该函数会将每个样本的损失、可选样本权重作为参数,并返回经过缩放的损失。

  • 如果使用 tf.keras.losses 类(如下例所示),则需要将损失缩减显式指定为 NONESUM 之一。不允许在 Model.fit 以外使用默认的 AUTOSUM_OVER_BATCH_SIZE

    • 不允许使用 AUTO,因为用户应该显式考虑他们想要哪种缩减以确保它在分布式情况下正确。

    • 不允许使用 SUM_OVER_BATCH_SIZE,因为目前它只会除以每个副本批次大小,并将副本的除数留给用户,这可能很容易错过。因此,您需要自行显式进行缩减。

  • 如果您正在为具有 Model.losses 非空列表的模型编写自定义训练循环(例如,权重正则化器),应将它们相加并将总和除以副本数。您可以使用 tf.nn.scale_regularization_loss 函数执行此操作。模型代码本身并不知道副本的数量。

但是,模型可以使用 Keras API 定义依赖于输入的正则化损失,例如 Layer.add_loss(...)Layer(activity_regularizer=...)。对于 Layer.add_loss(...),建模代码需要将每个样本项的总和除以按副本(!)的批次大小,例如使用 tf.math.reduce_mean()

with strategy.scope():
  # Set reduction to `NONE` so you can do the reduction yourself.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions, model_losses):
    per_example_loss = loss_object(labels, predictions)
    loss = tf.nn.compute_average_loss(per_example_loss)
    if model_losses:
      loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
    return loss

特殊情况#

高级用户还应考虑以下特殊情况。

  • 短于 GLOBAL_BATCH_SIZE 的输入批次会在多个地方产生不符合要求的极端情况。在实践中,通常最好通过使用 Dataset.repeat().batch() 允许批次跨越周期边界并通过步数而不是数据集结束定义近似周期来避免此类情况。或者,Dataset.batch(drop_remainder=True) 保留周期的概念,但会删除最后几个样本。

为便于说明,此示例采用了更困难的路线并允许短批次,以便每个训练周期仅包含每个训练样本一次。

tf.nn.compute_average_loss() 应使用哪个分母?

* By default, in the example code above and equivalently in `Keras.fit()`, the sum of prediction losses is divided by `num_replicas_in_sync` times the actual batch size seen on the replica (with empty batches silently ignored). This preserves the balance between the prediction loss on the one hand and the regularization losses on the other hand. It is particularly appropriate for models that use input-dependent regularization losses. Plain L2 regularization just superimposes weight decay onto the gradients of the prediction loss and is less in need of such a balance.
* In practice, many custom training loops pass as a constant Python value into `tf.nn.compute_average_loss(..., global_batch_size=GLOBAL_BATCH_SIZE)` to use it as the denominator. This preserves the relative weighting of training examples between batches. Without it, the smaller denominator in short batches effectively upweights the examples in those. (Before TensorFlow 2.13, this was also needed to avoid NaNs in case some replica received an actual batch size of zero.)

如果按上面所述避免短批次,则这两个选项等效。

  • 多维 labels 要求您对每个样本中预测数量的 per_example_loss 求平均值。考虑对输入图像的所有像素进行分类任务,其中 predictions 的形状为 (batch_size, H, W, n_classes),而 labels 的形状为 (batch_size, H, W)。您需要更新 per_example_loss,例如:per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

小心:验证损失的形状tf.losses/tf.keras.losses 中的损失函数通常会返回输入最后一个维度的平均值。损失类封装这些函数。在创建损失类的实例时传递 reduction=Reduction.NONE,表示“无额外归约”。对于样本输入形状为 [batch, W, H, n_classes] 的类别损失,会缩减 n_classes 维度。对于类似 losses.mean_squared_errorlosses.binary_crossentropy 的逐点损失,应包含一个虚拟轴,使 [batch, W, H, 1] 缩减为 [batch, W, H]。如果没有虚拟轴, [batch, W, H] 将被错误地缩减为 [batch, W]

定义跟踪损失和准确率的指标#

这些指标可以跟踪测试损失,以及训练和测试的准确率。您可以使用 .result() 随时获取累积的统计信息。

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')

训练循环#

# A model, an optimizer, and a checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions, model.losses)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print(template.format(epoch + 1, train_loss,
                         train_accuracy.result() * 100, test_loss.result(),
                         test_accuracy.result() * 100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()

上述示例中需要注意的事项#

  • 使用 for x in ... 构造来迭代 train_dist_datasettest_dist_dataset

  • 经过缩放的损失是 distributed_train_step 的返回值。这个值会使用 tf.distribute.Strategy.reduce 调用跨副本聚合,然后通过对 tf.distribute.Strategy.reduce 调用的返回值求和来跨批次聚合。

  • tf.keras.Metrics 应该在由 tf.distribute.Strategy.run 执行的 train_steptest_step 内更新。

  • tf.distribute.Strategy.run 会从策略中的每个本地副本返回结果,您可以通过多种方式使用此结果。可以对它们执行 tf.distribute.Strategy.reduce 以获得聚合值。还可以通过执行 tf.distribute.Strategy.experimental_local_results 获得包含在结果中的值的列表,每个本地副本一个列表。

恢复最新的检查点并进行测试#

使用 tf.distribute.Strategy 设置了检查点的模型可以使用或不使用策略进行恢复。

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result() * 100))

迭代数据集的其他方式#

使用迭代器#

如果要迭代给定的步数而不是遍历整个数据集,可以使用 iter 调用创建一个迭代器,并在该迭代器上显式地调用 next。您可以选择在 tf.function 内部和外部迭代数据集。下面是一个小代码段,演示了使用迭代器在 tf.function 外部迭代数据集。

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print(template.format(epoch + 1, average_train_loss, train_accuracy.result() * 100))
  train_accuracy.reset_states()

在 tf.function 内部迭代#

您还可以使用 for x in ... 构造在 tf.function 内部迭代整个输入 train_dist_dataset,或者像上面那样创建迭代器。下面的示例演示了使用 @tf.function 装饰器封装一个训练周期并在函数内部迭代 train_dist_dataset

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print(template.format(epoch + 1, train_loss, train_accuracy.result() * 100))

  train_accuracy.reset_states()

跨副本跟踪训练损失#

注:通常情况下,您应该使用 tf.keras.Metrics 来跟踪每个样本的值,并避免已在副本中聚合的值。

由于执行的损失缩放计算,不建议使用 tf.keras.metrics.Mean 来跟踪不同副本的训练损失。

例如,如果您运行具有以下特点的训练作业:

  • 两个副本

  • 在每个副本上处理两个样本

  • 产生的损失值:每个副本上为 [2, 3] 和 [4, 5]

  • 全局批次大小 = 4

通过损失缩放,您可以通过添加损失值来计算每个副本上每个样本的损失值,然后除以全局批次大小。在这种情况下:(2 + 3) / 4 = 1.25,且 (4 + 5) / 4 = 2.25

如果使用 tf.keras.metrics.Mean 来跟踪两个副本的损失,结果会有所不同。在此示例中,您最终会得到一个 total 为 3.50 和 count 为 2 的结果,在指标上调用 result() 时,您将得到 total/count = 1.75。使用 tf.keras.Metrics 计算的损失将按等于同步副本数的附加因子进行缩放。

指南和示例#

以下是一些利用自定义训练循环使用分布策略的示例:

  1. 分布式训练指南

  2. 使用 MirroredStrategyDenseNet 示例。

  3. 使用 MirroredStrategyTPUStrategy 训练的 BERT 示例。此示例对于理解如何在分布式训练等过程中从检查点加载并生成定期检查点特别有帮助。

  4. 使用 MirroredStrategy(可用 keras_use_ctl 标记启用)训练的 NCF 示例。

  5. NMT 使用 MirroredStrategy来训练的例子。

可以在分布策略指南示例和教程下找到更多示例。

后续步骤#