{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "Tce3stUlHN0L" }, "outputs": [], "source": [ "##### Copyright 2019 The TensorFlow Authors.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "tuOe1ymfHZPu", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "MfBg1C5NB3X0" }, "source": [ "# Distributed training with Keras" ] }, { "cell_type": "markdown", "metadata": { "id": "r6P32iYYV27b" }, "source": [ "\n", " \n", " \n", " \n", " \n", "
View on TensorFlow.org | Run in Google Colab | View source on GitHub | Download notebook
" ] }, { "cell_type": "markdown", "metadata": { "id": "xHxb-dlhMIzW" }, "source": [ "## Overview\n", "\n", "The `tf.distribute.Strategy` API provides an abstraction for distributing training across multiple processing units. It lets you run distributed training with your existing models and training code with only minimal changes.\n", "\n", "This tutorial demonstrates how to use `tf.distribute.MirroredStrategy` to perform in-graph replication with *synchronous training on many GPUs on one machine*. The strategy essentially copies all of the model's variables to each processor. It then uses [all-reduce](http://mpitutorial.com/tutorials/mpi-reduce-and-allreduce/) to combine the gradients from all processors, and applies the combined value to all copies of the model.\n", "\n", "You will use the `tf.keras` APIs to build the model and `Model.fit` to train it. (To learn about distributed training with a custom training loop and `MirroredStrategy`, check out [this tutorial](custom_training.ipynb).)\n", "\n", "`MirroredStrategy` trains your model on multiple GPUs on a single machine. For *synchronous training on many GPUs on multiple workers*, use `tf.distribute.MultiWorkerMirroredStrategy` with [Keras Model.fit](multi_worker_with_keras.ipynb) or [a custom training loop](multi_worker_with_ctl.ipynb). To learn about the other available strategies, refer to the [Distributed training with TensorFlow](../../guide/distributed_training.ipynb) guide." ] }, { "cell_type": "markdown", "metadata": { "id": "Dney9v7BsJij" }, "source": [ "## Setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "r8S3ublR7Ay8", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import tensorflow_datasets as tfds\n", "import tensorflow as tf\n", "\n", "import os\n", "\n", "# Load the TensorBoard notebook extension.\n", "%load_ext tensorboard" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "SkocY8tgRd3H", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "print(tf.__version__)" ] }, { "cell_type": "markdown", "metadata": { "id": "hXhefksNKk2I" }, "source": [ "## Download the dataset" ] }, { "cell_type": "markdown", "metadata": { "id": "OtnnUwvmB3X5" }, "source": [ "Load the MNIST dataset from [TensorFlow Datasets](https://tensorflow.google.cn/datasets). This returns a dataset in the `tf.data` format.\n", "\n", "Setting the `with_info` argument to `True` includes the metadata for the entire dataset, which is saved here to `info`. Among other things, this metadata object includes the number of training and test examples." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "iXMJ3G9NB3X6", "vscode": { "languageId": "python" 
} }, "outputs": [], "source": [ "datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)\n", "\n", "mnist_train, mnist_test = datasets['train'], datasets['test']" ] }, { "cell_type": "markdown", "metadata": { "id": "GrjVhv-eKuHD" }, "source": [ "## Define the distribution strategy" ] }, { "cell_type": "markdown", "metadata": { "id": "TlH8vx6BB3X9" }, "source": [ "Create a `MirroredStrategy` object. It handles the distribution and provides a context manager (`MirroredStrategy.scope`) inside which you build your model." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "4j0tdf4YB3X9", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "strategy = tf.distribute.MirroredStrategy()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "cY3KA_h2iVfN", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "print('Number of devices: {}'.format(strategy.num_replicas_in_sync))" ] }, { "cell_type": "markdown", "metadata": { "id": "lNbPv0yAleW8" }, "source": [ "## Set up the input pipeline" ] }, { "cell_type": "markdown", "metadata": { "id": "psozqcuptXhK" }, "source": [ "When training a model with multiple GPUs, you can use the extra computing power effectively by increasing the batch size. In general, use the largest batch size that fits in GPU memory and tune the learning rate accordingly." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "p1xWxKcnhar9", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# You can also do info.splits.total_num_examples to get the total\n", "# number of examples in the dataset.\n", "\n", "num_train_examples = info.splits['train'].num_examples\n", "num_test_examples = info.splits['test'].num_examples\n", "\n", "BUFFER_SIZE = 10000\n", "\n", "BATCH_SIZE_PER_REPLICA = 64\n", "BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync" ] }, { "cell_type": "markdown", "metadata": { "id": "0Wm5rsL2KoDF" }, "source": [ "Define a function that normalizes the image pixel values from the `[0, 255]` range to the `[0, 1]` range ([feature scaling](https://en.wikipedia.org/wiki/Feature_scaling)):" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Eo9a46ZeJCkm", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ 
"def scale(image, label):\n", "  image = tf.cast(image, tf.float32)\n", "  image /= 255\n", "\n", "  return image, label" ] }, { "cell_type": "markdown", "metadata": { "id": "WZCa5RLc5A91" }, "source": [ "Apply this `scale` function to the training and test data, then use the `tf.data.Dataset` APIs to shuffle the training data (`Dataset.shuffle`) and batch it (`Dataset.batch`). Notice that you also keep an in-memory cache of the training data to improve performance (`Dataset.cache`)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "gRZu2maChwdT", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)\n", "eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)" ] }, { "cell_type": "markdown", "metadata": { "id": "4xsComp8Kz5H" }, "source": [ "## Create the model and instantiate the optimizer" ] }, { "cell_type": "markdown", "metadata": { "id": "1BnQYQTpB3YA" }, "source": [ "Within the context of `Strategy.scope`, create and compile the model using the Keras API:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "IexhL_vIB3YA", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "with strategy.scope():\n", "  model = tf.keras.Sequential([\n", "      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),\n", "      tf.keras.layers.MaxPooling2D(),\n", "      tf.keras.layers.Flatten(),\n", "      tf.keras.layers.Dense(64, activation='relu'),\n", "      tf.keras.layers.Dense(10)\n", "  ])\n", "\n", "  model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n", "                optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),\n", "                metrics=['accuracy'])" ] }, { "cell_type": "markdown", "metadata": { "id": "DCDKFcNJzdcd" }, "source": [ "For this toy example with the MNIST dataset, you will use the Adam optimizer's default learning rate of 0.001.\n", "\n", "For larger datasets, the key benefit of distributed training is that the model learns more in each training step: each step processes more training data in parallel, which allows for a larger learning rate (within the limits of the model and dataset)." ] }, { "cell_type": "markdown", "metadata": { "id": "8i6OU5W9Vy2u" }, "source": [ "## Define the callbacks\n" ] }, { "cell_type": "markdown", "metadata": { "id": "YOXO5nvvK3US" }, "source": [ "Define the following [Keras 
callbacks](https://tensorflow.google.cn/guide/keras/train_and_evaluate):\n", "\n", "- `tf.keras.callbacks.TensorBoard`: writes a log for TensorBoard, which lets you visualize the graphs.\n", "- `tf.keras.callbacks.ModelCheckpoint`: saves the model at a certain frequency, such as after every epoch.\n", "- `tf.keras.callbacks.BackupAndRestore`: provides fault tolerance by backing up the model and the current epoch number. Learn more in the *Fault tolerance* section of the [Multi-worker training with Keras](multi_worker_with_keras.ipynb) tutorial.\n", "- `tf.keras.callbacks.LearningRateScheduler`: schedules the learning rate to change after, for example, every epoch/batch.\n", "\n", "For illustrative purposes, add a custom callback called `PrintLR` to display the learning rate in the notebook.\n", "\n", "**Note:** Use the `BackupAndRestore` callback instead of `ModelCheckpoint` as the main mechanism to restore the training state upon a restart from a job failure. Since `BackupAndRestore` only supports eager mode, consider using `ModelCheckpoint` in graph mode." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "A9bwLCcXzSgy", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Define the checkpoint directory to store the checkpoints.\n", "checkpoint_dir = './training_checkpoints'\n", "# Define the name of the checkpoint files.\n", "checkpoint_prefix = os.path.join(checkpoint_dir, \"ckpt_{epoch}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "wpU-BEdzJDbK", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Define a function for decaying the learning rate.\n", "# You can define any decay function you need.\n", "def decay(epoch):\n", "  if epoch < 3:\n", "    return 1e-3\n", "  elif 3 <= epoch < 7:\n", "    return 1e-4\n", "  else:\n", "    return 1e-5" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "jKhiMgXtKq2w", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Define a callback for printing the learning rate at the end of each epoch.\n", "class PrintLR(tf.keras.callbacks.Callback):\n", "  def on_epoch_end(self, epoch, logs=None):\n", "    # Read the rate from the model attached to this callback, not a global.\n", "    learning_rate = self.model.optimizer.learning_rate.numpy()\n", "    print('\\nLearning rate for epoch {} is {}'.format(epoch + 1, learning_rate))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YVqAbR6YyNQh", "vscode": { 
"languageId": "python" } }, "outputs": [], "source": [ "# Put all the callbacks together.\n", "callbacks = [\n", "    tf.keras.callbacks.TensorBoard(log_dir='./logs'),\n", "    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,\n", "                                       save_weights_only=True),\n", "    tf.keras.callbacks.LearningRateScheduler(decay),\n", "    PrintLR()\n", "]" ] }, { "cell_type": "markdown", "metadata": { "id": "70HXgDQmK46q" }, "source": [ "## Train and evaluate" ] }, { "cell_type": "markdown", "metadata": { "id": "6EophnOAB3YD" }, "source": [ "Now, train the model in the usual way by calling Keras `Model.fit` on the model and passing in the dataset created at the beginning of the tutorial. This step is the same whether or not you are distributing the training." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "7MVw_6CqB3YD", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "EPOCHS = 12\n", "\n", "model.fit(train_dataset, epochs=EPOCHS, callbacks=callbacks)" ] }, { "cell_type": "markdown", "metadata": { "id": "NUcWAUUupIvG" }, "source": [ "Check for saved checkpoints:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "JQ4zeSTxKEhB", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Check the checkpoint directory.\n", "!ls {checkpoint_dir}" ] }, { "cell_type": "markdown", "metadata": { "id": "qor53h7FpMke" }, "source": [ "To check how well the model performs, load the latest checkpoint and call `Model.evaluate` on the test data:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "JtEwxiTgpQoP", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))\n", "\n", "eval_loss, eval_acc = model.evaluate(eval_dataset)\n", "\n", "print('Eval loss: {}, Eval accuracy: {}'.format(eval_loss, eval_acc))" ] }, { "cell_type": "markdown", "metadata": { "id": "IIeF2RWfYu4N" }, "source": [ "To visualize the output, launch TensorBoard and view the logs:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "vtyAZO0DoKu_", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%tensorboard --logdir=logs" ] }, { "cell_type": "markdown", "metadata": { "id": 
"a0a82d26d6bd" }, "source": [ "" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "LnyscOkvKKBR", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "!ls -sh ./logs" ] }, { "cell_type": "markdown", "metadata": { "id": "kBLlogrDvMgg" }, "source": [ "## Save the model" ] }, { "cell_type": "markdown", "metadata": { "id": "Xa87y_A0vRma" }, "source": [ "Save the model to a `.keras` zip archive using `Model.save`. After the model is saved, you can load it with or without `Strategy.scope`." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "h8Q4MKOLwG7K", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "path = 'my_model.keras'" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "4HvcDmVsvQoa", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "model.save(path)" ] }, { "cell_type": "markdown", "metadata": { "id": "vKJT4w5JwVPI" }, "source": [ "Now, load the model without `Strategy.scope`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "T_gT0RbRvQ3o", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "unreplicated_model = tf.keras.models.load_model(path)\n", "\n", "unreplicated_model.compile(\n", "    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n", "    optimizer=tf.keras.optimizers.Adam(),\n", "    metrics=['accuracy'])\n", "\n", "eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)\n", "\n", "print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))" ] }, { "cell_type": "markdown", "metadata": { "id": "YBLzcRF0wbDe" }, "source": [ "Load the model with `Strategy.scope`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "BBVo3WGGwd9a", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "with strategy.scope():\n", " replicated_model = tf.keras.models.load_model(path)\n", " replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n", " optimizer=tf.keras.optimizers.Adam(),\n", " 
metrics=['accuracy'])\n", "\n", " eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)\n", " print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))" ] }, { "cell_type": "markdown", "metadata": { "id": "MUZwaz4AKjtD" }, "source": [ "### Additional resources\n", "\n", "More examples that use different distribution strategies with the Keras `Model.fit` API:\n", "\n", "1. The [Solve GLUE tasks using BERT on TPU](https://tensorflow.google.cn/text/tutorials/bert_glue) tutorial uses `tf.distribute.MirroredStrategy` for training on GPUs and `tf.distribute.TPUStrategy` for training on TPUs.\n", "2. The [Save and load a model using a distribution strategy](save_and_load.ipynb) tutorial demonstrates how to use the SavedModel APIs with `tf.distribute.Strategy`.\n", "3. The [official TensorFlow models](https://github.com/tensorflow/models/tree/master/official) can be configured to run multiple distribution strategies.\n", "\n", "To learn more about TensorFlow distribution strategies, refer to the following:\n", "\n", "1. The [Custom training with tf.distribute.Strategy](custom_training.ipynb) tutorial shows how to use `tf.distribute.MirroredStrategy` for single-worker training with a custom training loop.\n", "2. The [Multi-worker training with Keras](multi_worker_with_keras.ipynb) tutorial shows how to use `MultiWorkerMirroredStrategy` with `Model.fit`.\n", "3. The [Custom training loop with Keras and MultiWorkerMirroredStrategy](multi_worker_with_ctl.ipynb) tutorial shows how to use `MultiWorkerMirroredStrategy` with Keras and a custom training loop.\n", "4. The [Distributed training in TensorFlow](https://tensorflow.google.cn/guide/distributed_training) guide provides an overview of the available distribution strategies.\n", "5. The [Better performance with tf.function](../../guide/function.ipynb) guide provides information about other strategies and tools, such as the [TensorFlow Profiler](../../guide/profiler.md), that you can use to optimize the performance of your TensorFlow models.\n", "\n", "Note: `tf.distribute.Strategy` is under active development, and TensorFlow will add more examples and tutorials in the near future. Please give it a try. Your feedback is welcome; you can submit it via [issues on GitHub](https://github.com/tensorflow/tensorflow/issues/new)." ] } ], "metadata": { "accelerator": "GPU", "colab": { "collapsed_sections": [], "name": "keras.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }