##### Copyright 2021 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

迁移多工作进程 CPU/GPU 训练#

在 TensorFlow.org 上查看 在 Google Colab 运行 在 Github 上查看源代码 下载笔记本

本指南演示了如何将多工作进程分布式训练工作流从 TensorFlow 1 迁移到 TensorFlow 2。

使用 CPU/GPU 执行多工作进程训练:

安装#

从一些必要的导入和用于演示目的的简单数据集开始:

# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

您将需要 'TF_CONFIG' 配置环境变量以在 TensorFlow 中的多台机器上进行训练。使用 'TF_CONFIG' 指定 'cluster''task' 的地址。(有关详情,请参阅分布式训练指南。)

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

注:遗憾的是,由于在 TensorFlow 1 中使用 tf.estimator API 进行多工作进程训练需要多个客户端(在此 Colab 笔记本中实现这一点会特别棘手),这会使笔记本在没有 'TF_CONFIG' 环境变量的情况下可运行,因此它会回退为本地训练。(有关详情,请参阅使用 TensorFlow 进行分布式训练指南中的设置 'TF_CONFIG' 环境变量部分。)

使用 del 语句移除变量(但在 TensorFlow 1 中的实际多工作进程训练中,您不必这样做):

del os.environ['TF_CONFIG']

TensorFlow 1:使用 tf.estimator API 进行多工作进程分布式训练#

以下代码段演示了 TF1 中多工作进程训练的规范工作流:您将使用 tf.estimator.Estimatortf.estimator.TrainSpectf.estimator.EvalSpectf.estimator.train_and_evaluate API 来分布训练:

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

TensorFlow 2:使用分布策略进行多工作进程训练#

在 TensorFlow 2 中,使用 CPU、GPU 和 TPU 的多个工作进程之间的分布式训练是通过 tf.distribute.Strategy 完成的。

下面的示例演示了如何使用两种这样的策略:tf.distribute.experimental.ParameterServerStrategytf.distribute.MultiWorkerMirroredStrategy,这两种策略都是为使用多个工作进程进行 CPU/GPU 训练而设计的。

ParameterServerStrategy 使用了一个协调器 ('chief'),这使其对此 Colab 笔记本中的环境更加友好。您将在此处使用一些效用函数来设置可运行体验所必需的支持元素:您将创建一个进程内聚簇,其中线程用于模拟参数服务器 ('ps') 和工作进程 ('worker')。有关参数服务器训练的更多信息,请参阅使用 ParameterServerStrategy 进行参数服务器训练教程。

在此示例中,首先使用 tf.distribute.cluster_resolver.TFConfigClusterResolver 定义 'TF_CONFIG' 环境变量以提供聚簇信息。如果您使用聚簇管理系统进行分布式训练,请检查它是否已经提供了 'TF_CONFIG',如果已提供,则无需显式设置此环境变量。(有关详情,请参阅使用 TensorFlow 进行分布式训练指南中的设置 'TF_CONFIG' 环境变量部分。)

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

然后,为工程进程和参数服务器一一创建 tf.distribute.Server

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

在现实世界的分布式训练中,您将使用多台机器而不是启动协调器上的所有 tf.distribute.Server,并且指定为"worker""ps"(参数服务器)的机器将各自运行一个 tf.distribute.Server。请参阅参数服务器训练教程中的现实世界中的聚簇部分,了解更多详细信息。

一切准备就绪后,创建 ParameterServerStrategy 对象:

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)

创建策略对象后,定义模型、优化器和其他变量,然后在 Strategy.scope API 中调用 Keras Model.compile 以分布训练。(如需了解详情,请参阅 Strategy.scope API 文档。)

如果您更喜欢通过定义前向和后向传递来自定义训练,请参阅参数服务器训练教程中的使用自定义训练循环进行训练部分,了解更多详细信息。

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
model.evaluate(eval_dataset, steps=10, return_dict=True)

分区器 (tf.distribute.experimental.partitioners)

TensorFlow 2 中的 ParameterServerStrategy 支持变量分区,并提供与 TensorFlow 1 相同,但名称不容易混淆的分区器:

  • tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner:将分片保持在最大大小以下的分区器)。

  • tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner:为每个分片分配最小大小的分区器。

  • tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner:分配固定数量分片的分区器。

或者,您也可以使用 MultiWorkerMirroredStrategy 对象:

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()

可以将上面使用的策略替换为 MultiWorkerMirroredStrategy 对象,以使用此策略执行训练。

tf.estimator API 一样,由于 MultiWorkerMirroredStrategy 是一种多客户端策略,在此 Colab 笔记本中运行分布式训练并不容易。因此,用这种策略替换上面的代码最终会以在本地运行结束。使用 Keras Model.fit/自定义训练循环的多工作进程训练教程演示了在设置 'TF_CONFIG' 变量的情况下,如何在 Colab 的本地主机上使用两个工作进程运行多工作进程训练。在实践中,您将在外部 IP 地址/端口上创建多个工作进程,并使用 'TF_CONFIG' 变量为每个工作进程指定聚簇配置。

后续步骤#

要详细了解如何在 TensorFlow 2 中使用 tf.distribute.experimental.ParameterServerStrategytf.distribute.MultiWorkerMirroredStrategy 进行多工作进程分布式训练,请查看以下资源: