{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "Tce3stUlHN0L" }, "outputs": [], "source": [ "##### Copyright 2018 The TensorFlow Authors.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "tuOe1ymfHZPu", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "MfBg1C5NB3X0" }, "source": [ "# 使用 TPU\n", "\n", "
![]() | \n",
" ![]() | \n",
" ![]() | \n",
" ![]() | \n",
"
tf.distribute.cluster_resolver.TPUClusterResolver
的 `tpu` 参数是一个仅适用于 Colab 的特殊地址。如果在 Google Compute Engine (GCE) 上运行,应改为传入 Cloud TPU 的名称。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "dCqWMqvtwOLs"
},
"source": [
"注:必须将 TPU 初始化代码放在程序的开头位置。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "dKPqF8d1wJCV",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')\n",
"tf.config.experimental_connect_to_cluster(resolver)\n",
"# This is the TPU initialization code that has to be at the beginning.\n",
"tf.tpu.experimental.initialize_tpu_system(resolver)\n",
"print(\"All devices: \", tf.config.list_logical_devices('TPU'))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Mv7kehTZ1Lq_"
},
"source": [
"## 手动设备放置\n",
"\n",
"初始化 TPU 后,您可以通过手动设备放置将计算放置在单个 TPU 设备上:\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "XRZ4kMoxBNND",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])\n",
"b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])\n",
"\n",
"with tf.device('/TPU:0'):\n",
" c = tf.matmul(a, b)\n",
"\n",
"print(\"c device: \", c.device)\n",
"print(c)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "_NJm-kgFO0cC"
},
"source": [
"## 分布策略\n",
"\n",
"通常,您可以在多个 TPU 上以数据并行的方式运行模型。为了在多个 TPU(以及多个 GPU 或多台机器)上分布模型,TensorFlow 提供了 `tf.distribute.Strategy` API。您可以更换分布策略,该模型将在任何给定的 (TPU) 设备上运行。在[使用 TensorFlow 进行分布式训练](./distributed_training.ipynb)指南中了解详情。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "DcDPMZs-9uLJ"
},
"source": [
"使用 `tf.distribute.TPUStrategy` 选项实现同步分布式训练。TPU 会在多个 TPU 核心之间实现高效的全归约和其他集合运算,并将其用于 TPUStrategy
。\n",
"\n",
"要演示这一点,请创建一个 `tf.distribute.TPUStrategy` 对象:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "7SO23K8oRpjI",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"strategy = tf.distribute.TPUStrategy(resolver)"
]
},
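{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because all `tf.distribute.Strategy` implementations share the same interface, swapping one strategy for another leaves the rest of the training code unchanged. The sketch below illustrates that point; the `make_strategy` helper and `use_tpu` flag are illustrative, not part of the TensorFlow API:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def make_strategy(use_tpu=True):\n",
"  # Illustrative helper: choose a strategy without touching training code.\n",
"  if use_tpu:\n",
"    return tf.distribute.TPUStrategy(resolver)\n",
"  # Synchronous training on one machine's GPUs (or CPU) as a fallback.\n",
"  return tf.distribute.MirroredStrategy()\n",
"\n",
"# The code that builds and trains the model stays the same either way:\n",
"# with make_strategy(use_tpu=True).scope():\n",
"#   model = ...  # same model definition under any strategy"
]
},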
{
"cell_type": "markdown",
"metadata": {
"id": "JlaAmswWPsU6"
},
"source": [
"要复制计算,以便在所有 TPU 核心中运行,可以直接将其传入 `strategy.run` API。在下面的示例中,所有核心都会获得相同的输入 `(a, b)`,并单独在每个核心上执行矩阵乘法运算。输出是所有副本的值。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "-90CL5uFPTOa",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"@tf.function\n",
"def matmul_fn(x, y):\n",
" z = tf.matmul(x, y)\n",
" return z\n",
"\n",
"z = strategy.run(matmul_fn, args=(a, b))\n",
"print(z)"
]
},
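{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you need a single combined tensor instead of per-replica values, you can aggregate them with `Strategy.reduce`. This aggregation step is an addition to the original example, and summing is just one reasonable choice:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Combine the per-replica outputs into one tensor by summing across replicas.\n",
"z_sum = strategy.reduce(tf.distribute.ReduceOp.SUM, z, axis=None)\n",
"print(z_sum)"
]
},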
{
"cell_type": "markdown",
"metadata": {
"id": "uxgYl6kGHJLc"
},
"source": [
"## TPU 上的分类\n",
"\n",
"我们已经学习了基本概念,现在来看看具体示例。本部分会演示如何使用分布策略 `tf.distribute.experimental.TPUStrategy` 在 Cloud TPU 上训练 Keras 模型。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "gKRALGgt_kCo"
},
"source": [
"### 定义 Keras 模型\n",
"\n",
"首先定义 [`Sequential` Keras 模型](https://tensorflow.google.cn/guide/keras/sequential_model),对 MNIST 数据集进行图像分类。这与您在 CPU 或 GPU 上进行训练时使用的定义相同。请注意,Keras 模型创建需要位于 `Strategy.scope` 内,这样才能在每个 TPU 设备上创建变量。代码的其他部分不必放在 `Strategy` 作用域内。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "DiBiN-Z_R7P7",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"def create_model():\n",
" regularizer = tf.keras.regularizers.L2(1e-5)\n",
" return tf.keras.Sequential(\n",
" [tf.keras.layers.Conv2D(256, 3, input_shape=(28, 28, 1),\n",
" activation='relu',\n",
" kernel_regularizer=regularizer),\n",
" tf.keras.layers.Conv2D(256, 3,\n",
" activation='relu',\n",
" kernel_regularizer=regularizer),\n",
" tf.keras.layers.Flatten(),\n",
" tf.keras.layers.Dense(256,\n",
" activation='relu',\n",
" kernel_regularizer=regularizer),\n",
" tf.keras.layers.Dense(128,\n",
" activation='relu',\n",
" kernel_regularizer=regularizer),\n",
" tf.keras.layers.Dense(10,\n",
" kernel_regularizer=regularizer)])"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "h-2qaXgfyONQ"
},
"source": [
"此模型将 L2 正则化项放在每层的权重上,以便下面的自定义训练循环可以显示如何从 `Model.losses` 中选取它们。"
]
},
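{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (illustrative, not part of the original guide), you can build a throwaway copy of the model and confirm that each layer with a `kernel_regularizer` contributes one scalar tensor to `Model.losses`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# No TPU execution is involved here, so the strategy scope is not required.\n",
"inspection_model = create_model()\n",
"print('Number of regularization loss tensors:', len(inspection_model.losses))\n",
"print('First loss tensor:', inspection_model.losses[0])"
]
},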
{
"cell_type": "markdown",
"metadata": {
"id": "qYOYjYTg_31l"
},
"source": [
"### 加载数据集\n",
"\n",
"使用 Cloud TPU 时,有效使用 `tf.data.Dataset` API 很关键。有关数据集性能的详细信息,请参阅[输入流水线性能指南](./data_performance.ipynb)。\n",
"\n",
"如果使用的是 [TPU Nodes](https://cloud.google.com/tpu/docs/managing-tpus-tpu-vm),则需要将 TensorFlow `Dataset` 读取的所有数据文件存储在 [Google Cloud Storage (GCS) 存储分区](https://cloud.google.com/tpu/docs/storage-buckets)中。如果使用的是 [TPU VM](https://cloud.google.com/tpu/docs/users-guide-tpu-vm),则可以将数据存储在任意位置。有关 TPU Nodes 和 TPU VM 的更多信息,请参阅 [TPU 系统架构](https://cloud.google.com/tpu/docs/system-architecture-tpu-vm)文档。\n",
"\n",
"对于大多数用例,建议将数据转换为 `TFRecord` 格式,并使用 `tf.data.TFRecordDataset` 进行读取。有关操作方法的详细信息,请参阅 [TFRecord 和 tf.Example 教程](../tutorials/load_data/tfrecord.ipynb)。不过,这并非硬性要求,如果愿意,您可以使用其他数据集读取器,如 `tf.data.FixedLengthRecordDataset` 或 `tf.data.TextLineDataset`。\n",
"\n",
"您可以使用 `tf.data.Dataset.cache` 将整个小数据集加载到内存中。\n",
"\n",
"无论使用哪一种数据格式,我们都强烈建议使用大文件(100MB 左右)。在这种网络化环境下,这一点尤其重要,因为打开文件的开销非常高。\n",
"\n",
"如下面的代码所示,您应使用 Tensorflow Datasets `tfds.load` 模块获取 MNIST 训练和测试数据的副本。请注意,代码中已指定 `try_gcs` 来使用公共 GCS 存储分区中提供的副本。如果不这样指定,TPU 将无法访问下载的数据。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "noAd416KSCo7",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"def get_dataset(batch_size, is_training=True):\n",
" split = 'train' if is_training else 'test'\n",
" dataset, info = tfds.load(name='mnist', split=split, with_info=True,\n",
" as_supervised=True, try_gcs=True)\n",
"\n",
" # Normalize the input data.\n",
" def scale(image, label):\n",
" image = tf.cast(image, tf.float32)\n",
" image /= 255.0\n",
" return image, label\n",
"\n",
" dataset = dataset.map(scale)\n",
"\n",
" # Only shuffle and repeat the dataset in training. The advantage of having an\n",
" # infinite dataset for training is to avoid the potential last partial batch\n",
" # in each epoch, so that you don't need to think about scaling the gradients\n",
" # based on the actual batch size.\n",
" if is_training:\n",
" dataset = dataset.shuffle(10000)\n",
" dataset = dataset.repeat()\n",
"\n",
" dataset = dataset.batch(batch_size)\n",
"\n",
" return dataset"
]
},
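{
"cell_type": "markdown",
"metadata": {},
"source": [
"For the `TFRecord` route mentioned above, a hypothetical sketch of the reading side is shown here. The GCS path and the feature names in `feature_spec` are placeholders, not files or a schema defined in this guide:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical example: reading pre-converted TFRecord files from GCS.\n",
"feature_spec = {\n",
"    'image': tf.io.FixedLenFeature([], tf.string),\n",
"    'label': tf.io.FixedLenFeature([], tf.int64),\n",
"}\n",
"\n",
"def parse_example(serialized):\n",
"  features = tf.io.parse_single_example(serialized, feature_spec)\n",
"  image = tf.io.decode_png(features['image'], channels=1)\n",
"  return tf.cast(image, tf.float32) / 255.0, features['label']\n",
"\n",
"# filenames = tf.io.gfile.glob('gs://your-bucket/mnist-train-*.tfrecord')\n",
"# dataset = tf.data.TFRecordDataset(filenames).map(parse_example)"
]
},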
{
"cell_type": "markdown",
"metadata": {
"id": "mgUC6A-zCMEr"
},
"source": [
"### 使用 Keras 高级 API 训练模型\n",
"\n",
"可以使用 Keras `Model.fit` 和 `Model.compile` API 训练模型。在此步骤中没有特定于 TPU 的内容,可以像使用多个 GPU 和 `MirroredStrategy` 而不是 `TPUStrategy` 一样编写代码。可以在[使用 Keras 进行分布式训练](../tutorials/distribute/keras.ipynb)教程中了解详情。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ubmDchPqSIx0",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"with strategy.scope():\n",
" model = create_model()\n",
" model.compile(optimizer='adam',\n",
" loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n",
" metrics=['sparse_categorical_accuracy'])\n",
"\n",
"batch_size = 200\n",
"steps_per_epoch = 60000 // batch_size\n",
"validation_steps = 10000 // batch_size\n",
"\n",
"train_dataset = get_dataset(batch_size, is_training=True)\n",
"test_dataset = get_dataset(batch_size, is_training=False)\n",
"\n",
"model.fit(train_dataset,\n",
" epochs=5,\n",
" steps_per_epoch=steps_per_epoch,\n",
" validation_data=test_dataset,\n",
" validation_steps=validation_steps)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "8hSGBIYtUugJ"
},
"source": [
"为了减少 Python 开销,同时最大限度提升 TPU 的性能,请将 `steps_per_execution` 参数传入 Keras `Model.compile`。在本例中,它可以将吞吐量提升约 50%:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "M6e3aVVLUorL",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"with strategy.scope():\n",
" model = create_model()\n",
" model.compile(optimizer='adam',\n",
" # Anything between 2 and `steps_per_epoch` could help here.\n",
" steps_per_execution = 50,\n",
" loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n",
" metrics=['sparse_categorical_accuracy'])\n",
"\n",
"model.fit(train_dataset,\n",
" epochs=5,\n",
" steps_per_epoch=steps_per_epoch,\n",
" validation_data=test_dataset,\n",
" validation_steps=validation_steps)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "0rRALBZNCO4A"
},
"source": [
"### 使用自定义训练循环训练模型\n",
"\n",
"还可以直接使用 `tf.function` 和 `tf.distribute` API 创建和训练模型。可以使用 `Strategy.experimental_distribute_datasets_from_function` API 通过给定的数据集函数分布 `tf.data.Dataset`。请注意,在下面的示例中,传递给 `Dataset` 的批次大小是每个副本的批次大小,而非全局批次大小。要了解详情,请查阅[使用 `tf.distribute.Strategy` 进行自定义训练](../tutorials/distribute/custom_training.ipynb)教程。\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "DxdgXPAL6iFE"
},
"source": [
"首先,创建模型、数据集和 `tf.function`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "9aHhqwao2Fxi",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"# Create the model, optimizer and metrics inside the `tf.distribute.Strategy`\n",
"# scope, so that the variables can be mirrored on each device.\n",
"with strategy.scope():\n",
" model = create_model()\n",
" optimizer = tf.keras.optimizers.Adam()\n",
" training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)\n",
" training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(\n",
" 'training_accuracy', dtype=tf.float32)\n",
"\n",
"# Calculate per replica batch size, and distribute the `tf.data.Dataset`s\n",
"# on each TPU worker.\n",
"per_replica_batch_size = batch_size // strategy.num_replicas_in_sync\n",
"\n",
"train_dataset = strategy.experimental_distribute_datasets_from_function(\n",
" lambda _: get_dataset(per_replica_batch_size, is_training=True))\n",
"\n",
"@tf.function\n",
"def train_step(iterator):\n",
" \"\"\"The step function for one training step.\"\"\"\n",
"\n",
" def step_fn(inputs):\n",
" \"\"\"The computation to run on each TPU device.\"\"\"\n",
" images, labels = inputs\n",
" with tf.GradientTape() as tape:\n",
" logits = model(images, training=True)\n",
" per_example_loss = tf.keras.losses.sparse_categorical_crossentropy(\n",
" labels, logits, from_logits=True)\n",
" loss = tf.nn.compute_average_loss(per_example_loss)\n",
" model_losses = model.losses\n",
" if model_losses:\n",
" loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))\n",
"\n",
" grads = tape.gradient(loss, model.trainable_variables)\n",
" optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))\n",
" training_loss.update_state(loss * strategy.num_replicas_in_sync)\n",
" training_accuracy.update_state(labels, logits)\n",
"\n",
" strategy.run(step_fn, args=(next(iterator),))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Ibi7Z97V6xsQ"
},
"source": [
"然后,运行训练循环:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "1du5cXWt6Vtw",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"steps_per_eval = 10000 // batch_size\n",
"\n",
"train_iterator = iter(train_dataset)\n",
"for epoch in range(5):\n",
" print('Epoch: {}/5'.format(epoch))\n",
"\n",
" for step in range(steps_per_epoch):\n",
" train_step(train_iterator)\n",
" print('Current step: {}, training loss: {}, training accuracy: {}%'.format(\n",
" optimizer.iterations.numpy(),\n",
" round(float(training_loss.result()), 4),\n",
" round(float(training_accuracy.result()) * 100, 2)))\n",
" training_loss.reset_states()\n",
" training_accuracy.reset_states()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "TnZJUM3qIjKu"
},
"source": [
"### 在 `tf.function` 中利用多步法提升性能\n",
"\n",
"您可以通过在 `tf.function`. 中运行多步以提升性能。在 `tf.function` 内使用 `tf.range` 包装 `Strategy.run` 调用即可实现此目的,在 TPU 工作进程上,AutoGraph 会将其转换为 `tf.while_loop`。可以在使用 `tf.function` 升性能指南中详细了解 `tf.function`。\n",
"\n",
"在 `tf.function` 中,虽然多步法的性能更高,但是与单步法相比,可谓各有利弊。在 `tf.function` 中运行多个步骤不够灵活,您无法以 Eager 方式运行,也不能运行任意 Python 代码。\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "2grYvXLzJYkP",
"vscode": {
"languageId": "python"
}
},
"outputs": [],
"source": [
"@tf.function\n",
"def train_multiple_steps(iterator, steps):\n",
" \"\"\"The step function for one training step.\"\"\"\n",
"\n",
" def step_fn(inputs):\n",
" \"\"\"The computation to run on each TPU device.\"\"\"\n",
" images, labels = inputs\n",
" with tf.GradientTape() as tape:\n",
" logits = model(images, training=True)\n",
" per_example_loss = tf.keras.losses.sparse_categorical_crossentropy(\n",
" labels, logits, from_logits=True)\n",
" loss = tf.nn.compute_average_loss(per_example_loss)\n",
" model_losses = model.losses\n",
" if model_losses:\n",
" loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))\n",
" grads = tape.gradient(loss, model.trainable_variables)\n",
" optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))\n",
" training_loss.update_state(loss * strategy.num_replicas_in_sync)\n",
" training_accuracy.update_state(labels, logits)\n",
"\n",
" for _ in tf.range(steps):\n",
" strategy.run(step_fn, args=(next(iterator),))\n",
"\n",
"# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get\n",
"# retraced if the value changes.\n",
"train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))\n",
"\n",
"print('Current step: {}, training loss: {}, training accuracy: {}%'.format(\n",
" optimizer.iterations.numpy(),\n",
" round(float(training_loss.result()), 4),\n",
" round(float(training_accuracy.result()) * 100, 2)))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "WBKVhMvWjibf"
},
"source": [
"## 后续步骤\n",
"\n",
"要详细了解 Cloud TPU 以及如何使用它们,请查看以下资源:\n",
"\n",
"- [Google Cloud TPU](https://cloud.google.com/tpu):Google Cloud TPU 首页。\n",
"- [Google Cloud TPU 文档](https://cloud.google.com/tpu/docs/):Google Cloud TPU 文档,其中包括:\n",
" - [Cloud TPU 简介](https://cloud.google.com/tpu/docs/intro-to-tpu):使用 Cloud TPU 的概述。\n",
" - [Cloud TPU 快速入门](https://cloud.google.com/tpu/docs/quick-starts):使用 TensorFlow 和其他主要机器学习框架利用 Cloud TPU VM 的快速入门简介。\n",
"- [Google Cloud TPU Colab 笔记本](https://cloud.google.com/tpu/docs/colabs):端到端训练示例。\n",
"- [Google Cloud TPU 性能指南](https://cloud.google.com/tpu/docs/performance-guide):通过为应用调整 Cloud TPU 配置参数来进一步增强 Cloud TPU 性能。\n",
"- [Distributed training with TensorFlow](./distributed_training.ipynb): How to use distribution strategies—including `tf.distribute.TPUStrategy`—with examples showing best practices.\n",
"- TPU 嵌入向量:TensorFlow 包括通过 `tf.tpu.experimental.embedding` 在 TPU 上训练嵌入向量的专门支持。此外,[TensorFlow Recommenders](https://tensorflow.google.cn/recommenders) 还具有 `tfrs.layers.embedding.TPUEmbedding`。嵌入向量提供高效和密集的表示,捕捉特征之间的复杂相似度和关系。TensorFlow 的 TPU 特定嵌入向量支持允许您训练大于单个 TPU 设备内存的嵌入向量,并在 TPU 上使用稀疏和不规则输入。\n",
"- [TPU Research Cloud (TRC)](https://sites.research.google/trc/about/):TRC 让研究人员能够申请访问由超过 1,000 个 Cloud TPU 设备组成的集群。\n"
]
}
],
"metadata": {
"accelerator": "TPU",
"colab": {
"collapsed_sections": [],
"name": "tpu.ipynb",
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 0
}