##### Copyright 2018 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

使用 TPU#

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 Github 上查看源代码 下载笔记本

本指南演示了如何在张量处理单元 (TPU) 和 TPU Pod 上执行基本训练,TPU Pod 是一组通过专用高速网络接口连接的 TPU 设备,带有 tf.keras 和自定义训练循环。

TPU 是 Google 定制开发的专用集成电路 (ASIC),用于加速机器学习工作负载。它们可通过 Google ColabTPU Research CloudCloud TPU 获得。

安装#

在运行此 Colab 笔记本之前,请在以下路径下检查笔记本设置,确保硬件加速器为 TPU:Runtime > Change runtime type > Hardware accelerator > TPU

导入一些必要的库,包括 TensorFlow Datasets:

import tensorflow as tf

import os
import tensorflow_datasets as tfds

TPU 初始化#

与运行用户 Python 程序的本地流程不同,TPU 通常在 Cloud TPU 工作进程上。因此,需要完成一些初始化工作才能连接到远程集群并初始化 TPU。请注意,tf.distribute.cluster_resolver.TPUClusterResolvertpu 参数是一个仅适用于 Colab 的特殊地址。如果在 Google Compute Engine (GCE) 上运行,应改为传入 Cloud TPU 的名称。

注:必须将 TPU 初始化代码放在程序的开头位置。

resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))

手动设备放置#

初始化 TPU 后,您可以通过手动设备放置将计算放置在单个 TPU 设备上:

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

with tf.device('/TPU:0'):
  c = tf.matmul(a, b)

print("c device: ", c.device)
print(c)

分布策略#

通常,您可以在多个 TPU 上以数据并行的方式运行模型。为了在多个 TPU(以及多个 GPU 或多台机器)上分布模型,TensorFlow 提供了 tf.distribute.Strategy API。您可以更换分布策略,该模型将在任何给定的 (TPU) 设备上运行。在使用 TensorFlow 进行分布式训练指南中了解详情。

使用 tf.distribute.TPUStrategy 选项实现同步分布式训练。TPU 会在多个 TPU 核心之间实现高效的全归约和其他集合运算,并将其用于 TPUStrategy

要演示这一点,请创建一个 tf.distribute.TPUStrategy 对象:

strategy = tf.distribute.TPUStrategy(resolver)

要复制计算,以便在所有 TPU 核心中运行,可以直接将其传入 strategy.run API。在下面的示例中,所有核心都会获得相同的输入 (a, b),并单独在每个核心上执行矩阵乘法运算。输出是所有副本的值。

@tf.function
def matmul_fn(x, y):
  z = tf.matmul(x, y)
  return z

z = strategy.run(matmul_fn, args=(a, b))
print(z)

TPU 上的分类#

我们已经学习了基本概念,现在来看看具体示例。本部分会演示如何使用分布策略 tf.distribute.experimental.TPUStrategy 在 Cloud TPU 上训练 Keras 模型。

定义 Keras 模型#

首先定义 Sequential Keras 模型,对 MNIST 数据集进行图像分类。这与您在 CPU 或 GPU 上进行训练时使用的定义相同。请注意,Keras 模型创建需要位于 Strategy.scope 内,这样才能在每个 TPU 设备上创建变量。代码的其他部分不必放在 Strategy 作用域内。

def create_model():
  regularizer = tf.keras.regularizers.L2(1e-5)
  return tf.keras.Sequential(
      [tf.keras.layers.Conv2D(256, 3, input_shape=(28, 28, 1),
                              activation='relu',
                              kernel_regularizer=regularizer),
       tf.keras.layers.Conv2D(256, 3,
                              activation='relu',
                              kernel_regularizer=regularizer),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dense(256,
                             activation='relu',
                             kernel_regularizer=regularizer),
       tf.keras.layers.Dense(128,
                             activation='relu',
                             kernel_regularizer=regularizer),
       tf.keras.layers.Dense(10,
                             kernel_regularizer=regularizer)])

此模型将 L2 正则化项放在每层的权重上,以便下面的自定义训练循环可以显示如何从 Model.losses 中选取它们。

加载数据集#

使用 Cloud TPU 时,有效使用 tf.data.Dataset API 很关键。有关数据集性能的详细信息,请参阅输入流水线性能指南

如果使用的是 TPU Nodes,则需要将 TensorFlow Dataset 读取的所有数据文件存储在 Google Cloud Storage (GCS) 存储分区中。如果使用的是 TPU VM,则可以将数据存储在任意位置。有关 TPU Nodes 和 TPU VM 的更多信息,请参阅 TPU 系统架构文档。

对于大多数用例,建议将数据转换为 TFRecord 格式,并使用 tf.data.TFRecordDataset 进行读取。有关操作方法的详细信息,请参阅 TFRecord 和 tf.Example 教程。不过,这并非硬性要求,如果愿意,您可以使用其他数据集读取器,如 tf.data.FixedLengthRecordDatasettf.data.TextLineDataset

您可以使用 tf.data.Dataset.cache 将整个小数据集加载到内存中。

无论使用哪一种数据格式,我们都强烈建议使用大文件(100MB 左右)。在这种网络化环境下,这一点尤其重要,因为打开文件的开销非常高。

如下面的代码所示,您应使用 Tensorflow Datasets tfds.load 模块获取 MNIST 训练和测试数据的副本。请注意,代码中已指定 try_gcs 来使用公共 GCS 存储分区中提供的副本。如果不这样指定,TPU 将无法访问下载的数据。

def get_dataset(batch_size, is_training=True):
  split = 'train' if is_training else 'test'
  dataset, info = tfds.load(name='mnist', split=split, with_info=True,
                            as_supervised=True, try_gcs=True)

  # Normalize the input data.
  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0
    return image, label

  dataset = dataset.map(scale)

  # Only shuffle and repeat the dataset in training. The advantage of having an
  # infinite dataset for training is to avoid the potential last partial batch
  # in each epoch, so that you don't need to think about scaling the gradients
  # based on the actual batch size.
  if is_training:
    dataset = dataset.shuffle(10000)
    dataset = dataset.repeat()

  dataset = dataset.batch(batch_size)

  return dataset

使用 Keras 高级 API 训练模型#

可以使用 Keras Model.fitModel.compile API 训练模型。在此步骤中没有特定于 TPU 的内容,可以像使用多个 GPU 和 MirroredStrategy 而不是 TPUStrategy 一样编写代码。可以在使用 Keras 进行分布式训练教程中了解详情。

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size

train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)

为了减少 Python 开销,同时最大限度提升 TPU 的性能,请将 steps_per_execution 参数传入 Keras Model.compile。在本例中,它可以将吞吐量提升约 50%:

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                # Anything between 2 and `steps_per_epoch` could help here.
                steps_per_execution = 50,
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)

使用自定义训练循环训练模型#

还可以直接使用 tf.functiontf.distribute API 创建和训练模型。可以使用 Strategy.experimental_distribute_datasets_from_function API 通过给定的数据集函数分布 tf.data.Dataset。请注意,在下面的示例中,传递给 Dataset 的批次大小是每个副本的批次大小,而非全局批次大小。要了解详情,请查阅使用 tf.distribute.Strategy 进行自定义训练教程。

首先,创建模型、数据集和 tf.function

# Create the model, optimizer and metrics inside the `tf.distribute.Strategy`
# scope, so that the variables can be mirrored on each device.
with strategy.scope():
  model = create_model()
  optimizer = tf.keras.optimizers.Adam()
  training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
  training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      'training_accuracy', dtype=tf.float32)

# Calculate per replica batch size, and distribute the `tf.data.Dataset`s
# on each TPU worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync

train_dataset = strategy.experimental_distribute_datasets_from_function(
    lambda _: get_dataset(per_replica_batch_size, is_training=True))

@tf.function
def train_step(iterator):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      per_example_loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(per_example_loss)
      model_losses = model.losses
      if model_losses:
        loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  strategy.run(step_fn, args=(next(iterator),))

然后,运行训练循环:

steps_per_eval = 10000 // batch_size

train_iterator = iter(train_dataset)
for epoch in range(5):
  print('Epoch: {}/5'.format(epoch))

  for step in range(steps_per_epoch):
    train_step(train_iterator)
  print('Current step: {}, training loss: {}, training accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
  training_loss.reset_states()
  training_accuracy.reset_states()

tf.function 中利用多步法提升性能#

您可以通过在 tf.function. 中运行多步以提升性能。在 tf.function 内使用 tf.range 包装 Strategy.run 调用即可实现此目的,在 TPU 工作进程上,AutoGraph 会将其转换为 tf.while_loop。可以在使用 tf.function 升性能指南中详细了解 tf.function

tf.function 中,虽然多步法的性能更高,但是与单步法相比,可谓各有利弊。在 tf.function 中运行多个步骤不够灵活,您无法以 Eager 方式运行,也不能运行任意 Python 代码。

@tf.function
def train_multiple_steps(iterator, steps):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      per_example_loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(per_example_loss)
      model_losses = model.losses
      if model_losses:
        loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  for _ in tf.range(steps):
    strategy.run(step_fn, args=(next(iterator),))

# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))

print('Current step: {}, training loss: {}, training accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))

后续步骤#

要详细了解 Cloud TPU 以及如何使用它们,请查看以下资源:

  • Google Cloud TPU:Google Cloud TPU 首页。

  • Google Cloud TPU 文档:Google Cloud TPU 文档,其中包括:

  • Google Cloud TPU Colab 笔记本:端到端训练示例。

  • Google Cloud TPU 性能指南:通过为应用调整 Cloud TPU 配置参数来进一步增强 Cloud TPU 性能。

  • Distributed training with TensorFlow: How to use distribution strategies—including tf.distribute.TPUStrategy—with examples showing best practices.

  • TPU 嵌入向量:TensorFlow 包括通过 tf.tpu.experimental.embedding 在 TPU 上训练嵌入向量的专门支持。此外,TensorFlow Recommenders 还具有 tfrs.layers.embedding.TPUEmbedding。嵌入向量提供高效和密集的表示,捕捉特征之间的复杂相似度和关系。TensorFlow 的 TPU 特定嵌入向量支持允许您训练大于单个 TPU 设备内存的嵌入向量,并在 TPU 上使用稀疏和不规则输入。

  • TPU Research Cloud (TRC):TRC 让研究人员能够申请访问由超过 1,000 个 Cloud TPU 设备组成的集群。