{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "Tce3stUlHN0L" }, "outputs": [], "source": [ "##### Copyright 2019 The TensorFlow Authors.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "tuOe1ymfHZPu", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "MfBg1C5NB3X0" }, "source": [ "# 使用 Keras 和 MultiWorkerMirroredStrategy 的自定义训练循环\n", "\n", "\n", " \n", " \n", " \n", " \n", "
在 TensorFlow.org 上查看\n", " 在 Google Colab 运行\n", " 在 GitHub 上查看源代码\n", " 下载笔记本\n", "
" ] }, { "cell_type": "markdown", "metadata": { "id": "xHxb-dlhMIzW" }, "source": [ "## 概述\n", "\n", "本教程演示如何使用 Keras 模型和 tf.distribute.Strategy API 的自定义训练循环执行多工作进程分布式训练。训练循环通过 `tf.distribute.MultiWorkerMirroredStrategy` 进行分布。这样,设计为在[单个工作进程上运行的 `tf.keras` 模型](custom_training.ipynb)即可通过最少的代码更改无缝地在多个工作进程上运行。自定义训练循环提供了灵活性和更好的训练控制,同时也使模型的调试更加容易。请详细了解有关[编写基本训练循环](../../guide/basic_training_loops.ipynb)、 [从头开始编写训练循环](https://tensorflow.google.cn/guide/keras/writing_a_training_loop_from_scratch)和[自定义训练](../customization/custom_training_walkthrough.ipynb)的信息。\n", "\n", "如果您正在寻找如何将 `MultiWorkerMirroredStrategy` 与 `tf.keras.Model.fit` 一起使用,请参阅此[教程](multi_worker_with_keras.ipynb)。\n", "\n", "[TensorFlow 中的分布式训练](../../guide/distributed_training.ipynb)指南概述了 TensorFlow 支持的分布式策略,并适用于想要更深入了解 `tf.distribute.Strategy` API 的人。" ] }, { "cell_type": "markdown", "metadata": { "id": "MUXex9ctTuDB" }, "source": [ "## 安装\n", "\n", "首先,进行一些必要的导入。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "bnYxvfLD-LW-", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import json\n", "import os\n", "import sys" ] }, { "cell_type": "markdown", "metadata": { "id": "Zz0EY91y3mxy" }, "source": [ "在导入 TensorFlow 之前,需要对环境进行一些变更:\n", "\n", "- 停用所有 GPU。这可以防止因所有工作进程都尝试使用同一个 GPU 而导致的错误。对于真实应用,每个工作进程都将在不同的计算机上运行。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "685pbYEY3jGC", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "os.environ[\"CUDA_VISIBLE_DEVICES\"] = \"-1\"" ] }, { "cell_type": "markdown", "metadata": { "id": "7X1MS6385BWi" }, "source": [ "- 重置 `'TF_CONFIG'` 环境变量(稍后您将看到更多相关信息)。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "WEJLYa2_7OZF", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "os.environ.pop('TF_CONFIG', None)" ] }, { "cell_type": "markdown", "metadata": { "id": "Rd4L9Ii77SS8" }, "source": [ "- 确保当前目录位于 Python 的路径上。这样,笔记本可以导入稍后由 `%%writefile` 写入的文件。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "hPBuZUNSZmrQ", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "if '.' not in sys.path:\n", " sys.path.insert(0, '.')" ] }, { "cell_type": "markdown", "metadata": { "id": "pDhHuMjb7bfU" }, "source": [ "现在导入 TensorFlow。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "vHNvttzV43sA", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import tensorflow as tf" ] }, { "cell_type": "markdown", "metadata": { "id": "0S2jpf6Sx50i" }, "source": [ "### 数据集和模型定义" ] }, { "cell_type": "markdown", "metadata": { "id": "fLW6D2TzvC-4" }, "source": [ "接下来,使用简单的模型和数据集设置创建 `mnist.py` 文件。本教程中的工作进程将使用此 Python 文件:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dma_wUAxZqo2", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%writefile mnist.py\n", "\n", "import os\n", "import tensorflow as tf\n", "import numpy as np\n", "\n", "def mnist_dataset(batch_size):\n", " (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()\n", " # The `x` arrays are in uint8 and have values in the range [0, 255].\n", " # You need to convert them to float32 with values in the range [0, 1]\n", " x_train = x_train / np.float32(255)\n", " y_train = y_train.astype(np.int64)\n", " train_dataset = tf.data.Dataset.from_tensor_slices(\n", " (x_train, y_train)).shuffle(60000)\n", " return train_dataset\n", "\n", "def dataset_fn(global_batch_size, input_context):\n", " batch_size = input_context.get_per_replica_batch_size(global_batch_size)\n", " dataset = mnist_dataset(batch_size)\n", " dataset = dataset.shard(input_context.num_input_pipelines,\n", " input_context.input_pipeline_id)\n", " dataset = dataset.batch(batch_size)\n", " return dataset\n", "\n", "def build_cnn_model():\n", " regularizer = tf.keras.regularizers.L2(1e-5)\n", " return tf.keras.Sequential([\n", " tf.keras.Input(shape=(28, 28)),\n", " tf.keras.layers.Reshape(target_shape=(28, 28, 1)),\n", " tf.keras.layers.Conv2D(32, 3,\n", " activation='relu',\n", " kernel_regularizer=regularizer),\n", " tf.keras.layers.Flatten(),\n", " tf.keras.layers.Dense(128,\n", " activation='relu',\n", " kernel_regularizer=regularizer),\n", " tf.keras.layers.Dense(10, kernel_regularizer=regularizer)\n", " ])" ] }, { "cell_type": "markdown", "metadata": { "id": "JmgZwwymxqt5" }, "source": [ "## 多工作进程配置\n", "\n", "接下来,我们进入多工作进程训练的世界。在 TensorFlow 中,在多台机器上进行训练需要 `'TF_CONFIG'` 环境变量。每台机器可能有不同的角色。下面使用的 `'TF_CONFIG'` 变量是一个 JSON 字符串,它指定集群中每个工作进程的集群配置。这是使用 `cluster_resolver.TFConfigClusterResolver` 指定集群的默认方法,但在 `distribute.cluster_resolver` 模块中还有其他可用选项。请在[分布式训练指南](../../guide/distributed_training.ipynb)中详细了解有关设置 `'TF_CONFIG'` 变量的信息。" ] }, { "cell_type": "markdown", "metadata": { "id": "SS8WhvRhe_Ya" }, "source": [ "### 描述您的集群\n", "\n", "下面是一个示例配置:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "XK1eTYvSZiX7", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "tf_config = {\n", " 'cluster': {\n", " 'worker': ['localhost:12345', 'localhost:23456']\n", " },\n", " 'task': {'type': 'worker', 'index': 0}\n", "}" ] }, { "cell_type": "markdown", "metadata": { "id": "JjgwJbPKZkJL" }, "source": [ "请注意,`tf_config` 只是 Python 中的局部变量。要将其用于训练配置,请将其序列化为 JSON 并将其放在 `'TF_CONFIG'` 环境变量中。这是序列化为 JSON 字符串的相同 `'TF_CONFIG'`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "yY-T0YDQZjbu", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "json.dumps(tf_config)" ] }, { "cell_type": "markdown", "metadata": { "id": "AUBmYRZqxthH" }, "source": [ "`'TF_CONFIG'` 有两个组件:`'cluster'` 和 `'task'`。\n", "\n", "- `'cluster'` 对所有工作进程都相同,并提供有关训练集群的信息,这是一个由不同类型的作业组成的字典,例如 `'worker'` 。在使用 `MultiWorkerMirroredStrategy` 进行的多工作进程训练中,除了普通的 `'worker'` 之外,通常还有一个 `'worker'` 承担更多的责任,例如保存检查点和为 TensorBoard 编写摘要文件。这样的工作进程被称为 `'chief'` 工作进程,习惯上将 `'index'` 为 0 的 `'worker'` 指定为首席 `worker`。\n", "\n", "- `'task'` 提供当前任务的信息,并且在每个工作进程上都不相同。它指定该工作进程的 `'type'` 和 `'index'`。" ] }, { "cell_type": "markdown", "metadata": { "id": "8YFpxrcsZ2xG" }, "source": [ "在本例中,您会将任务 `'type'` 设置为 `'worker'`,将任务 `'index'` 设置为 `0`。这台机器是首个工作进程,将被指定为首席工作进程,并需要比其他工作进程承担更多的工作。请注意,其他机器也需要设置 `'TF_CONFIG'` 环境变量,且应该具有相同的 `'cluster'` 字典,但要根据这些机器的具体角色来设置不同的任务 `'type'` 或任务 `'index'`。\n" ] }, { "cell_type": "markdown", "metadata": { "id": "aogb74kHxynz" }, "source": [ "出于演示的目的,本教程将展示如何在 `'localhost'` 上设置具有两个工作进程的 `'TF_CONFIG'`。在实践中,用户会在外部 IP 地址/端口上创建多个工作进程,并为每个工作进程正确设置 `'TF_CONFIG'`。\n", "\n", "本示例使用两个工作进程,第一个工作进程的 `'TF_CONFIG'` 如上所示。对于第二个工作进程,设置 `tf_config['task']['index']=1`。" ] }, { "cell_type": "markdown", "metadata": { "id": "cIlkfWmjz1PG" }, "source": [ "### 笔记本中的环境变量和子进程" ] }, { "cell_type": "markdown", "metadata": { "id": "FcjAbuGY1ACJ" }, "source": [ "子进程会从其父进程继承环境变量。因此,如果您在此 Jupyter Notebook 进程中设置环境变量:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "PH2gHn2_0_U8", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "os.environ['GREETINGS'] = 'Hello TensorFlow!'" ] }, { "cell_type": "markdown", "metadata": { "id": "gQkIX-cg18md" }, "source": [ "然后,您可以从子进程访问环境变量:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "pquKO6IA18G5", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash\n", "echo ${GREETINGS}" ] }, { "cell_type": "markdown", "metadata": { "id": "af6BCA-Y2fpz" }, "source": [ "在下一部分中,您将使用它来将 `'TF_CONFIG'` 传递给工作进程子进程。实际上,您永远不会以这种方式启动您的作业,但这完全可以满足此教程的演示目的:呈现最简单的多工作进程示例。" ] }, { "cell_type": "markdown", "metadata": { "id": "UhNtHfuxCGVy" }, "source": [ "## MultiWorkerMirroredStrategy\n", "\n", "在训练模型之前,首先创建一个 `tf.distribute.MultiWorkerMirroredStrategy` 的实例:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1uFSHCJXMrQ-", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "strategy = tf.distribute.MultiWorkerMirroredStrategy()" ] }, { "cell_type": "markdown", "metadata": { "id": "N0iv7SyyAohc" }, "source": [ "注:在您调用 `tf.distribute.MultiWorkerMirroredStrategy` 时,会解析 `'TF_CONFIG'` 并启动 TensorFlow 的 GRPC 服务器。因此,您必须在实例化 `tf.distribute.Strategy` 之前设置 `'TF_CONFIG'` 环境变量。为了在这个说明性示例中节省时间,本教程中没有对此进行演示,因此不需要启动服务器。您可以在本教程的最后一个部分中找到完整的示例。" ] }, { "cell_type": "markdown", "metadata": { "id": "TS4S-faBHHam" }, "source": [ "使用 `tf.distribute.Strategy.scope` 指定构建模型时应使用的策略。这使得该策略可以控制变量放置之类的事情,它将在所有工作进程的每个设备上,在模型的层中创建所有变量的副本。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "nXV49tG1_opc", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import mnist\n", "with strategy.scope():\n", " # Model building needs to be within `strategy.scope()`.\n", " multi_worker_model = mnist.build_cnn_model()" ] }, { "cell_type": "markdown", "metadata": { "id": "DSYkM-on6r3Y" }, "source": [ "## 在工作进程之间对数据进行自动分片\n", "\n", "在多工作进程训练中,需要通过*数据集分片*来确保收敛性和可重复性。分片意味着将整个数据集的一个子集交给每个工作进程,这有助于创造类似于对单个工作进程进行训练的体验。在下面的示例中,您依赖于 `tf.distribute` 的默认自动分片策略。您还可以通过设置 `tf.data.experimental.DistributeOptions` 的 `tf.data.experimental.AutoShardPolicy` 来对其进行自定义。要了解更多信息,请参阅[分布式输入教程](input.ipynb)的*分片*部分。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "65-p36pt6rUF", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "per_worker_batch_size = 64\n", "num_workers = len(tf_config['cluster']['worker'])\n", "global_batch_size = per_worker_batch_size * num_workers\n", "\n", "with strategy.scope():\n", " multi_worker_dataset = strategy.distribute_datasets_from_function(\n", " lambda input_context: mnist.dataset_fn(global_batch_size, input_context))" ] }, { "cell_type": "markdown", "metadata": { "id": "rkNzSR3g60iP" }, "source": [ "## 定义自定义训练循环并训练模型\n", "\n", "指定优化器:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NoMr4_zTeKSn", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "with strategy.scope():\n", " # The creation of optimizer and train_accuracy needs to be in\n", " # `strategy.scope()` as well, since they create variables.\n", " optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)\n", " train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(\n", " name='train_accuracy')" ] }, { "cell_type": "markdown", "metadata": { "id": "RmrDcAii4B5O" }, "source": [ "使用 `tf.function` 定义训练步骤:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "znXWN5S3eUDB", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "@tf.function\n", "def train_step(iterator):\n", " \"\"\"Training step function.\"\"\"\n", "\n", " def step_fn(inputs):\n", " \"\"\"Per-Replica step function.\"\"\"\n", " x, y = inputs\n", " with tf.GradientTape() as tape:\n", " predictions = multi_worker_model(x, training=True)\n", " per_example_loss = tf.keras.losses.SparseCategoricalCrossentropy(\n", " from_logits=True,\n", " reduction=tf.keras.losses.Reduction.NONE)(y, predictions)\n", " loss = tf.nn.compute_average_loss(per_example_loss)\n", " model_losses = multi_worker_model.losses\n", " if model_losses:\n", " loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))\n", "\n", " grads = tape.gradient(loss, multi_worker_model.trainable_variables)\n", " optimizer.apply_gradients(\n", " zip(grads, multi_worker_model.trainable_variables))\n", " train_accuracy.update_state(y, predictions)\n", " return loss\n", "\n", " per_replica_losses = strategy.run(step_fn, args=(next(iterator),))\n", " return strategy.reduce(\n", " tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)" ] }, { "cell_type": "markdown", "metadata": { "id": "eFXHsUVBy0Rx" }, "source": [ "### 检查点保存和恢复\n", "\n", "在编写自定义训练循环时,您需要手动处理[检查点保存](../../guide/checkpoint.ipynb),而不是依赖 Keras 回调。请注意,对于 `MultiWorkerMirroredStrategy`,保存检查点或完整模型需要所有工作进程的参与,因为尝试仅在首席工作进程上进行保存可能会导致死锁。工作进程还需要写入不同的路径以避免相互重写。以下是如何配置目录的示例:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "LcFO6x1KyjhI", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "from multiprocessing import util\n", "checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')\n", "\n", "def _is_chief(task_type, task_id, cluster_spec):\n", " return (task_type is None\n", " or task_type == 'chief'\n", " or (task_type == 'worker'\n", " and task_id == 0\n", " and \"chief\" not in cluster_spec.as_dict()))\n", "\n", "def _get_temp_dir(dirpath, task_id):\n", " base_dirpath = 'workertemp_' + str(task_id)\n", " temp_dir = os.path.join(dirpath, base_dirpath)\n", " tf.io.gfile.makedirs(temp_dir)\n", " return temp_dir\n", "\n", "def write_filepath(filepath, task_type, task_id, cluster_spec):\n", " dirpath = os.path.dirname(filepath)\n", " base = os.path.basename(filepath)\n", " if not _is_chief(task_type, task_id, cluster_spec):\n", " dirpath = _get_temp_dir(dirpath, task_id)\n", " return os.path.join(dirpath, base)" ] }, { "cell_type": "markdown", "metadata": { "id": "nrcdPHtG4ObO" }, "source": [ "创建一个跟踪模型的 `tf.train.Checkpoint`,由 `tf.train.CheckpointManager` 管理,以便仅保留最新的检查点:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "4rURT2pI4aqV", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "epoch = tf.Variable(\n", " initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')\n", "step_in_epoch = tf.Variable(\n", " initial_value=tf.constant(0, dtype=tf.dtypes.int64),\n", " name='step_in_epoch')\n", "task_type, task_id = (strategy.cluster_resolver.task_type,\n", " strategy.cluster_resolver.task_id)\n", "# Normally, you don't need to manually instantiate a `ClusterSpec`, but in this\n", "# illustrative example you did not set `'TF_CONFIG'` before initializing the\n", "# strategy. Check out the next section for \"real-world\" usage.\n", "cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])\n", "\n", "checkpoint = tf.train.Checkpoint(\n", " model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)\n", "\n", "write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,\n", " cluster_spec)\n", "checkpoint_manager = tf.train.CheckpointManager(\n", " checkpoint, directory=write_checkpoint_dir, max_to_keep=1)" ] }, { "cell_type": "markdown", "metadata": { "id": "RO7cbN40XD5v" }, "source": [ "现在,当需要恢复检查点时,您可以方便地使用 `tf.train.latest_checkpoint` 函数(或通过调用 `tf.train.CheckpointManager.restore_or_initialize` )找到最新的已保存检查点。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "gniynaQj6HMV", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)\n", "if latest_checkpoint:\n", " checkpoint.restore(latest_checkpoint)" ] }, { "cell_type": "markdown", "metadata": { "id": "1j9JuI-h6ObW" }, "source": [ "恢复检查点后,您可以继续训练自定义训练循环。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "kZzXZCh45FY6", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "num_epochs = 3\n", "num_steps_per_epoch = 70\n", "\n", "while epoch.numpy() < num_epochs:\n", " iterator = iter(multi_worker_dataset)\n", " total_loss = 0.0\n", " num_batches = 0\n", "\n", " while step_in_epoch.numpy() < num_steps_per_epoch:\n", " total_loss += train_step(iterator)\n", " num_batches += 1\n", " step_in_epoch.assign_add(1)\n", "\n", " train_loss = total_loss / num_batches\n", " print('Epoch: %d, accuracy: %f, train_loss: %f.'\n", " %(epoch.numpy(), train_accuracy.result(), train_loss))\n", "\n", " train_accuracy.reset_states()\n", "\n", " # Once the `CheckpointManager` is set up, you're now ready to save, and remove\n", " # the checkpoints non-chief workers saved.\n", " checkpoint_manager.save()\n", " if not _is_chief(task_type, task_id, cluster_spec):\n", " tf.io.gfile.rmtree(write_checkpoint_dir)\n", "\n", " epoch.assign_add(1)\n", " step_in_epoch.assign(0)" ] }, { "cell_type": "markdown", "metadata": { "id": "0W1Osks466DE" }, "source": [ "## 完整代码一览" ] }, { "cell_type": "markdown", "metadata": { "id": "jfYpmIxO6Jck" }, "source": [ "总结一下到目前为止讨论的所有程序:\n", "\n", "1. 创建工作进程。\n", "2. 将 `'TF_CONFIG'` 传递给工作进程。\n", "3. 让每个工作进程运行下面包含训练代码的脚本。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "MIDCESkVzN6M", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%writefile main.py\n", "#@title File: `main.py`\n", "import os\n", "import json\n", "import tensorflow as tf\n", "import mnist\n", "from multiprocessing import util\n", "\n", "per_worker_batch_size = 64\n", "tf_config = json.loads(os.environ['TF_CONFIG'])\n", "num_workers = len(tf_config['cluster']['worker'])\n", "global_batch_size = per_worker_batch_size * num_workers\n", "\n", "num_epochs = 3\n", "num_steps_per_epoch=70\n", "\n", "# Checkpoint saving and restoring\n", "def _is_chief(task_type, task_id, cluster_spec):\n", " return (task_type is None\n", " or task_type == 'chief'\n", " or (task_type == 'worker'\n", " and task_id == 0\n", " and 'chief' not in cluster_spec.as_dict()))\n", "\n", "def _get_temp_dir(dirpath, task_id):\n", " base_dirpath = 'workertemp_' + str(task_id)\n", " temp_dir = os.path.join(dirpath, base_dirpath)\n", " tf.io.gfile.makedirs(temp_dir)\n", " return temp_dir\n", "\n", "def write_filepath(filepath, task_type, task_id, cluster_spec):\n", " dirpath = os.path.dirname(filepath)\n", " base = os.path.basename(filepath)\n", " if not _is_chief(task_type, task_id, cluster_spec):\n", " dirpath = _get_temp_dir(dirpath, task_id)\n", " return os.path.join(dirpath, base)\n", "\n", "checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')\n", "\n", "# Define Strategy\n", "strategy = tf.distribute.MultiWorkerMirroredStrategy()\n", "\n", "with strategy.scope():\n", " # Model building/compiling need to be within `tf.distribute.Strategy.scope`.\n", " multi_worker_model = mnist.build_cnn_model()\n", "\n", " multi_worker_dataset = strategy.distribute_datasets_from_function(\n", " lambda input_context: mnist.dataset_fn(global_batch_size, input_context))\n", " optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)\n", " train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(\n", " name='train_accuracy')\n", "\n", "@tf.function\n", "def train_step(iterator):\n", " \"\"\"Training step function.\"\"\"\n", "\n", " def step_fn(inputs):\n", " \"\"\"Per-Replica step function.\"\"\"\n", " x, y = inputs\n", " with tf.GradientTape() as tape:\n", " predictions = multi_worker_model(x, training=True)\n", " per_example_loss = tf.keras.losses.SparseCategoricalCrossentropy(\n", " from_logits=True,\n", " reduction=tf.keras.losses.Reduction.NONE)(y, predictions)\n", " loss = tf.nn.compute_average_loss(per_example_loss)\n", " model_losses = multi_worker_model.losses\n", " if model_losses:\n", " loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))\n", "\n", " grads = tape.gradient(loss, multi_worker_model.trainable_variables)\n", " optimizer.apply_gradients(\n", " zip(grads, multi_worker_model.trainable_variables))\n", " train_accuracy.update_state(y, predictions)\n", "\n", " return loss\n", "\n", " per_replica_losses = strategy.run(step_fn, args=(next(iterator),))\n", " return strategy.reduce(\n", " tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)\n", "\n", "epoch = tf.Variable(\n", " initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')\n", "step_in_epoch = tf.Variable(\n", " initial_value=tf.constant(0, dtype=tf.dtypes.int64),\n", " name='step_in_epoch')\n", "\n", "task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,\n", " strategy.cluster_resolver.task_id,\n", " strategy.cluster_resolver.cluster_spec())\n", "\n", "checkpoint = tf.train.Checkpoint(\n", " model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)\n", "\n", "write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,\n", " cluster_spec)\n", "checkpoint_manager = tf.train.CheckpointManager(\n", " checkpoint, directory=write_checkpoint_dir, max_to_keep=1)\n", "\n", "# Restoring the checkpoint\n", "latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)\n", "if latest_checkpoint:\n", " checkpoint.restore(latest_checkpoint)\n", "\n", "# Resume our CTL training\n", "while epoch.numpy() < num_epochs:\n", " iterator = iter(multi_worker_dataset)\n", " total_loss = 0.0\n", " num_batches = 0\n", "\n", " while step_in_epoch.numpy() < num_steps_per_epoch:\n", " total_loss += train_step(iterator)\n", " num_batches += 1\n", " step_in_epoch.assign_add(1)\n", "\n", " train_loss = total_loss / num_batches\n", " print('Epoch: %d, accuracy: %f, train_loss: %f.'\n", " %(epoch.numpy(), train_accuracy.result(), train_loss))\n", "\n", " train_accuracy.reset_states()\n", "\n", " checkpoint_manager.save()\n", " if not _is_chief(task_type, task_id, cluster_spec):\n", " tf.io.gfile.rmtree(write_checkpoint_dir)\n", "\n", " epoch.assign_add(1)\n", " step_in_epoch.assign(0)" ] }, { "cell_type": "markdown", "metadata": { "id": "ItVOvPN1qnZ6" }, "source": [ "当前目录现包含两个 Python 文件:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "bi6x05Sr60O9", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash\n", "ls *.py" ] }, { "cell_type": "markdown", "metadata": { "id": "qmEEStPS6vR_" }, "source": [ "因此,对 `'TF_CONFIG'` 执行 JSON 序列化,然后将其添加到环境变量:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "9uu3g7vV7Bbt", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "os.environ['TF_CONFIG'] = json.dumps(tf_config)" ] }, { "cell_type": "markdown", "metadata": { "id": "MsY3dQLK7jdf" }, "source": [ "现在,您可以启动一个将运行 `main.py` 并使用 `'TF_CONFIG'` 的工作进程:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "txMXaq8d8N_S", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# first kill any previous runs\n", "%killbgscripts" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "qnSma_Ck7r-r", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash --bg\n", "python main.py &> job_0.log" ] }, { "cell_type": "markdown", "metadata": { "id": "ZChyazqS7v0P" }, "source": [ "以上命令有几点需要注意:\n", "\n", "1. 它使用 `%%bash`,这是一项用于运行一些 bash 命令的[笔记本“魔术命令”](https://ipython.readthedocs.io/en/stable/interactive/magics.html)。\n", "2. 它使用 `--bg` 标志在后台运行 `bash` 进程,因为此工作进程不会终止。它在开始之前会等待所有工作进程。\n", "\n", "后台工作进程不会将输出打印到此笔记本。`&>` 会将其输出重定向到一个文件,以便您可以查看所发生的情况。\n", "\n", "等待几秒钟以启动该进程:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Hm2yrULE9281", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import time\n", "time.sleep(20)" ] }, { "cell_type": "markdown", "metadata": { "id": "ZFPoNxg_9_Mx" }, "source": [ "接下来,检查一下目前为止输出到工作进程日志文件的内容:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "vZEOuVgQ9-hn", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash\n", "cat job_0.log" ] }, { "cell_type": "markdown", "metadata": { "id": "RqZhVF7L_KOy" }, "source": [ "日志文件的最后一行内容应为:`Started server with target: grpc://localhost:12345`。第一个工作进程现已准备就绪,正在等待所有其他工作进程准备就绪以继续。" ] }, { "cell_type": "markdown", "metadata": { "id": "Pi8vPNNA_l4a" }, "source": [ "更新 `tf_config` 以供第二个工作进程取用:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "lAiYkkPu_Jqd", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "tf_config['task']['index'] = 1\n", "os.environ['TF_CONFIG'] = json.dumps(tf_config)" ] }, { "cell_type": "markdown", "metadata": { "id": "0AshGVO0_x0w" }, "source": [ "现在,启动第二个工作进程。这将开始训练,因为所有工作进程都已处于活动状态(因此无需在后台执行此进程):" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "_ESVtyQ9_xjx", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash\n", "python main.py > /dev/null 2>&1" ] }, { "cell_type": "markdown", "metadata": { "id": "hX4FA2O2AuAn" }, "source": [ "如果您重新检查第一个工作进程编写的日志,您会看到它参与了该模型的训练:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "rc6hw3yTBKXX", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "%%bash\n", "cat job_0.log" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "sG5_1UgrgniF", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Delete the `'TF_CONFIG'`, and kill any background tasks so they don't affect the next section.\n", "os.environ.pop('TF_CONFIG', None)\n", "%killbgscripts" ] }, { "cell_type": "markdown", "metadata": { "id": "bhxMXa0AaZkK" }, "source": [ "## 深入了解多工作进程训练\n", "\n", "本教程演示了多工作进程设置的自定义训练循环工作流程。有关其他主题的详细描述可在适用于自定义训练循环的[使用 Keras 进行多工作进程训练 (`tf.keras.Model.fit`)](multi_worker_with_keras.ipynb) 教程中找到。" ] }, { "cell_type": "markdown", "metadata": { "id": "ega2hdOQEmy_" }, "source": [ "## 了解更多\n", "\n", "1. [TensorFlow 中的分布式训练](../../guide/distributed_training.ipynb)指南概述了可用的分布式策略。\n", "2. [官方模型](https://github.com/tensorflow/models/tree/master/official),其中许多模型可以配置为运行多个分布式策略。\n", "3. tf.function 指南中的“性能”部分提供了有关其他策略和[工具](../../guide/profiler.md)的信息,您可以使用它们来优化 TensorFlow 模型的性能。\n" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "multi_worker_with_ctl.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }