{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "Tce3stUlHN0L" }, "outputs": [], "source": [ "##### Copyright 2018 The TensorFlow Authors.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "tuOe1ymfHZPu", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "MfBg1C5NB3X0" }, "source": [ "# 使用 TPU\n", "\n", "\n", " \n", " \n", " \n", " \n", "
在 TensorFlow.org 上查看在 Google Colab 中运行 在 Github 上查看源代码\n", " 下载笔记本
" ] }, { "cell_type": "markdown", "metadata": { "id": "Ys81cOhXOWUP" }, "source": [ "本指南演示了如何在[张量处理单元 (TPU)](https://cloud.google.com/tpu/) 和 TPU Pod 上执行基本训练,TPU Pod 是一组通过专用高速网络接口连接的 TPU 设备,带有 `tf.keras` 和自定义训练循环。\n", "\n", "TPU 是 Google 定制开发的专用集成电路 (ASIC),用于加速机器学习工作负载。它们可通过 [Google Colab](https://colab.research.google.com/)、[TPU Research Cloud](https://sites.research.google/trc/) 和 [Cloud TPU](https://cloud.google.com/tpu) 获得。" ] }, { "cell_type": "markdown", "metadata": { "id": "ek5Hop74NVKm" }, "source": [ "## 安装" ] }, { "cell_type": "markdown", "metadata": { "id": "ebf7f8489bb7" }, "source": [ "在运行此 Colab 笔记本之前,请在以下路径下检查笔记本设置,确保硬件加速器为 TPU:**Runtime** > **Change runtime type** > **Hardware accelerator** > **TPU**。\n", "\n", "导入一些必要的库,包括 TensorFlow Datasets:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Cw0WRaChRxTL", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import tensorflow as tf\n", "\n", "import os\n", "import tensorflow_datasets as tfds" ] }, { "cell_type": "markdown", "metadata": { "id": "yDWaRxSpwBN1" }, "source": [ "## TPU 初始化\n", "\n", "与运行用户 Python 程序的本地流程不同,TPU 通常在 [Cloud TPU](https://cloud.google.com/tpu/docs/) 工作进程上。因此,需要完成一些初始化工作才能连接到远程集群并初始化 TPU。请注意,tf.distribute.cluster_resolver.TPUClusterResolver 的 `tpu` 参数是一个仅适用于 Colab 的特殊地址。如果在 Google Compute Engine (GCE) 上运行,应改为传入 Cloud TPU 的名称。" ] }, { "cell_type": "markdown", "metadata": { "id": "dCqWMqvtwOLs" }, "source": [ "注:必须将 TPU 初始化代码放在程序的开头位置。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dKPqF8d1wJCV", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')\n", "tf.config.experimental_connect_to_cluster(resolver)\n", "# This is the TPU initialization code that has to be at the beginning.\n", "tf.tpu.experimental.initialize_tpu_system(resolver)\n", "print(\"All devices: \", tf.config.list_logical_devices('TPU'))" ] }, { "cell_type": "markdown", "metadata": { "id": "Mv7kehTZ1Lq_" }, "source": [ "## 手动设备放置\n", "\n", "初始化 TPU 后,您可以通过手动设备放置将计算放置在单个 TPU 设备上:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "XRZ4kMoxBNND", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])\n", "b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])\n", "\n", "with tf.device('/TPU:0'):\n", " c = tf.matmul(a, b)\n", "\n", "print(\"c device: \", c.device)\n", "print(c)" ] }, { "cell_type": "markdown", "metadata": { "id": "_NJm-kgFO0cC" }, "source": [ "## 分布策略\n", "\n", "通常,您可以在多个 TPU 上以数据并行的方式运行模型。为了在多个 TPU(以及多个 GPU 或多台机器)上分布模型,TensorFlow 提供了 `tf.distribute.Strategy` API。您可以更换分布策略,该模型将在任何给定的 (TPU) 设备上运行。在[使用 TensorFlow 进行分布式训练](./distributed_training.ipynb)指南中了解详情。" ] }, { "cell_type": "markdown", "metadata": { "id": "DcDPMZs-9uLJ" }, "source": [ "使用 `tf.distribute.TPUStrategy` 选项实现同步分布式训练。TPU 会在多个 TPU 核心之间实现高效的全归约和其他集合运算,并将其用于 TPUStrategy。\n", "\n", "要演示这一点,请创建一个 `tf.distribute.TPUStrategy` 对象:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "7SO23K8oRpjI", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "strategy = tf.distribute.TPUStrategy(resolver)" ] }, { "cell_type": "markdown", "metadata": { "id": "JlaAmswWPsU6" }, "source": [ "要复制计算,以便在所有 TPU 核心中运行,可以直接将其传入 `strategy.run` API。在下面的示例中,所有核心都会获得相同的输入 `(a, b)`,并单独在每个核心上执行矩阵乘法运算。输出是所有副本的值。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "-90CL5uFPTOa", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "@tf.function\n", "def matmul_fn(x, y):\n", " z = tf.matmul(x, y)\n", " return z\n", "\n", "z = strategy.run(matmul_fn, args=(a, b))\n", "print(z)" ] }, { "cell_type": "markdown", "metadata": { "id": "uxgYl6kGHJLc" }, "source": [ "## TPU 上的分类\n", "\n", "我们已经学习了基本概念,现在来看看具体示例。本部分会演示如何使用分布策略 `tf.distribute.experimental.TPUStrategy` 在 Cloud TPU 上训练 Keras 模型。" ] }, { "cell_type": "markdown", "metadata": { "id": "gKRALGgt_kCo" }, "source": [ "### 定义 Keras 模型\n", "\n", "首先定义 [`Sequential` Keras 模型](https://tensorflow.google.cn/guide/keras/sequential_model),对 MNIST 数据集进行图像分类。这与您在 CPU 或 GPU 上进行训练时使用的定义相同。请注意,Keras 模型创建需要位于 `Strategy.scope` 内,这样才能在每个 TPU 设备上创建变量。代码的其他部分不必放在 `Strategy` 作用域内。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "DiBiN-Z_R7P7", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def create_model():\n", " regularizer = tf.keras.regularizers.L2(1e-5)\n", " return tf.keras.Sequential(\n", " [tf.keras.layers.Conv2D(256, 3, input_shape=(28, 28, 1),\n", " activation='relu',\n", " kernel_regularizer=regularizer),\n", " tf.keras.layers.Conv2D(256, 3,\n", " activation='relu',\n", " kernel_regularizer=regularizer),\n", " tf.keras.layers.Flatten(),\n", " tf.keras.layers.Dense(256,\n", " activation='relu',\n", " kernel_regularizer=regularizer),\n", " tf.keras.layers.Dense(128,\n", " activation='relu',\n", " kernel_regularizer=regularizer),\n", " tf.keras.layers.Dense(10,\n", " kernel_regularizer=regularizer)])" ] }, { "cell_type": "markdown", "metadata": { "id": "h-2qaXgfyONQ" }, "source": [ "此模型将 L2 正则化项放在每层的权重上,以便下面的自定义训练循环可以显示如何从 `Model.losses` 中选取它们。" ] }, { "cell_type": "markdown", "metadata": { "id": "qYOYjYTg_31l" }, "source": [ "### 加载数据集\n", "\n", "使用 Cloud TPU 时,有效使用 `tf.data.Dataset` API 很关键。有关数据集性能的详细信息,请参阅[输入流水线性能指南](./data_performance.ipynb)。\n", "\n", "如果使用的是 [TPU Nodes](https://cloud.google.com/tpu/docs/managing-tpus-tpu-vm),则需要将 TensorFlow `Dataset` 读取的所有数据文件存储在 [Google Cloud Storage (GCS) 存储分区](https://cloud.google.com/tpu/docs/storage-buckets)中。如果使用的是 [TPU VM](https://cloud.google.com/tpu/docs/users-guide-tpu-vm),则可以将数据存储在任意位置。有关 TPU Nodes 和 TPU VM 的更多信息,请参阅 [TPU 系统架构](https://cloud.google.com/tpu/docs/system-architecture-tpu-vm)文档。\n", "\n", "对于大多数用例,建议将数据转换为 `TFRecord` 格式,并使用 `tf.data.TFRecordDataset` 进行读取。有关操作方法的详细信息,请参阅 [TFRecord 和 tf.Example 教程](../tutorials/load_data/tfrecord.ipynb)。不过,这并非硬性要求,如果愿意,您可以使用其他数据集读取器,如 `tf.data.FixedLengthRecordDataset` 或 `tf.data.TextLineDataset`。\n", "\n", "您可以使用 `tf.data.Dataset.cache` 将整个小数据集加载到内存中。\n", "\n", "无论使用哪一种数据格式,我们都强烈建议使用大文件(100MB 左右)。在这种网络化环境下,这一点尤其重要,因为打开文件的开销非常高。\n", "\n", "如下面的代码所示,您应使用 Tensorflow Datasets `tfds.load` 模块获取 MNIST 训练和测试数据的副本。请注意,代码中已指定 `try_gcs` 来使用公共 GCS 存储分区中提供的副本。如果不这样指定,TPU 将无法访问下载的数据。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "noAd416KSCo7", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def get_dataset(batch_size, is_training=True):\n", " split = 'train' if is_training else 'test'\n", " dataset, info = tfds.load(name='mnist', split=split, with_info=True,\n", " as_supervised=True, try_gcs=True)\n", "\n", " # Normalize the input data.\n", " def scale(image, label):\n", " image = tf.cast(image, tf.float32)\n", " image /= 255.0\n", " return image, label\n", "\n", " dataset = dataset.map(scale)\n", "\n", " # Only shuffle and repeat the dataset in training. The advantage of having an\n", " # infinite dataset for training is to avoid the potential last partial batch\n", " # in each epoch, so that you don't need to think about scaling the gradients\n", " # based on the actual batch size.\n", " if is_training:\n", " dataset = dataset.shuffle(10000)\n", " dataset = dataset.repeat()\n", "\n", " dataset = dataset.batch(batch_size)\n", "\n", " return dataset" ] }, { "cell_type": "markdown", "metadata": { "id": "mgUC6A-zCMEr" }, "source": [ "### 使用 Keras 高级 API 训练模型\n", "\n", "可以使用 Keras `Model.fit` 和 `Model.compile` API 训练模型。在此步骤中没有特定于 TPU 的内容,可以像使用多个 GPU 和 `MirroredStrategy` 而不是 `TPUStrategy` 一样编写代码。可以在[使用 Keras 进行分布式训练](../tutorials/distribute/keras.ipynb)教程中了解详情。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ubmDchPqSIx0", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "with strategy.scope():\n", " model = create_model()\n", " model.compile(optimizer='adam',\n", " loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n", " metrics=['sparse_categorical_accuracy'])\n", "\n", "batch_size = 200\n", "steps_per_epoch = 60000 // batch_size\n", "validation_steps = 10000 // batch_size\n", "\n", "train_dataset = get_dataset(batch_size, is_training=True)\n", "test_dataset = get_dataset(batch_size, is_training=False)\n", "\n", "model.fit(train_dataset,\n", " epochs=5,\n", " steps_per_epoch=steps_per_epoch,\n", " validation_data=test_dataset,\n", " validation_steps=validation_steps)" ] }, { "cell_type": "markdown", "metadata": { "id": "8hSGBIYtUugJ" }, "source": [ "为了减少 Python 开销,同时最大限度提升 TPU 的性能,请将 `steps_per_execution` 参数传入 Keras `Model.compile`。在本例中,它可以将吞吐量提升约 50%:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "M6e3aVVLUorL", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "with strategy.scope():\n", " model = create_model()\n", " model.compile(optimizer='adam',\n", " # Anything between 2 and `steps_per_epoch` could help here.\n", " steps_per_execution = 50,\n", " loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n", " metrics=['sparse_categorical_accuracy'])\n", "\n", "model.fit(train_dataset,\n", " epochs=5,\n", " steps_per_epoch=steps_per_epoch,\n", " validation_data=test_dataset,\n", " validation_steps=validation_steps)" ] }, { "cell_type": "markdown", "metadata": { "id": "0rRALBZNCO4A" }, "source": [ "### 使用自定义训练循环训练模型\n", "\n", "还可以直接使用 `tf.function` 和 `tf.distribute` API 创建和训练模型。可以使用 `Strategy.experimental_distribute_datasets_from_function` API 通过给定的数据集函数分布 `tf.data.Dataset`。请注意,在下面的示例中,传递给 `Dataset` 的批次大小是每个副本的批次大小,而非全局批次大小。要了解详情,请查阅[使用 `tf.distribute.Strategy` 进行自定义训练](../tutorials/distribute/custom_training.ipynb)教程。\n" ] }, { "cell_type": "markdown", "metadata": { "id": "DxdgXPAL6iFE" }, "source": [ "首先,创建模型、数据集和 `tf.function`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "9aHhqwao2Fxi", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Create the model, optimizer and metrics inside the `tf.distribute.Strategy`\n", "# scope, so that the variables can be mirrored on each device.\n", "with strategy.scope():\n", " model = create_model()\n", " optimizer = tf.keras.optimizers.Adam()\n", " training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)\n", " training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(\n", " 'training_accuracy', dtype=tf.float32)\n", "\n", "# Calculate per replica batch size, and distribute the `tf.data.Dataset`s\n", "# on each TPU worker.\n", "per_replica_batch_size = batch_size // strategy.num_replicas_in_sync\n", "\n", "train_dataset = strategy.experimental_distribute_datasets_from_function(\n", " lambda _: get_dataset(per_replica_batch_size, is_training=True))\n", "\n", "@tf.function\n", "def train_step(iterator):\n", " \"\"\"The step function for one training step.\"\"\"\n", "\n", " def step_fn(inputs):\n", " \"\"\"The computation to run on each TPU device.\"\"\"\n", " images, labels = inputs\n", " with tf.GradientTape() as tape:\n", " logits = model(images, training=True)\n", " per_example_loss = tf.keras.losses.sparse_categorical_crossentropy(\n", " labels, logits, from_logits=True)\n", " loss = tf.nn.compute_average_loss(per_example_loss)\n", " model_losses = model.losses\n", " if model_losses:\n", " loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))\n", "\n", " grads = tape.gradient(loss, model.trainable_variables)\n", " optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))\n", " training_loss.update_state(loss * strategy.num_replicas_in_sync)\n", " training_accuracy.update_state(labels, logits)\n", "\n", " strategy.run(step_fn, args=(next(iterator),))" ] }, { "cell_type": "markdown", "metadata": { "id": "Ibi7Z97V6xsQ" }, "source": [ "然后,运行训练循环:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1du5cXWt6Vtw", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "steps_per_eval = 10000 // batch_size\n", "\n", "train_iterator = iter(train_dataset)\n", "for epoch in range(5):\n", " print('Epoch: {}/5'.format(epoch))\n", "\n", " for step in range(steps_per_epoch):\n", " train_step(train_iterator)\n", " print('Current step: {}, training loss: {}, training accuracy: {}%'.format(\n", " optimizer.iterations.numpy(),\n", " round(float(training_loss.result()), 4),\n", " round(float(training_accuracy.result()) * 100, 2)))\n", " training_loss.reset_states()\n", " training_accuracy.reset_states()" ] }, { "cell_type": "markdown", "metadata": { "id": "TnZJUM3qIjKu" }, "source": [ "### 在 `tf.function` 中利用多步法提升性能\n", "\n", "您可以通过在 `tf.function`. 中运行多步以提升性能。在 `tf.function` 内使用 `tf.range` 包装 `Strategy.run` 调用即可实现此目的,在 TPU 工作进程上,AutoGraph 会将其转换为 `tf.while_loop`。可以在使用 `tf.function` 升性能指南中详细了解 `tf.function`。\n", "\n", "在 `tf.function` 中,虽然多步法的性能更高,但是与单步法相比,可谓各有利弊。在 `tf.function` 中运行多个步骤不够灵活,您无法以 Eager 方式运行,也不能运行任意 Python 代码。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "2grYvXLzJYkP", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "@tf.function\n", "def train_multiple_steps(iterator, steps):\n", " \"\"\"The step function for one training step.\"\"\"\n", "\n", " def step_fn(inputs):\n", " \"\"\"The computation to run on each TPU device.\"\"\"\n", " images, labels = inputs\n", " with tf.GradientTape() as tape:\n", " logits = model(images, training=True)\n", " per_example_loss = tf.keras.losses.sparse_categorical_crossentropy(\n", " labels, logits, from_logits=True)\n", " loss = tf.nn.compute_average_loss(per_example_loss)\n", " model_losses = model.losses\n", " if model_losses:\n", " loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))\n", " grads = tape.gradient(loss, model.trainable_variables)\n", " optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))\n", " training_loss.update_state(loss * strategy.num_replicas_in_sync)\n", " training_accuracy.update_state(labels, logits)\n", "\n", " for _ in tf.range(steps):\n", " strategy.run(step_fn, args=(next(iterator),))\n", "\n", "# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get\n", "# retraced if the value changes.\n", "train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))\n", "\n", "print('Current step: {}, training loss: {}, training accuracy: {}%'.format(\n", " optimizer.iterations.numpy(),\n", " round(float(training_loss.result()), 4),\n", " round(float(training_accuracy.result()) * 100, 2)))" ] }, { "cell_type": "markdown", "metadata": { "id": "WBKVhMvWjibf" }, "source": [ "## 后续步骤\n", "\n", "要详细了解 Cloud TPU 以及如何使用它们,请查看以下资源:\n", "\n", "- [Google Cloud TPU](https://cloud.google.com/tpu):Google Cloud TPU 首页。\n", "- [Google Cloud TPU 文档](https://cloud.google.com/tpu/docs/):Google Cloud TPU 文档,其中包括:\n", " - [Cloud TPU 简介](https://cloud.google.com/tpu/docs/intro-to-tpu):使用 Cloud TPU 的概述。\n", " - [Cloud TPU 快速入门](https://cloud.google.com/tpu/docs/quick-starts):使用 TensorFlow 和其他主要机器学习框架利用 Cloud TPU VM 的快速入门简介。\n", "- [Google Cloud TPU Colab 笔记本](https://cloud.google.com/tpu/docs/colabs):端到端训练示例。\n", "- [Google Cloud TPU 性能指南](https://cloud.google.com/tpu/docs/performance-guide):通过为应用调整 Cloud TPU 配置参数来进一步增强 Cloud TPU 性能。\n", "- [Distributed training with TensorFlow](./distributed_training.ipynb): How to use distribution strategies—including `tf.distribute.TPUStrategy`—with examples showing best practices.\n", "- TPU 嵌入向量:TensorFlow 包括通过 `tf.tpu.experimental.embedding` 在 TPU 上训练嵌入向量的专门支持。此外,[TensorFlow Recommenders](https://tensorflow.google.cn/recommenders) 还具有 `tfrs.layers.embedding.TPUEmbedding`。嵌入向量提供高效和密集的表示,捕捉特征之间的复杂相似度和关系。TensorFlow 的 TPU 特定嵌入向量支持允许您训练大于单个 TPU 设备内存的嵌入向量,并在 TPU 上使用稀疏和不规则输入。\n", "- [TPU Research Cloud (TRC)](https://sites.research.google/trc/about/):TRC 让研究人员能够申请访问由超过 1,000 个 Cloud TPU 设备组成的集群。\n" ] } ], "metadata": { "accelerator": "TPU", "colab": { "collapsed_sections": [], "name": "tpu.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }