##### Copyright 2021 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
迁移多工作进程 CPU/GPU 训练#
在 TensorFlow.org 上查看 | 在 Google Colab 运行 | 在 Github 上查看源代码 | 下载笔记本 |
本指南演示了如何将多工作进程分布式训练工作流从 TensorFlow 1 迁移到 TensorFlow 2。
使用 CPU/GPU 执行多工作进程训练:
在 TensorFlow 1 中,您通常会使用
tf.estimator.train_and_evaluate
和tf.estimator.Estimator
API。在 TensorFlow 2 中,使用 Keras API 编写模型、损失函数、优化器和指标。随后,利用 Keras
Model.fit
API 或自定义训练循环(使用tf.GradientTape
)并通过tf.distribute.experimental.ParameterServerStrategy
或tf.distribute.MultiWorkerMirroredStrategy
将训练分布到多个工作进程之间。有关详情,请参阅下列教程:
安装#
从一些必要的导入和用于演示目的的简单数据集开始:
# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker
import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]
您将需要 'TF_CONFIG'
配置环境变量以在 TensorFlow 中的多台机器上进行训练。使用 'TF_CONFIG'
指定 'cluster'
和 'task'
的地址。(有关详情,请参阅分布式训练指南。)
import json
import os
tf_config = {
'cluster': {
'chief': ['localhost:11111'],
'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
'ps': ['localhost:12121', 'localhost:13131'],
},
'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
注:遗憾的是,由于在 TensorFlow 1 中使用 tf.estimator
API 进行多工作进程训练需要多个客户端(在此 Colab 笔记本中实现这一点会特别棘手),这会使笔记本在没有 'TF_CONFIG'
环境变量的情况下可运行,因此它会回退为本地训练。(有关详情,请参阅使用 TensorFlow 进行分布式训练指南中的设置 'TF_CONFIG'
环境变量部分。)
使用 del
语句移除变量(但在 TensorFlow 1 中的实际多工作进程训练中,您不必这样做):
del os.environ['TF_CONFIG']
TensorFlow 1:使用 tf.estimator API 进行多工作进程分布式训练#
以下代码段演示了 TF1 中多工作进程训练的规范工作流:您将使用 tf.estimator.Estimator
、tf.estimator.TrainSpec
、tf.estimator.EvalSpec
和 tf.estimator.train_and_evaluate
API 来分布训练:
def _input_fn():
return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)
def _eval_input_fn():
return tf1.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).batch(1)
def _model_fn(features, labels, mode):
logits = tf1.layers.Dense(1)(features)
loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
optimizer = tf1.train.AdagradOptimizer(0.05)
train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
TensorFlow 2:使用分布策略进行多工作进程训练#
在 TensorFlow 2 中,使用 CPU、GPU 和 TPU 的多个工作进程之间的分布式训练是通过 tf.distribute.Strategy
完成的。
下面的示例演示了如何使用两种这样的策略:tf.distribute.experimental.ParameterServerStrategy
和 tf.distribute.MultiWorkerMirroredStrategy
,这两种策略都是为使用多个工作进程进行 CPU/GPU 训练而设计的。
ParameterServerStrategy
使用了一个协调器 ('chief'
),这使其对此 Colab 笔记本中的环境更加友好。您将在此处使用一些效用函数来设置可运行体验所必需的支持元素:您将创建一个进程内聚簇,其中线程用于模拟参数服务器 ('ps'
) 和工作进程 ('worker'
)。有关参数服务器训练的更多信息,请参阅使用 ParameterServerStrategy 进行参数服务器训练教程。
在此示例中,首先使用 tf.distribute.cluster_resolver.TFConfigClusterResolver
定义 'TF_CONFIG'
环境变量以提供聚簇信息。如果您使用聚簇管理系统进行分布式训练,请检查它是否已经提供了 'TF_CONFIG'
,如果已提供,则无需显式设置此环境变量。(有关详情,请参阅使用 TensorFlow 进行分布式训练指南中的设置 'TF_CONFIG'
环境变量部分。)
# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker
chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]
# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
'cluster': {
'chief': ["localhost:%s" % chief_port],
'worker': ["localhost:%s" % port for port in worker_ports],
'ps': ["localhost:%s" % port for port in ps_ports],
},
'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
然后,为工程进程和参数服务器一一创建 tf.distribute.Server
:
# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4
for i in range(3):
tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name="worker",
task_index=i,
config=worker_config)
for i in range(2):
tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name="ps",
task_index=i)
在现实世界的分布式训练中,您将使用多台机器而不是启动协调器上的所有 tf.distribute.Server
,并且指定为"worker"
和 "ps"
(参数服务器)的机器将各自运行一个 tf.distribute.Server
。请参阅参数服务器训练教程中的现实世界中的聚簇部分,了解更多详细信息。
一切准备就绪后,创建 ParameterServerStrategy
对象:
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
创建策略对象后,定义模型、优化器和其他变量,然后在 Strategy.scope
API 中调用 Keras Model.compile
以分布训练。(如需了解详情,请参阅 Strategy.scope
API 文档。)
如果您更喜欢通过定义前向和后向传递来自定义训练,请参阅参数服务器训练教程中的使用自定义训练循环进行训练部分,了解更多详细信息。
dataset = tf.data.Dataset.from_tensor_slices(
(features, labels)).shuffle(10).repeat().batch(64)
eval_dataset = tf.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).repeat().batch(1)
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
optimizer = tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05)
model.compile(optimizer, "mse")
model.fit(dataset, epochs=5, steps_per_epoch=10)
model.evaluate(eval_dataset, steps=10, return_dict=True)
分区器 (
tf.distribute.experimental.partitioners
)TensorFlow 2 中的
ParameterServerStrategy
支持变量分区,并提供与 TensorFlow 1 相同,但名称不容易混淆的分区器:
tf.compat.v1.variable_axis_size_partitioner
->tf.distribute.experimental.partitioners.MaxSizePartitioner
:将分片保持在最大大小以下的分区器)。
tf.compat.v1.min_max_variable_partitioner
->tf.distribute.experimental.partitioners.MinSizePartitioner
:为每个分片分配最小大小的分区器。
tf.compat.v1.fixed_size_partitioner
->tf.distribute.experimental.partitioners.FixedShardsPartitioner
:分配固定数量分片的分区器。
或者,您也可以使用 MultiWorkerMirroredStrategy
对象:
# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
可以将上面使用的策略替换为 MultiWorkerMirroredStrategy
对象,以使用此策略执行训练。
与 tf.estimator
API 一样,由于 MultiWorkerMirroredStrategy
是一种多客户端策略,在此 Colab 笔记本中运行分布式训练并不容易。因此,用这种策略替换上面的代码最终会以在本地运行结束。使用 Keras Model.fit/自定义训练循环的多工作进程训练教程演示了在设置 'TF_CONFIG'
变量的情况下,如何在 Colab 的本地主机上使用两个工作进程运行多工作进程训练。在实践中,您将在外部 IP 地址/端口上创建多个工作进程,并使用 'TF_CONFIG'
变量为每个工作进程指定聚簇配置。
后续步骤#
要详细了解如何在 TensorFlow 2 中使用 tf.distribute.experimental.ParameterServerStrategy
和 tf.distribute.MultiWorkerMirroredStrategy
进行多工作进程分布式训练,请查看以下资源: