##### Copyright 2019 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Keras 的分布式训练#
在 TensorFlow.org 上查看 | 在 Google Colab 上运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
概述#
tf.distribute.Strategy
API 提供了一个抽象,用于跨多个处理单元进行分布式训练。它允许您使用现有模型和训练代码,只需要很少的修改,就可以执行分布式训练。
本教程演示了如何使用 tf.distribute.MirroredStrategy
在单台机器的多个 GPU 上通过同步训练进行计算图内复制。该策略本质上是将所有模型变量复制到每个处理器。 然后,通过使用全归约来组合所有处理器的梯度,并将组合后的值应用于模型的所有副本。
您将使用 tf.keras
API 构建模型并使用 Model.fit
对其进行训练。(要了解使用自定义训练循环和 MirroredStrategy
的分布式训练,请查看此教程。)
MirroredStrategy
在单台机器上的多个 GPU 上训练您的模型。要在多个工作进程的多个 GPU 上进行同步训练,请通过 Keras Model.fit 或自定义训练循环使用 tf.distribute.MultiWorkerMirroredStrategy
。有关其他选项,请参阅分布式训练指南。
要了解其他各种策略,请参阅使用 TensorFlow 进行分布式训练指南。
安装#
import tensorflow_datasets as tfds
import tensorflow as tf
import os
# Load the TensorBoard notebook extension.
%load_ext tensorboard
print(tf.__version__)
下载数据集#
从 TensorFlow Datasets 加载 MNIST 数据集。这将返回 tf.data
格式的数据集。
将 with_info
参数设置为 True
会包含整个数据集的元数据,这些元数据将被保存到 info
中。此外,该元数据对象还包括训练样本和测试样本的数量。
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']
定义分布式策略#
创建 MirroredStrategy
对象。这将处理分布,并提供一个上下文管理器 (MirroredStrategy.scope
) 在内部构建模型。
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
设置输入流水线#
当使用多个 GPU 训练模型时,可以通过增加批次大小来有效利用额外的计算能力。通常,应使用适合 GPU 内存的最大批次大小,并相应地调整学习率。
# You can also do info.splits.total_num_examples to get the total
# number of examples in the dataset.
num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples
BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
定义一个函数,将图像像素值从 [0, 255]
范围归一化到 [0, 1]
范围(特征缩放):
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
将此 scale
函数应用于训练数据和测试数据,使用 tf.data.Dataset
API 对训练数据进行乱序 (Dataset.shuffle
),然后进行分批 (Dataset.batch
)。请注意,您还保留了训练数据的内存缓存以提高性能 (Dataset.cache
).。
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)
创建模型并实例化优化器#
在 Strategy.scope
的上下文中,使用 Keras API 创建和编译模型:
with strategy.scope():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
metrics=['accuracy'])
对于此包含 MNIST 数据集的微型示例,您将使用 Adam 优化器的默认学习率 0.001。
对于更大的数据集,分布式训练的主要优点是在每个训练步骤中学习更多信息,因为每个步骤可并行处理更多训练数据,从而允许更大的学习率(处于模型和数据集的限制内)。
定义回调#
定义以下 Keras 回调:
tf.keras.callbacks.TensorBoard
:为 TensorBoard 编写日志,以便呈现计算图。tf.keras.callbacks.ModelCheckpoint
:以特定频率保存模型,例如在每个周期之后。tf.keras.callbacks.BackupAndRestore
:通过备份模型和当前周期号来提供容错功能。在使用 Keras 进行多工作进程训练教程的容错部分了解详情。tf.keras.callbacks.LearningRateScheduler
: schedules the learning rate to change after, for example, every epoch/batch.
出于说明目的,添加名为 PrintLR
的回调以在笔记本中显示学习率。
注: 使用 BackupAndRestore
回调而不是 ModelCheckpoint
作为从作业失败重新启动时还原训练状态的主要机制。由于 BackupAndRestore
仅支持 Eager 模式,在计算图模式下考虑使用 ModelCheckpoint
。
# Define the checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
# Define the name of the checkpoint files.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
# Define a function for decaying the learning rate.
# You can define any decay function you need.
def decay(epoch):
if epoch < 3:
return 1e-3
elif epoch >= 3 and epoch < 7:
return 1e-4
else:
return 1e-5
# Define a callback for printing the learning rate at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None):
print('\nLearning rate for epoch {} is {}'.format( epoch + 1, model.optimizer.lr.numpy()))
# Put all the callbacks together.
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir='./logs'),
tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
save_weights_only=True),
tf.keras.callbacks.LearningRateScheduler(decay),
PrintLR()
]
训练并评估#
现在,以普通方式训练模型,在模型上调用 Keras Model.fit
并传入在教程开始时创建的数据集。无论您是否分布训练,此步骤相同。
EPOCHS = 12
model.fit(train_dataset, epochs=EPOCHS, callbacks=callbacks)
查看保存的检查点:
# Check the checkpoint directory.
!ls {checkpoint_dir}
要查看模型的执行情况,请加载最新的检查点,并在测试数据上调用 Model.evaluate
:
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
eval_loss, eval_acc = model.evaluate(eval_dataset)
print('Eval loss: {}, Eval accuracy: {}'.format(eval_loss, eval_acc))
要可视化输出,请启动 TensorBoard 并查看日志:
%tensorboard --logdir=logs
!ls -sh ./logs
保存模型#
使用 Model.save
将模型保存到一个 .keras
压缩归档中。保存后,您可以使用或不使用 Strategy.scope
加载模型。
path = 'my_model.keras'
model.save(path)
现在,在没有 Strategy.scope
的情况下加载模型:
unreplicated_model = tf.keras.models.load_model(path)
unreplicated_model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=['accuracy'])
eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)
print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
使用 Strategy.scope
加载模型:
with strategy.scope():
replicated_model = tf.keras.models.load_model(path)
replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=['accuracy'])
eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
print ('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
其他资源#
更多通过 Keras Model.fit
API 使用不同分布策略的示例:
在 TPU 上使用 BERT 解决 GLUE 任务教程使用
tf.distribute.MirroredStrategy
在 GPU 上进行训练,并使用tf.distribute.TPUStrategy
在 TPU 上进行训练。使用分布式策略保存和加载模型教程演示了如何将 SavedModel API 与
tf.distribute.Strategy
一起使用。官方 TensorFlow 模型可以配置为运行多个分布式策略。
要了解有关 TensorFlow 分布式策略的更多信息,请参阅以下资料:
使用 tf.distribute.Strategy 进行自定义训练教程展示了如何使用
tf.distribute.MirroredStrategy
通过自定义训练循环进行单工作进程训练。使用 Keras 进行多工作进程训练教程展示了如何将
MultiWorkerMirroredStrategy
与Model.fit
一起使用。使用 Keras 和 MultiWorkerMirroredStrategy 的自定义训练循环教程展示了如何将
MultiWorkerMirroredStrategy
与 Keras 和自定义训练循环一起使用。TensorFlow 中的分布式训练指南概述了可用的分布式策略。
使用 tf.function 获得更佳性能指南提供了有关其他策略和工具的信息,例如可用于优化 TensorFlow 模型性能的 TensorFlow Profiler。
注:tf.distribute.Strategy
正在积极开发中,TensorFlow 将在不久的将来添加更多示例和教程。请进行尝试。我们欢迎您通过 GitHub 上的议题提交反馈。