##### Copyright 2019 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

利用 Estimator 进行多工作进程训练#

在 TensorFlow.org 上查看 在 Google Colab 运行 在 Github 上查看源代码 下载笔记本

警告:不建议将 Estimator 用于新代码。Estimator 运行 v1.Session 风格的代码,此类代码更加难以正确编写,并且可能会出现意外行为,尤其是与 TF 2 代码结合使用时。Estimator 确实在我们的兼容性保证范围内,但除了安全漏洞之外不会得到任何修复。请参阅迁移指南以了解详情。

概述#

注:虽然您可以将 Estimator 与 tf.distribute API 结合使用,但建议将 Keras 和 tf.distribute 结合使用,请参阅利用 Keras 进行多工作进程训练。使用 tf.distribute.Strategy 的 Estimator 训练支持有限。

本教程展示了如何通过 tf.estimatortf.distribute.Strategy 用于分布式多工作进程训练。如果您使用 tf.estimator 编写代码,并且希望以高性能扩展到更多机器,那么本教程很适合您。

在开始之前,请先阅读分布式策略指南。同样相关的还有多 GPU 训练教程,因为本教程使用的是相同的模型。

创建#

首先,设置好 TensorFlow 以及将会用到的输入模块。

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json

注:从 TF2.4 开始,如果在启用了 Eager(默认)的情况下运行,多工作进程镜像策略会在使用 Estimator 时失败。TF2.4 中的错误是 TypeError: cannot pickle '_thread.lock' object。请参阅议题 #46556 了解详细信息。解决办法是禁用 Eager Execution。

tf.compat.v1.disable_eager_execution()

输入函数#

本教程使用的是 TensorFlow 数据集中的 MNIST 数据集。本教程中的代码与多 GPU 训练教程类似,但有一个主要区别:当使用 Estimator 进行多工作进程训练时,需要根据工作进程的数量对数据集进行拆分,以确保模型收敛。输入数据会根据工作进程索引来拆分,因此每个工作进程负责处理数据集的 1/num_workers 个不同部分。

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

使模型收敛的另一种合理方式是在每个工作器上设置不同的随机种子,然后对数据集进行随机重排。

多工作器配置#

本教程主要的不同(区别于使用多 GPU 训练教程)在于多工作器的创建。明确集群中每个工作器的配置的标准方式是设置环境变量 TF_CONFIG

TF_CONFIG 里包括了两个部分:clustertaskcluster 提供了关于整个集群的信息,也就是集群中的工作器和参数服务器(parameter server)。task 提供了关于当前任务的信息。在本例中,任务的类型(type)是 worker 且该任务的索引(index)是 0。

出于演示的目的,本教程展示了怎么将 TF_CONFIG 设置成两个本地的工作器。在实践中,你可以在外部的IP地址和端口上创建多个工作器,并为每个工作器正确地配置好 TF_CONFIG 变量,也就是更改任务的索引。

警告:请勿在 Colab 中执行以下代码。TensorFlow 的运行时将尝试在指定的 IP 地址和端口上创建 gRPC 服务器,而这可能会失败。请参阅本教程的 Keras 版本,查看说明如何在单台计算机上测试运行多个工作进程的示例。

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

定义模型#

定义训练中用到的层,优化器和损失函数。本教程使用 Keras layers 定义模型,同使用多 GPU 训练教程类似。

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

注:尽管在本例中学习率是固定的,但通常情况下可能有必要根据全局批次大小对学习率进行调整。

MultiWorkerMirroredStrategy#

要训练模型,请使用 tf.distribute.experimental.MultiWorkerMirroredStrategy 的实例。MultiWorkerMirroredStrategy会在所有工作进程的每个设备上的模型的层中创建所有变量的副本。它使用 CollectiveOps(一种用于集合通信的 TensorFlow 运算)来聚合梯度并确保变量同步。tf.distribute.Strategy 指南中有关于此策略的更多详细信息。

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

训练和评估模型#

接下来,在 RunConfig 中为 Estimator 指定分布式策略,并通过调用 tf.estimator.train_and_evaluate 进行训练和评估。本教程通过 train_distribute 指定策略来分布训练。也可以通过 eval_distribute 来分布评估。

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)

优化训练性能#

您现在有了一个模型和一个由 tf.distribute.Strategy 驱动的支持多工作进程的 Estimator。您可以尝试使用以下技巧来优化多工作进程训练的性能:

  • 增加批次大小:这里指定的批次大小是按 GPU 计算的。通常,建议使用适合 GPU 内存的最大批次大小。

  • 强制转换变量:尽可能将变量强制转换为 tf.float。官方 ResNet 模型包括一个说明如何实现的示例

  • 使用集合通信MultiWorkerMirroredStrategy 提供了多个集合通信实现

    • RING 使用 gRPC 作为跨主机通信层实现了基于环的集合。

    • NCCL 使用 NVIDIA 的 NCCL 来实现集合。

    • AUTO 将选择推迟到运行时。

    集合实现的最佳选择取决于 GPU 的数量和种类,以及集群中的网络互连。要重写自动选择,请为 MultiWorkerMirroredStrategy 的构造函数的 communication 参数指定有效值,例如 communication=tf.distribute.experimental.CollectiveCommunication.NCCL

访问指南中的性能部分,了解有关其他策略和工具的更多信息,您可以使用它们来优化 TensorFlow 模型的性能。

更多的代码示例#

  1. tensorflow/ecosystem 中的端到端示例,使用 Kubernetes 模板进行多工作进程训练。该示例以 Keras 模型开始,并使用 tf.keras.estimator.model_to_estimator API 将其转换为 Estimator。

  2. 官方模型,其中许多模型可以配置为运行多个分布式策略。