{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "wJcYs_ERTnnI" }, "outputs": [], "source": [ "##### Copyright 2021 The TensorFlow Authors." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "HMUDt0CiUJk9", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "77z2OchJTk0l" }, "source": [ "# 迁移多工作进程 CPU/GPU 训练\n", "\n", "\n", " \n", " \n", " \n", " \n", "
在 TensorFlow.org 上查看 在 Google Colab 运行 在 Github 上查看源代码 下载笔记本
" ] }, { "cell_type": "markdown", "metadata": { "id": "meUTrR4I6m1C" }, "source": [ "本指南演示了如何将多工作进程分布式训练工作流从 TensorFlow 1 迁移到 TensorFlow 2。\n", "\n", "使用 CPU/GPU 执行多工作进程训练:\n", "\n", "- 在 TensorFlow 1 中,您通常会使用 `tf.estimator.train_and_evaluate` 和 `tf.estimator.Estimator` API。\n", "- 在 TensorFlow 2 中,使用 Keras API 编写模型、损失函数、优化器和指标。随后,利用 Keras `Model.fit` API 或自定义训练循环(使用 `tf.GradientTape`)并通过 `tf.distribute.experimental.ParameterServerStrategy` 或 `tf.distribute.MultiWorkerMirroredStrategy` 将训练分布到多个工作进程之间。有关详情,请参阅下列教程:\n", " - [使用 TensorFlow 进行分布式训练](../../guide/distributed_training.ipynb)\n", " - [使用 Keras Model.fit/自定义训练循环进行参数服务器训练](../../tutorials/distribute/parameter_server_training.ipynb)\n", " - [在 Keras Model.fit 中使用 MultiWorkerMirroredStrategy](../../tutorials/distribute/multi_worker_with_keras.ipynb)\n", " - [在自定义训练循环中使用 MultiWorkerMirroredStrategy](../../tutorials/distribute/multi_worker_with_ctl.ipynb)。" ] }, { "cell_type": "markdown", "metadata": { "id": "YdZSoIXEbhg-" }, "source": [ "## 安装" ] }, { "cell_type": "markdown", "metadata": { "id": "28f46832b54d" }, "source": [ "从一些必要的导入和用于演示目的的简单数据集开始:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "iE0vSfMXumKI", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# The notebook uses a dataset instance for `Model.fit` with\n", "# `ParameterServerStrategy`, which depends on symbols in TF 2.7.\n", "# Install a utility needed for this demonstration\n", "!pip install portpicker\n", "\n", "import tensorflow as tf\n", "import tensorflow.compat.v1 as tf1" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "m7rnGxsXtDkV", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "features = [[1., 1.5], [2., 2.5], [3., 3.5]]\n", "labels = [[0.3], [0.5], [0.7]]\n", "eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]\n", "eval_labels = [[0.8], [0.9], [1.]]" ] }, { "cell_type": "markdown", "metadata": { "id": "T2uaw9QaDM_X" }, "source": [ "您将需要 `'TF_CONFIG'` 配置环境变量以在 TensorFlow 中的多台机器上进行训练。使用 `'TF_CONFIG'` 指定 `'cluster'` 和 `'task'` 的地址。(有关详情,请参阅[分布式训练](../...guide/distributed_training.ipynb)指南。)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "4OUzwoQgXgkG", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import json\n", "import os\n", "\n", "tf_config = {\n", " 'cluster': {\n", " 'chief': ['localhost:11111'],\n", " 'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],\n", " 'ps': ['localhost:12121', 'localhost:13131'],\n", " },\n", " 'task': {'type': 'chief', 'index': 0}\n", "}\n", "\n", "os.environ['TF_CONFIG'] = json.dumps(tf_config)" ] }, { "cell_type": "markdown", "metadata": { "id": "PbeoSbbmDdc0" }, "source": [ "注:遗憾的是,由于在 TensorFlow 1 中使用 `tf.estimator` API 进行多工作进程训练需要多个客户端(在此 Colab 笔记本中实现这一点会特别棘手),这会使笔记本在没有 `'TF_CONFIG'` 环境变量的情况下可运行,因此它会回退为本地训练。(有关详情,请参阅[使用 TensorFlow 进行分布式训练](../../guide/distributed_training.ipynb)指南中的设置 `'TF_CONFIG'` 环境变量部分。)\n", "\n", "使用 `del` 语句移除变量(但在 TensorFlow 1 中的实际多工作进程训练中,您不必这样做):" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "AHuynAR5D8sU", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "del os.environ['TF_CONFIG']" ] }, { "cell_type": "markdown", "metadata": { "id": "4uXff1BEssdE" }, "source": [ "## TensorFlow 1:使用 tf.estimator API 进行多工作进程分布式训练" ] }, { "cell_type": "markdown", "metadata": { "id": "MpyINdiLEN3c" }, "source": [ "以下代码段演示了 TF1 中多工作进程训练的规范工作流:您将使用 `tf.estimator.Estimator`、`tf.estimator.TrainSpec`、`tf.estimator.EvalSpec` 和 `tf.estimator.train_and_evaluate` API 来分布训练:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "lqe9obf7suIj", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def _input_fn():\n", " return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)\n", "\n", "def _eval_input_fn():\n", " return tf1.data.Dataset.from_tensor_slices(\n", " (eval_features, eval_labels)).batch(1)\n", "\n", "def _model_fn(features, labels, mode):\n", " logits = tf1.layers.Dense(1)(features)\n", " loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)\n", " optimizer = tf1.train.AdagradOptimizer(0.05)\n", " train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())\n", " return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)\n", "\n", "estimator = tf1.estimator.Estimator(model_fn=_model_fn)\n", "train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)\n", "eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)\n", "tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)" ] }, { "cell_type": "markdown", "metadata": { "id": "KEmzBjfnsxwT" }, "source": [ "## TensorFlow 2:使用分布策略进行多工作进程训练" ] }, { "cell_type": "markdown", "metadata": { "id": "Syb66qsbEp1x" }, "source": [ "在 TensorFlow 2 中,使用 CPU、GPU 和 TPU 的多个工作进程之间的分布式训练是通过 `tf.distribute.Strategy` 完成的。\n", "\n", "下面的示例演示了如何使用两种这样的策略:`tf.distribute.experimental.ParameterServerStrategy` 和 `tf.distribute.MultiWorkerMirroredStrategy`,这两种策略都是为使用多个工作进程进行 CPU/GPU 训练而设计的。\n", "\n", "`ParameterServerStrategy` 使用了一个*协调器* (`'chief'`),这使其对此 Colab 笔记本中的环境更加友好。您将在此处使用一些效用函数来设置可运行体验所必需的支持元素:您将创建一个*进程内聚簇*,其中线程用于模拟参数服务器 (`'ps'`) 和工作进程 (`'worker'`)。有关参数服务器训练的更多信息,请参阅[使用 ParameterServerStrategy 进行参数服务器训练](../../tutorials/distribute/parameter_server_training.ipynb)教程。\n", "\n", "在此示例中,首先使用 `tf.distribute.cluster_resolver.TFConfigClusterResolver` 定义 `'TF_CONFIG'` 环境变量以提供聚簇信息。如果您使用聚簇管理系统进行分布式训练,请检查它是否已经提供了 `'TF_CONFIG'`,如果已提供,则无需显式设置此环境变量。(有关详情,请参阅[使用 TensorFlow 进行分布式训练](../../guide/distributed_training.ipynb)指南中的设置 `'TF_CONFIG'` 环境变量部分。)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "rp-gFY0H5rF-", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Find ports that are available for the `'chief'` (the coordinator),\n", "# `'worker'`s, and `'ps'` (parameter servers).\n", "import portpicker\n", "\n", "chief_port = portpicker.pick_unused_port()\n", "worker_ports = [portpicker.pick_unused_port() for _ in range(3)]\n", "ps_ports = [portpicker.pick_unused_port() for _ in range(2)]\n", "\n", "# Dump the cluster information to `'TF_CONFIG'`.\n", "tf_config = {\n", " 'cluster': {\n", " 'chief': [\"localhost:%s\" % chief_port],\n", " 'worker': [\"localhost:%s\" % port for port in worker_ports],\n", " 'ps': [\"localhost:%s\" % port for port in ps_ports],\n", " },\n", " 'task': {'type': 'chief', 'index': 0}\n", "}\n", "os.environ['TF_CONFIG'] = json.dumps(tf_config)\n", "\n", "# Use a cluster resolver to bridge the information to the strategy created below.\n", "cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()" ] }, { "cell_type": "markdown", "metadata": { "id": "o_8uVvJb6dqq" }, "source": [ "然后,为工程进程和参数服务器一一创建 `tf.distribute.Server`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ZJopinmG6b2z", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Workers need some inter_ops threads to work properly.\n", "# This is only needed for this notebook to demo. Real servers\n", "# should not need this.\n", "worker_config = tf.compat.v1.ConfigProto()\n", "worker_config.inter_op_parallelism_threads = 4\n", "\n", "for i in range(3):\n", " tf.distribute.Server(\n", " cluster_resolver.cluster_spec(),\n", " job_name=\"worker\",\n", " task_index=i,\n", " config=worker_config)\n", "\n", "for i in range(2):\n", " tf.distribute.Server(\n", " cluster_resolver.cluster_spec(),\n", " job_name=\"ps\",\n", " task_index=i)" ] }, { "cell_type": "markdown", "metadata": { "id": "IpfCcF0g6Ao8" }, "source": [ "在现实世界的分布式训练中,您将使用多台机器而不是启动协调器上的所有 `tf.distribute.Server`,并且指定为`\"worker\"` 和 `\"ps\"`(参数服务器)的机器将各自运行一个 `tf.distribute.Server`。请参阅[参数服务器训练](../../tutorials/distribute/parameter_server_training.ipynb)教程中的*现实世界中的聚簇*部分,了解更多详细信息。\n", "\n", "一切准备就绪后,创建 `ParameterServerStrategy` 对象:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "t45iQeBT7Us_", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)" ] }, { "cell_type": "markdown", "metadata": { "id": "diNsps1MGRS6" }, "source": [ "创建策略对象后,定义模型、优化器和其他变量,然后在 `Strategy.scope` API 中调用 Keras `Model.compile` 以分布训练。(如需了解详情,请参阅 `Strategy.scope` API 文档。)\n", "\n", "如果您更喜欢通过定义前向和后向传递来自定义训练,请参阅[参数服务器训练](../../tutorials/distribute/parameter_server_training.ipynb)教程中的*使用自定义训练循环进行训练*部分,了解更多详细信息。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "atVciNgPs0fw", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "dataset = tf.data.Dataset.from_tensor_slices(\n", " (features, labels)).shuffle(10).repeat().batch(64)\n", "\n", "eval_dataset = tf.data.Dataset.from_tensor_slices(\n", " (eval_features, eval_labels)).repeat().batch(1)\n", "\n", "with strategy.scope():\n", " model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])\n", " optimizer = tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05)\n", " model.compile(optimizer, \"mse\")\n", "\n", "model.fit(dataset, epochs=5, steps_per_epoch=10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "akZ0aaaS1vA9", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "model.evaluate(eval_dataset, steps=10, return_dict=True)" ] }, { "cell_type": "markdown", "metadata": { "id": "pXbS71XmMSoO" }, "source": [ "> **分区器 (`tf.distribute.experimental.partitioners`)**\n", ">\n", "> TensorFlow 2 中的 `ParameterServerStrategy` 支持变量分区,并提供与 TensorFlow 1 相同,但名称不容易混淆的分区器:\n", ">\n", "> - `tf.compat.v1.variable_axis_size_partitioner` -> `tf.distribute.experimental.partitioners.MaxSizePartitioner`:将分片保持在最大大小以下的分区器)。\n", "> - `tf.compat.v1.min_max_variable_partitioner` -> `tf.distribute.experimental.partitioners.MinSizePartitioner`:为每个分片分配最小大小的分区器。\n", "> - `tf.compat.v1.fixed_size_partitioner` -> `tf.distribute.experimental.partitioners.FixedShardsPartitioner`:分配固定数量分片的分区器。" ] }, { "cell_type": "markdown", "metadata": { "id": "Ig0-uCUbGprd" }, "source": [ "或者,您也可以使用 `MultiWorkerMirroredStrategy` 对象:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "xHXP8bOBGtXL", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.\n", "del os.environ['TF_CONFIG']\n", "strategy = tf.distribute.MultiWorkerMirroredStrategy()" ] }, { "cell_type": "markdown", "metadata": { "id": "tOsmqefTGwUf" }, "source": [ "可以将上面使用的策略替换为 `MultiWorkerMirroredStrategy` 对象,以使用此策略执行训练。\n", "\n", "与 `tf.estimator` API 一样,由于 `MultiWorkerMirroredStrategy` 是一种多客户端策略,在此 Colab 笔记本中运行分布式训练并不容易。因此,用这种策略替换上面的代码最终会以在本地运行结束。[使用 Keras Model.fit](../../tutorials/distribute/multi_worker_with_keras.ipynb)/[自定义训练循环](../../tutorials/distribute/multi_worker_with_ctl.ipynb)的多工作进程训练教程演示了在设置 `'TF_CONFIG'` 变量的情况下,如何在 Colab 的本地主机上使用两个工作进程运行多工作进程训练。在实践中,您将在外部 IP 地址/端口上创建多个工作进程,并使用 `'TF_CONFIG'` 变量为每个工作进程指定聚簇配置。" ] }, { "cell_type": "markdown", "metadata": { "id": "917ef6135660" }, "source": [ "## 后续步骤" ] }, { "cell_type": "markdown", "metadata": { "id": "e76fd9d5c98c" }, "source": [ "要详细了解如何在 TensorFlow 2 中使用 `tf.distribute.experimental.ParameterServerStrategy` 和 `tf.distribute.MultiWorkerMirroredStrategy` 进行多工作进程分布式训练,请查看以下资源:\n", "\n", "- 教程:[使用 ParameterServerStrategy 和 Keras Model.fit/自定义训练循环进行参数服务器训练](../../tutorials/distribute/parameter_server_training.ipynb)\n", "- 教程:[使用 MultiWorkerMirroredStrategy 和 Keras Model.fit 进行多工作进程训练](../../tutorials/distribute/multi_worker_with_keras.ipynb)\n", "- 教程:[使用 MultiWorkerMirroredStrategy 和自定义训练循环进行多工作进程训练](../../tutorials/distribute/multi_worker_with_ctl.ipynb)\n", "- 指南:[使用 TensorFlow 进行分布式训练](../../guide/distributed_training.ipynb)\n", "- 指南:[使用 TensorFlow Profiler 优化 TensorFlow GPU 性能](../../guide/gpu_performance_analysis.ipynb)\n", "- 指南:[使用 GPU](../../guide/gpu.ipynb)(“使用多个 GPU”部分)" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "multi_worker_cpu_gpu_training.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }