{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "Tce3stUlHN0L" }, "outputs": [], "source": [ "##### Copyright 2019 The TensorFlow Authors.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "tuOe1ymfHZPu", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] },
{ "cell_type": "markdown", "metadata": { "id": "MfBg1C5NB3X0" }, "source": [ "# Multi-worker training with Keras\n", "\n", "View on TensorFlow.org | Run in Google Colab | View source on GitHub | Download notebook" ] },
{ "cell_type": "markdown", "metadata": { "id": "xHxb-dlhMIzW" }, "source": [ "## Overview\n", "\n", "This tutorial demonstrates how to perform multi-worker distributed training with a Keras model and the `Model.fit` API using the `tf.distribute.MultiWorkerMirroredStrategy` API. With the help of this strategy, a Keras model that was designed to run on a single worker can seamlessly work on multiple workers with minimal code changes.\n", "\n", "To learn how to use the `MultiWorkerMirroredStrategy` with Keras and a custom training loop, refer to [Custom training loop with Keras and MultiWorkerMirroredStrategy](multi_worker_with_ctl.ipynb).\n", "\n", "This tutorial contains a minimal multi-worker example with two workers for demonstration purposes." ] },
{ "cell_type": "markdown", "metadata": { "id": "JUdRerXg6yz3" }, "source": [ "### Choose the right strategy" ] },
{ "cell_type": "markdown", "metadata": { "id": "YAiCV_oL63GM" }, "source": [ "Before you dive in, make sure that `tf.distribute.MultiWorkerMirroredStrategy` is the right choice for your accelerator(s) and training. These are two common ways of distributing training with data parallelism:\n", "\n", "- *Synchronous training*, where the steps of training are synced across the workers and replicas, such as `tf.distribute.MirroredStrategy`, `tf.distribute.TPUStrategy`, and `tf.distribute.MultiWorkerMirroredStrategy`. All workers train over different slices of input data in sync, aggregating gradients at each step.\n", "- *Asynchronous training*, where the training steps are not strictly synced, such as `tf.distribute.experimental.ParameterServerStrategy`. All workers independently train over the input data and update variables asynchronously.\n", "\n", "If you are looking for multi-worker synchronous training without TPU, then `tf.distribute.MultiWorkerMirroredStrategy` is your choice. It creates copies of all variables in the model's layers on each device across all workers. It uses `CollectiveOps`, a TensorFlow op for collective communication, to aggregate gradients and keep the variables in sync. For those interested, check out the `tf.distribute.experimental.CommunicationOptions` parameter for the collective implementation options.\n", "\n", "For an overview of `tf.distribute.Strategy` APIs, refer to [Distributed training in TensorFlow](../../guide/distributed_training.ipynb)." ] },
{ "cell_type": "markdown", "metadata": { "id": "MUXex9ctTuDB" }, "source": [ "## Setup\n", "\n", "Start with some necessary imports:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "bnYxvfLD-LW-", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import json\n", "import os\n", "import sys" ] },
{ "cell_type": "markdown", "metadata": { "id": "Zz0EY91y3mxy" }, "source": [ "Before importing TensorFlow, make a few changes to the environment:\n", "\n", "- In a real-world application, each worker would be on a different machine. For the purposes of this tutorial, all the workers will run on **this** machine. Therefore, disable all GPUs to prevent errors caused by all workers trying to use the same GPU." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "rpEIVI5upIzM", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "os.environ[\"CUDA_VISIBLE_DEVICES\"] = \"-1\"" ] },
{ "cell_type": "markdown", "metadata": { "id": "7X1MS6385BWi" }, "source": [ "- Reset the `TF_CONFIG` environment variable (you'll learn more about this later):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "WEJLYa2_7OZF", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "os.environ.pop('TF_CONFIG', None)" ] },
{ "cell_type": "markdown", "metadata": { "id": "Rd4L9Ii77SS8" }, "source": [ "- Make sure that the current directory is on Python's path. This allows the notebook to import the files written by `%%writefile` later:\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "hPBuZUNSZmrQ", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "if '.' not in sys.path:\n", "  sys.path.insert(0, '.')" ] },
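{ "cell_type": "markdown", "metadata": { "id": "env-sanity-check-md" }, "source": [ "As a quick, optional sanity check (a minimal sketch that is not part of the original flow), you can confirm that both environment variables are now in the expected state before TensorFlow is imported:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "env-sanity-check-code", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Optional check: GPUs are hidden and no stale `TF_CONFIG` remains.\n", "assert os.environ.get('CUDA_VISIBLE_DEVICES') == '-1'\n", "assert 'TF_CONFIG' not in os.environ" ] },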
{ "cell_type": "markdown", "metadata": { "id": "9hLpDZhAz2q-" }, "source": [ "Install `tf-nightly`, as the frequency of checkpoint saving at a particular step with the `save_freq` argument in `tf.keras.callbacks.BackupAndRestore` is introduced in TensorFlow 2.10:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "-XqozLfzz30N", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "!pip install tf-nightly" ] },
{ "cell_type": "markdown", "metadata": { "id": "524e38dab658" }, "source": [ "Finally, import TensorFlow:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "vHNvttzV43sA", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import tensorflow as tf" ] },
{ "cell_type": "markdown", "metadata": { "id": "0S2jpf6Sx50i" }, "source": [ "### Dataset and model definition" ] },
{ "cell_type": "markdown", "metadata": { "id": "fLW6D2TzvC-4" }, "source": [ "Next, create an `mnist_setup.py` file with a simple model and dataset setup. This Python file will be used by the worker processes in this tutorial:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "dma_wUAxZqo2", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%writefile mnist_setup.py\n", "\n", "import os\n", "import tensorflow as tf\n", "import numpy as np\n", "\n", "def mnist_dataset(batch_size):\n", "  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()\n", "  # The `x` arrays are in uint8 and have values in the [0, 255] range.\n", "  # You need to convert them to float32 with values in the [0, 1] range.\n", "  x_train = x_train / np.float32(255)\n", "  y_train = y_train.astype(np.int64)\n", "  train_dataset = tf.data.Dataset.from_tensor_slices(\n", "      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)\n", "  return train_dataset\n", "\n", "def build_and_compile_cnn_model():\n", "  model = tf.keras.Sequential([\n", "      tf.keras.layers.InputLayer(input_shape=(28, 28)),\n", "      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),\n", "      tf.keras.layers.Conv2D(32, 3, activation='relu'),\n", "      tf.keras.layers.Flatten(),\n", "      tf.keras.layers.Dense(128, activation='relu'),\n", "      tf.keras.layers.Dense(10)\n", "  ])\n", "  model.compile(\n", "      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n", "      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),\n", "      metrics=['accuracy'])\n", "  return model" ] },
{ "cell_type": "markdown", "metadata": { "id": "2UL3kisMO90X" }, "source": [ "### Model training on a single worker\n", "\n", "First, try training the model for a small number of epochs and observe the results on a single worker to make sure everything works correctly. As training progresses, the loss should drop and the accuracy should approach 1.0." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "6Qe6iAf5O8iJ", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import mnist_setup\n", "\n", "batch_size = 64\n", "single_worker_dataset = mnist_setup.mnist_dataset(batch_size)\n", "single_worker_model = mnist_setup.build_and_compile_cnn_model()\n", "single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)" ] },
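{ "cell_type": "markdown", "metadata": { "id": "inspect-batch-md" }, "source": [ "Optionally, you can peek at one batch from the input pipeline to confirm its shapes before moving on. This is a small illustrative sketch, not a required step:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "inspect-batch-code", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Pull a single batch from the dataset defined in `mnist_setup.py`.\n", "x_batch, y_batch = next(iter(single_worker_dataset))\n", "print(x_batch.shape)  # (64, 28, 28): a batch of images\n", "print(y_batch.shape)  # (64,): a batch of integer labels" ] },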
{ "cell_type": "markdown", "metadata": { "id": "JmgZwwymxqt5" }, "source": [ "## Multi-worker configuration\n", "\n", "Now, enter the world of multi-worker training.\n", "\n", "### A cluster with jobs and tasks\n", "\n", "In TensorFlow, distributed training involves a `'cluster'` with several jobs, and each of the jobs may have one or more `'task'`s.\n", "\n", "You will need the `TF_CONFIG` configuration environment variable for training on multiple machines, each of which possibly has a different role. `TF_CONFIG` is a JSON string used to specify the cluster configuration for each worker that is part of the cluster.\n", "\n", "There are two components of a `TF_CONFIG` variable: `'cluster'` and `'task'`.\n", "\n", "- A `'cluster'` is the same for all workers and provides information about the training cluster, which is a dict consisting of different types of jobs, such as `'worker'` or `'chief'`.\n", "\n", "  - In multi-worker training with `tf.distribute.MultiWorkerMirroredStrategy`, there is usually one `'worker'` that takes on more responsibilities, such as saving checkpoints and writing summary files for TensorBoard, in addition to what a regular `'worker'` does. Such a `'worker'` is referred to as the chief worker (with the job name `'chief'`).\n", "  - It is customary for the `'chief'` to be the worker with `'index'` `0`.\n", "\n", "- A `'task'` provides information on the current task and is different for each worker. It specifies the `'type'` and `'index'` of that worker.\n", "\n", "Below is an example configuration:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "XK1eTYvSZiX7", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "tf_config = {\n", "    'cluster': {\n", "        'worker': ['localhost:12345', 'localhost:23456']\n", "    },\n", "    'task': {'type': 'worker', 'index': 0}\n", "}" ] },
{ "cell_type": "markdown", "metadata": { "id": "JjgwJbPKZkJL" }, "source": [ "Note that `tf_config` is just a local variable in Python. To use it for training configuration, serialize it as JSON and place it in the `TF_CONFIG` environment variable." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "yY-T0YDQZjbu", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "json.dumps(tf_config)" ] },
{ "cell_type": "markdown", "metadata": { "id": "8YFpxrcsZ2xG" }, "source": [ "In the example configuration above, you set the task `'type'` to `'worker'` and the task `'index'` to `0`. Therefore, this machine is the *first* worker. It will be appointed as the `'chief'` worker.\n", "\n", "Note: Other machines will need to have the `TF_CONFIG` environment variable set as well, and it should have the same `'cluster'` dict, but different task `'type'`s or task `'index'`es, depending on the roles of those machines." ] },
{ "cell_type": "markdown", "metadata": { "id": "aogb74kHxynz" }, "source": [ "In practice, you would create multiple workers on external IP addresses/ports and set the `TF_CONFIG` variable on each worker accordingly. For illustration purposes, this tutorial shows how to set up a `TF_CONFIG` variable with two workers on `localhost`:\n", "\n", "- The first (`'chief'`) worker's `TF_CONFIG` as shown above.\n", "- For the second worker, you will set `tf_config['task']['index']=1`" ] },
{ "cell_type": "markdown", "metadata": { "id": "cIlkfWmjz1PG" }, "source": [ "### Environment variables and subprocesses in notebooks" ] },
{ "cell_type": "markdown", "metadata": { "id": "FcjAbuGY1ACJ" }, "source": [ "Subprocesses inherit environment variables from their parent. So if you set an environment variable in this Jupyter Notebook process:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "PH2gHn2_0_U8", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "os.environ['GREETINGS'] = 'Hello TensorFlow!'" ] },
{ "cell_type": "markdown", "metadata": { "id": "gQkIX-cg18md" }, "source": [ "...then you can access the environment variable from the subprocesses:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "pquKO6IA18G5", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash\n", "echo ${GREETINGS}" ] },
{ "cell_type": "markdown", "metadata": { "id": "af6BCA-Y2fpz" }, "source": [ "In the next section, you'll use this method to pass the `TF_CONFIG` to the worker subprocesses. You would never really launch your jobs this way in a real-world scenario; this tutorial is just showing how to do it with a minimal multi-worker example." ] },
{ "cell_type": "markdown", "metadata": { "id": "dnDJmaRA9qnf" }, "source": [ "## Train the model" ] },
{ "cell_type": "markdown", "metadata": { "id": "UhNtHfuxCGVy" }, "source": [ "To train the model, first create an instance of `tf.distribute.MultiWorkerMirroredStrategy`:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "1uFSHCJXMrQ-", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "strategy = tf.distribute.MultiWorkerMirroredStrategy()" ] },
{ "cell_type": "markdown", "metadata": { "id": "N0iv7SyyAohc" }, "source": [ "Note: `TF_CONFIG` is parsed and TensorFlow's GRPC servers are started at the time `MultiWorkerMirroredStrategy` is called, so the `TF_CONFIG` environment variable must be set before a `tf.distribute.Strategy` instance is created." ] },
{ "cell_type": "markdown", "metadata": { "id": "H47DDcOgfzm7" }, "source": [ "With the integration of the `tf.distribute.Strategy` API into `tf.keras`, the only change you will make to distribute the training to multiple workers is enclosing the model building and the `model.compile()` call inside `strategy.scope()`. The distribution strategy's scope dictates how and where the variables are created, and in the case of `MultiWorkerMirroredStrategy`, the variables created are `MirroredVariable`s, which are replicated on each of the workers.\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "wo6b9wX65glL", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "with strategy.scope():\n", "  # Model building/compiling need to be within `strategy.scope()`.\n", "  multi_worker_model = mnist_setup.build_and_compile_cnn_model()" ] },
{ "cell_type": "markdown", "metadata": { "id": "Mhq3fzyR5hTw" }, "source": [ "Note: Currently there is a limitation in `MultiWorkerMirroredStrategy` where TensorFlow ops need to be created after the instance of the strategy is created. If you encounter `RuntimeError: Collective ops must be configured at program startup`, try creating the instance of `MultiWorkerMirroredStrategy` at the beginning of the program and put the code that may create ops after the strategy is instantiated." ] },
{ "cell_type": "markdown", "metadata": { "id": "jfYpmIxO6Jck" }, "source": [ "To actually run with `MultiWorkerMirroredStrategy`, you'll need to run worker processes and pass a `TF_CONFIG` to them.\n", "\n", "Like the `mnist_setup.py` file written earlier, here is the `main.py` that each of the workers will run:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "BcsuBYrpgnlS", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%writefile main.py\n", "\n", "import os\n", "import json\n", "\n", "import tensorflow as tf\n", "import mnist_setup\n", "\n", "per_worker_batch_size = 64\n", "tf_config = json.loads(os.environ['TF_CONFIG'])\n", "num_workers = len(tf_config['cluster']['worker'])\n", "\n", "strategy = tf.distribute.MultiWorkerMirroredStrategy()\n", "\n", "global_batch_size = per_worker_batch_size * num_workers\n", "multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)\n", "\n", "with strategy.scope():\n", "  # Model building/compiling need to be within `strategy.scope()`.\n", "  multi_worker_model = mnist_setup.build_and_compile_cnn_model()\n", "\n", "\n", "multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)" ] },
{ "cell_type": "markdown", "metadata": { "id": "Aom9xelvJQ_6" }, "source": [ "In the code snippet above, note that the `global_batch_size`, which gets passed to `Dataset.batch`, is set to `per_worker_batch_size * num_workers`. This ensures that each worker processes batches of `per_worker_batch_size` examples regardless of the number of workers." ] },
{ "cell_type": "markdown", "metadata": { "id": "lHLhOii67Saa" }, "source": [ "The current directory now contains both Python files:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "bi6x05Sr60O9", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash\n", "ls *.py" ] },
{ "cell_type": "markdown", "metadata": { "id": "qmEEStPS6vR_" }, "source": [ "Serialize the `TF_CONFIG` to JSON and add it to the environment variables:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "9uu3g7vV7Bbt", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "os.environ['TF_CONFIG'] = json.dumps(tf_config)" ] },
{ "cell_type": "markdown", "metadata": { "id": "MsY3dQLK7jdf" }, "source": [ "Now, you can launch a worker process that will run `main.py` and use the `TF_CONFIG`:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "txMXaq8d8N_S", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# first kill any previous runs\n", "%killbgscripts" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "qnSma_Ck7r-r", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash --bg\n", "python main.py &> job_0.log" ] },
{ "cell_type": "markdown", "metadata": { "id": "ZChyazqS7v0P" }, "source": [ "There are a few things to note about the above command:\n", "\n", "1. It uses `%%bash`, which is a [notebook \"magic\"](https://ipython.readthedocs.io/en/stable/interactive/magics.html) to run some bash commands.\n", "2. It uses the `--bg` flag to run the `bash` process in the background, because this worker will not terminate. It waits for all the workers before it starts.\n", "\n", "The backgrounded worker process won't print its output to this notebook, so `&>` redirects the output to a file so that you can inspect what happened in a log file later.\n", "\n", "Wait a few seconds for the process to start up:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "Hm2yrULE9281", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import time\n", "time.sleep(10)" ] },
{ "cell_type": "markdown", "metadata": { "id": "ZFPoNxg_9_Mx" }, "source": [ "Now, inspect what has been output to the worker's log file so far:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "vZEOuVgQ9-hn", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash\n", "cat job_0.log" ] },
{ "cell_type": "markdown", "metadata": { "id": "RqZhVF7L_KOy" }, "source": [ "The last line of the log file should say: `Started server with target: grpc://localhost:12345`. The first worker is now ready and is waiting for all the other worker(s) to be ready to proceed." ] },
{ "cell_type": "markdown", "metadata": { "id": "Pi8vPNNA_l4a" }, "source": [ "Next, update the `tf_config` for the second worker's process to pick up:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "lAiYkkPu_Jqd", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "tf_config['task']['index'] = 1\n", "os.environ['TF_CONFIG'] = json.dumps(tf_config)" ] },
{ "cell_type": "markdown", "metadata": { "id": "0AshGVO0_x0w" }, "source": [ "Launch the second worker. This will start the training, since all the workers are active (so there's no need to background this process):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "_ESVtyQ9_xjx", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash\n", "python main.py" ] },
{ "cell_type": "markdown", "metadata": { "id": "hX4FA2O2AuAn" }, "source": [ "If you recheck the logs written by the first worker, you'll see that it participated in training that model:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "rc6hw3yTBKXX", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash\n", "cat job_0.log" ] },
{ "cell_type": "markdown", "metadata": { "id": "zL79ak5PMzEg" }, "source": [ "Note: This may run slower than the test run at the beginning of this tutorial, because running multiple workers on a single machine only adds overhead. The goal here is not to improve the training speed, but to give an example of multi-worker training.\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "sG5_1UgrgniF", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.\n", "os.environ.pop('TF_CONFIG', None)\n", "%killbgscripts" ] },
{ "cell_type": "markdown", "metadata": { "id": "9j2FJVHoUIrE" }, "source": [ "## Multi-worker training in depth\n" ] },
{ "cell_type": "markdown", "metadata": { "id": "C1hBks_dAZmT" }, "source": [ "So far, you have learned how to perform a basic multi-worker setup. The rest of the tutorial goes over, in detail, other factors that may be useful or important for real use cases." ] },
{ "cell_type": "markdown", "metadata": { "id": "Rr14Vl9GR4zq" }, "source": [ "### Dataset sharding and batch size\n", "\n", "In multi-worker training, dataset sharding is needed to ensure convergence and performance. However, note that in the code snippets above the datasets are passed directly to `Model.fit` without needing to be sharded; this is because the `tf.distribute.Strategy` API takes care of dataset sharding automatically in multi-worker training.\n", "\n", "The example in the previous section relies on the default autosharding provided by the `tf.distribute.Strategy` API. You can control the sharding by setting the `tf.data.experimental.AutoShardPolicy` of the `tf.data.experimental.DistributeOptions`.\n", "\n", "To learn more about *auto-sharding*, refer to the [Distributed input guide](https://tensorflow.google.cn/tutorials/distribute/input#sharding).\n", "\n", "Here is a quick example of how to turn off auto-sharding, so that each replica processes every example (*not recommended*):\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "JxEtdh1vH-TF", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "options = tf.data.Options()\n", "options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF\n", "\n", "global_batch_size = 64\n", "multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)\n", "dataset_no_auto_shard = multi_worker_dataset.with_options(options)" ] },
{ "cell_type": "markdown", "metadata": { "id": "z85hElxsBQsT" }, "source": [ "### Evaluation" ] },
{ "cell_type": "markdown", "metadata": { "id": "gmqvlh5LhAoU" }, "source": [ "If you also pass `validation_data` into `Model.fit`, it will alternate between training and evaluation for each epoch. The evaluation work is distributed across the same set of workers, and its results are aggregated and available to all workers.\n", "\n", "Similar to training, the validation dataset is automatically sharded at the file level. You need to set a global batch size in the validation dataset and set `validation_steps`.\n", "\n", "A repeated dataset (by calling `tf.data.Dataset.repeat`) is recommended for evaluation.\n", "\n", "Alternatively, you can also create another task that periodically reads checkpoints and runs the evaluation. This is what an Estimator does. But this is not a recommended way to perform evaluation, and thus its details are omitted." ] },
{ "cell_type": "markdown", "metadata": { "id": "FNkoxUPJBNTb" }, "source": [ "### Performance" ] },
{ "cell_type": "markdown", "metadata": { "id": "XVk4ftYx6JAO" }, "source": [ "To tweak the performance of multi-worker training, you can try the following:\n", "\n", "- `tf.distribute.MultiWorkerMirroredStrategy` provides multiple [collective communication implementations](https://tensorflow.google.cn/api_docs/python/tf/distribute/experimental/CommunicationImplementation):\n", "\n", "  - `RING` implements ring-based collectives using gRPC as the cross-host communication layer.\n", "  - `NCCL` uses the [NVIDIA Collective Communication Library](https://developer.nvidia.com/nccl) to implement collectives.\n", "  - `AUTO` defers the choice to the runtime.\n", "\n", "  The best choice of collective implementation depends upon the number of GPUs, the type of GPUs, and the network interconnect in the cluster. To override the automatic choice, specify the `communication_options` parameter of the `MultiWorkerMirroredStrategy` constructor. For example:\n", "\n", "  ```python\n", "  communication_options=tf.distribute.experimental.CommunicationOptions(\n", "      implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)\n", "  ```\n", "\n", "- Cast the variables to `tf.float` if possible.\n", "\n", "  - The official ResNet model includes [an example](https://github.com/tensorflow/models/blob/8367cf6dabe11adf7628541706b660821f397dce/official/resnet/resnet_model.py#L466) of how to do this." ] },
{ "cell_type": "markdown", "metadata": { "id": "97WhAu8uKw3j" }, "source": [ "### Fault tolerance\n", "\n", "In synchronous training, the cluster would fail if one of the workers fails and no failure-recovery mechanism exists.\n", "\n", "Using Keras with `tf.distribute.Strategy` comes with the advantage of fault tolerance in cases where workers die or are otherwise unstable. You can do this by preserving the training state in the distributed file system of your choosing, such that upon a restart of the instance that previously failed or was preempted, the training state is recovered.\n", "\n", "When a worker becomes unavailable, other workers will fail (possibly after a timeout). In such cases, the unavailable worker needs to be restarted, as well as the other workers that have failed.\n", "\n", "Note: Previously, the `ModelCheckpoint` callback provided a mechanism to restore the training state upon a restart from a job failure for multi-worker training. The TensorFlow team introduced a new [`BackupAndRestore`](#scrollTo=kmH8uCUhfn4w) callback, which also adds support for single-worker training for a consistent experience, and removed the fault tolerance functionality from the existing `ModelCheckpoint` callback. From now on, applications that rely on this behavior should migrate to the new `BackupAndRestore` callback." ] },
{ "cell_type": "markdown", "metadata": { "id": "KvHPjGlyyFt6" }, "source": [ "#### The `ModelCheckpoint` callback\n", "\n", "The `ModelCheckpoint` callback no longer provides fault tolerance functionality; use the [`BackupAndRestore`](#scrollTo=kmH8uCUhfn4w) callback instead.\n", "\n", "The `ModelCheckpoint` callback can still be used to save checkpoints. But with it, if training was interrupted or successfully finished, in order to continue training from the checkpoint, you must load the model manually.\n", "\n", "Optionally, you can choose to save and restore the model/weights outside the `ModelCheckpoint` callback." ] },
{ "cell_type": "markdown", "metadata": { "id": "EUNV5Utc1d0s" }, "source": [ "### Model saving and loading\n", "\n", "To save your model using `model.save` or `tf.saved_model.save`, each worker needs a different saving destination.\n", "\n", "- For non-chief workers, you will need to save the model to a temporary directory.\n", "- For the chief, you will need to save to the provided model directory.\n", "\n", "The temporary directories on the workers need to be unique to prevent errors resulting from multiple workers trying to write to the same location.\n", "\n", "The models saved in all the directories are identical, and typically only the model saved by the chief should be referenced for restoring or serving.\n", "\n", "You should have some cleanup logic that deletes the temporary directories created by the workers once your training has completed.\n", "\n", "The reason for saving on the chief and the workers at the same time is that you might be aggregating variables during checkpointing, which requires both the chief and the workers to participate in the allreduce communication protocol. On the other hand, letting the chief and the workers save to the same model directory will result in errors due to contention.\n", "\n", "With `MultiWorkerMirroredStrategy`, the program is run on every worker, and it uses the cluster resolver object, which has `task_type` and `task_id` attributes, to determine whether the current worker is the chief:\n", "\n", "- `task_type` tells you what the current job is (for example, `'worker'`).\n", "- `task_id` tells you the identifier of the worker.\n", "- The worker with `task_id == 0` is designated as the chief worker.\n", "\n", "In the code snippet below, the `write_filepath` function provides the file path to write, which depends on the worker's `task_id`:\n", "\n", "- For the chief worker (with `task_id == 0`), it writes to the original file path.\n", "- For other workers, it creates a temporary directory, `temp_dir`, with the `task_id` in the directory path to write in:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "XQfGkmg-pfCY", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "model_path = '/tmp/keras-model'\n", "\n", "def _is_chief(task_type, task_id):\n", "  # Note: there are two possible `TF_CONFIG` configurations.\n", "  #   1) In addition to `worker` tasks, a `chief` task type is used;\n", "  #      in this case, this function should be modified to\n", "  #      `return task_type == 'chief'`.\n", "  #   2) Only the `worker` task type is used; in this case, worker 0 is\n", "  #      regarded as the chief. The implementation demonstrated here\n", "  #      is for this case.\n", "  # For the purposes of this Colab section, the `task_type is None` case\n", "  # is added because it is effectively run with only a single worker.\n", "  return (task_type == 'worker' and task_id == 0) or task_type is None\n", "\n", "def _get_temp_dir(dirpath, task_id):\n", "  base_dirpath = 'workertemp_' + str(task_id)\n", "  temp_dir = os.path.join(dirpath, base_dirpath)\n", "  tf.io.gfile.makedirs(temp_dir)\n", "  return temp_dir\n", "\n", "def write_filepath(filepath, task_type, task_id):\n", "  dirpath = os.path.dirname(filepath)\n", "  base = os.path.basename(filepath)\n", "  if not _is_chief(task_type, task_id):\n", "    dirpath = _get_temp_dir(dirpath, task_id)\n", "  return os.path.join(dirpath, base)\n", "\n", "task_type, task_id = (strategy.cluster_resolver.task_type,\n", "                      strategy.cluster_resolver.task_id)\n", "write_model_path = write_filepath(model_path, task_type, task_id)" ] },
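{ "cell_type": "markdown", "metadata": { "id": "write-filepath-demo-md" }, "source": [ "To see what `write_filepath` produces for a non-chief worker, here is a small illustrative call, not part of the original flow (note that `_get_temp_dir` also creates the directory as a side effect):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "write-filepath-demo-code", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Illustrative: a worker with `task_id` 1 writes under a per-worker temp directory.\n", "print(write_filepath('/tmp/keras-model', 'worker', 1))  # /tmp/workertemp_1/keras-model" ] },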
{ "cell_type": "markdown", "metadata": { "id": "hs0_agYR_qKm" }, "source": [ "With that, you're now ready to save:" ] },
{ "cell_type": "markdown", "metadata": { "id": "XnToxeIcg_6O" }, "source": [ "Deprecated: For Keras objects, it's recommended to use the new high-level `.keras` format and `tf.keras.Model.export`, as demonstrated in the guide [here](https://tensorflow.google.cn/guide/keras/save_and_serialize). The low-level SavedModel format continues to be supported for existing code." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "J-yA3BYG_vTs", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "multi_worker_model.save(write_model_path)" ] },
{ "cell_type": "markdown", "metadata": { "id": "8LXUVVl9_v5x" }, "source": [ "As described above, later on the model should only be loaded from the path the chief worker saved to. Therefore, remove the temporary ones the non-chief workers saved:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "aJTyu-97ABpY", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "if not _is_chief(task_type, task_id):\n", "  tf.io.gfile.rmtree(os.path.dirname(write_model_path))" ] },
{ "cell_type": "markdown", "metadata": { "id": "Nr-2PKlHAPBT" }, "source": [ "Now, when it's time to load, use the convenient `tf.keras.models.load_model` API and continue with further work.\n", "\n", "Here, assume you are only using a single worker to load and continue training, in which case you do not call `tf.keras.models.load_model` within another `strategy.scope()` (note that, as defined earlier, `strategy = tf.distribute.MultiWorkerMirroredStrategy()`):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "iUZna-JKAOrX", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "loaded_model = tf.keras.models.load_model(model_path)\n", "\n", "# Now that the model is restored, you can continue with the training.\n", "loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)" ] },
{ "cell_type": "markdown", "metadata": { "id": "YJ1fmxmTpocS" }, "source": [ "### Checkpoint saving and restoring\n", "\n", "On the other hand, checkpointing allows you to save your model's weights and restore them without having to save the whole model.\n", "\n", "Here, you'll create one `tf.train.Checkpoint` that tracks the model, managed by a `tf.train.CheckpointManager`, so that only the latest checkpoint is preserved:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "_1-RYaB5xnNH", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "checkpoint_dir = '/tmp/ckpt'\n", "\n", "checkpoint = tf.train.Checkpoint(model=multi_worker_model)\n", "write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)\n", "checkpoint_manager = tf.train.CheckpointManager(\n", "    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)" ] },
{ "cell_type": "markdown", "metadata": { "id": "7oBpPCRsW1MF" }, "source": [ "Once the `CheckpointManager` is set up, you're ready to save and to remove the checkpoints the non-chief workers have saved:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "l1ZXG_GbWzLp", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "checkpoint_manager.save()\n", "if not _is_chief(task_type, task_id):\n", "  tf.io.gfile.rmtree(write_checkpoint_dir)" ] },
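{ "cell_type": "markdown", "metadata": { "id": "list-ckpt-md" }, "source": [ "At this point, the chief's checkpoint directory should hold the checkpoint data files plus the small bookkeeping file named `checkpoint` that `tf.train.CheckpointManager` uses to track the latest checkpoint. A quick, optional way to confirm this (a hedged sketch, not in the original flow):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "list-ckpt-code", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# List the files written by `CheckpointManager` for the chief worker.\n", "print(tf.io.gfile.listdir(checkpoint_dir))" ] },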
{ "cell_type": "markdown", "metadata": { "id": "RO7cbN40XD5v" }, "source": [ "Now, when you need to restore the model, you can find the latest checkpoint saved using the convenient `tf.train.latest_checkpoint` function. After restoring the checkpoint, you can continue with training." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "NJW7vtknXFEH", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)\n", "checkpoint.restore(latest_checkpoint)\n", "multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)" ] },
{ "cell_type": "markdown", "metadata": { "id": "kmH8uCUhfn4w" }, "source": [ "#### The `BackupAndRestore` callback\n", "\n", "The `tf.keras.callbacks.BackupAndRestore` callback provides fault tolerance by backing up the model and the current training state in a temporary checkpoint file under the `backup_dir` argument of `BackupAndRestore`.\n", "\n", "Note: In TensorFlow 2.9, the current model and the training state are backed up at epoch boundaries. In the `tf-nightly` version and from TensorFlow 2.10, the `BackupAndRestore` callback can back up the model and the training state at epoch or step boundaries. `BackupAndRestore` accepts an optional `save_freq` argument. `save_freq` accepts either `'epoch'` or an `int` value. If `save_freq` is set to `'epoch'`, the model is backed up after every epoch. If `save_freq` is set to an integer value greater than `0`, the model is backed up after every `save_freq` number of batches.\n", "\n", "Once jobs get interrupted and restarted, the `BackupAndRestore` callback restores the last checkpoint, and you can continue training from the beginning of the epoch and the step at which the training state was last saved.\n", "\n", "To use it, provide an instance of `tf.keras.callbacks.BackupAndRestore` at the `Model.fit` call.\n", "\n", "With `MultiWorkerMirroredStrategy`, if a worker gets interrupted, the whole cluster will pause until the interrupted worker is restarted. Other workers will also restart, and the interrupted worker will rejoin the cluster. Then, every worker will read the checkpoint file that was previously saved and pick up its former state, thereby allowing the cluster to get back in sync, and training will continue. The distributed dataset iterator state will be re-initialized and not restored.\n", "\n", "The `BackupAndRestore` callback uses the `CheckpointManager` to save and restore the training state, which generates a file called checkpoint that tracks existing checkpoints together with the latest one. For this reason, `backup_dir` should not be re-used to store other checkpoints in order to avoid name collision.\n", "\n", "Currently, the `BackupAndRestore` callback supports single-worker training with no strategy (or with `MirroredStrategy`) and multi-worker training with `MultiWorkerMirroredStrategy`.\n", "\n", "Below are examples for both multi-worker training and single-worker training:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "CYdzZi4Qs1jz", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Multi-worker training with `MultiWorkerMirroredStrategy`\n", "# and the `BackupAndRestore` callback. The training state\n", "# is backed up at epoch boundaries by default.\n", "\n", "callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]\n", "with strategy.scope():\n", "  multi_worker_model = mnist_setup.build_and_compile_cnn_model()\n", "multi_worker_model.fit(multi_worker_dataset,\n", "                       epochs=3,\n", "                       steps_per_epoch=70,\n", "                       callbacks=callbacks)" ] },
{ "cell_type": "markdown", "metadata": { "id": "f8e86TAp0Rsl" }, "source": [ "If the `save_freq` argument in the `BackupAndRestore` callback is set to `'epoch'`, the model is backed up after every epoch:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "rZjQGPsF0aEI", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# The training state is backed up at epoch boundaries because `save_freq` is\n", "# set to `epoch`.\n", "\n", "callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup', save_freq='epoch')]\n", "with strategy.scope():\n", "  multi_worker_model = mnist_setup.build_and_compile_cnn_model()\n", "multi_worker_model.fit(multi_worker_dataset,\n", "                       epochs=3,\n", "                       steps_per_epoch=70,\n", "                       callbacks=callbacks)\n" ] },
{ "cell_type": "markdown", "metadata": { "id": "p-r44kCM0jc6" }, "source": [ "Note: The next code block uses features that are only available in `tf-nightly` until TensorFlow 2.10 is released.\n", "\n", "If the `save_freq` argument in the `BackupAndRestore` callback is set to an integer value greater than `0`, the model is backed up after every `save_freq` number of batches." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "bSJUyLSF0moC", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# The training state is backed up every 30 steps because `save_freq` is set\n", "# to an integer value of `30`.\n", "\n", "callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup', save_freq=30)]\n", "with strategy.scope():\n", "  multi_worker_model = mnist_setup.build_and_compile_cnn_model()\n", "multi_worker_model.fit(multi_worker_dataset,\n", "                       epochs=3,\n", "                       steps_per_epoch=70,\n", "                       callbacks=callbacks)" ] },
{ "cell_type": "markdown", "metadata": { "id": "rIV5_3ebzXmB" }, "source": [ "If you inspect the directory of the `backup_dir` you specified in `BackupAndRestore`, you may notice some temporarily generated checkpoint files. Those files are needed for recovering the previously lost instances, and they will be removed by the library at the end of `Model.fit` upon successful exit from your training.\n", "\n", "Note: Currently the `BackupAndRestore` callback only supports eager mode. In graph mode, consider using `Model.save`/`tf.saved_model.save` and `tf.keras.models.load_model` for saving and restoring models, respectively, as described in the *Model saving and loading* section above, and by providing `initial_epoch` in the `Model.fit` call during training." ] },
{ "cell_type": "markdown", "metadata": { "id": "ega2hdOQEmy_" }, "source": [ "## Additional resources\n", "\n", "1. The [Distributed training in TensorFlow](../../guide/distributed_training.ipynb) guide provides an overview of the available distribution strategies.\n", "2. The [Custom training loop with Keras and MultiWorkerMirroredStrategy](multi_worker_with_ctl.ipynb) tutorial shows how to use the `MultiWorkerMirroredStrategy` with Keras and a custom training loop.\n", "3. Check out the [official models](https://github.com/tensorflow/models/tree/master/official), many of which can be configured to run multiple distribution strategies.\n", "4. The [Better performance with tf.function](../../guide/function.ipynb) guide provides information about other strategies and tools, such as the [TensorFlow Profiler](../../guide/profiler.md), which you can use to optimize the performance of your TensorFlow models." ] } ], "metadata": { "colab": { "name": "multi_worker_with_keras.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }