##### Copyright 2018 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
使用 TPU#
在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 Github 上查看源代码 | 下载笔记本 |
本指南演示了如何在张量处理单元 (TPU) 和 TPU Pod 上执行基本训练,TPU Pod 是一组通过专用高速网络接口连接的 TPU 设备,带有 tf.keras
和自定义训练循环。
TPU 是 Google 定制开发的专用集成电路 (ASIC),用于加速机器学习工作负载。它们可通过 Google Colab、TPU Research Cloud 和 Cloud TPU 获得。
安装#
在运行此 Colab 笔记本之前,请在以下路径下检查笔记本设置,确保硬件加速器为 TPU:Runtime > Change runtime type > Hardware accelerator > TPU。
导入一些必要的库,包括 TensorFlow Datasets:
import tensorflow as tf
import os
import tensorflow_datasets as tfds
TPU 初始化#
与运行用户 Python 程序的本地流程不同,TPU 通常在 Cloud TPU 工作进程上。因此,需要完成一些初始化工作才能连接到远程集群并初始化 TPU。请注意,tf.distribute.cluster_resolver.TPUClusterResolver
的 tpu
参数是一个仅适用于 Colab 的特殊地址。如果在 Google Compute Engine (GCE) 上运行,应改为传入 Cloud TPU 的名称。
注:必须将 TPU 初始化代码放在程序的开头位置。
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
手动设备放置#
初始化 TPU 后,您可以通过手动设备放置将计算放置在单个 TPU 设备上:
a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
with tf.device('/TPU:0'):
c = tf.matmul(a, b)
print("c device: ", c.device)
print(c)
分布策略#
通常,您可以在多个 TPU 上以数据并行的方式运行模型。为了在多个 TPU(以及多个 GPU 或多台机器)上分布模型,TensorFlow 提供了 tf.distribute.Strategy
API。您可以更换分布策略,该模型将在任何给定的 (TPU) 设备上运行。在使用 TensorFlow 进行分布式训练指南中了解详情。
使用 tf.distribute.TPUStrategy
选项实现同步分布式训练。TPU 会在多个 TPU 核心之间实现高效的全归约和其他集合运算,并将其用于 TPUStrategy
。
要演示这一点,请创建一个 tf.distribute.TPUStrategy
对象:
strategy = tf.distribute.TPUStrategy(resolver)
要复制计算,以便在所有 TPU 核心中运行,可以直接将其传入 strategy.run
API。在下面的示例中,所有核心都会获得相同的输入 (a, b)
,并单独在每个核心上执行矩阵乘法运算。输出是所有副本的值。
@tf.function
def matmul_fn(x, y):
z = tf.matmul(x, y)
return z
z = strategy.run(matmul_fn, args=(a, b))
print(z)
TPU 上的分类#
我们已经学习了基本概念,现在来看看具体示例。本部分会演示如何使用分布策略 tf.distribute.experimental.TPUStrategy
在 Cloud TPU 上训练 Keras 模型。
定义 Keras 模型#
首先定义 Sequential
Keras 模型,对 MNIST 数据集进行图像分类。这与您在 CPU 或 GPU 上进行训练时使用的定义相同。请注意,Keras 模型创建需要位于 Strategy.scope
内,这样才能在每个 TPU 设备上创建变量。代码的其他部分不必放在 Strategy
作用域内。
def create_model():
regularizer = tf.keras.regularizers.L2(1e-5)
return tf.keras.Sequential(
[tf.keras.layers.Conv2D(256, 3, input_shape=(28, 28, 1),
activation='relu',
kernel_regularizer=regularizer),
tf.keras.layers.Conv2D(256, 3,
activation='relu',
kernel_regularizer=regularizer),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(256,
activation='relu',
kernel_regularizer=regularizer),
tf.keras.layers.Dense(128,
activation='relu',
kernel_regularizer=regularizer),
tf.keras.layers.Dense(10,
kernel_regularizer=regularizer)])
此模型将 L2 正则化项放在每层的权重上,以便下面的自定义训练循环可以显示如何从 Model.losses
中选取它们。
加载数据集#
使用 Cloud TPU 时,有效使用 tf.data.Dataset
API 很关键。有关数据集性能的详细信息,请参阅输入流水线性能指南。
如果使用的是 TPU Nodes,则需要将 TensorFlow Dataset
读取的所有数据文件存储在 Google Cloud Storage (GCS) 存储分区中。如果使用的是 TPU VM,则可以将数据存储在任意位置。有关 TPU Nodes 和 TPU VM 的更多信息,请参阅 TPU 系统架构文档。
对于大多数用例,建议将数据转换为 TFRecord
格式,并使用 tf.data.TFRecordDataset
进行读取。有关操作方法的详细信息,请参阅 TFRecord 和 tf.Example 教程。不过,这并非硬性要求,如果愿意,您可以使用其他数据集读取器,如 tf.data.FixedLengthRecordDataset
或 tf.data.TextLineDataset
。
您可以使用 tf.data.Dataset.cache
将整个小数据集加载到内存中。
无论使用哪一种数据格式,我们都强烈建议使用大文件(100MB 左右)。在这种网络化环境下,这一点尤其重要,因为打开文件的开销非常高。
如下面的代码所示,您应使用 Tensorflow Datasets tfds.load
模块获取 MNIST 训练和测试数据的副本。请注意,代码中已指定 try_gcs
来使用公共 GCS 存储分区中提供的副本。如果不这样指定,TPU 将无法访问下载的数据。
def get_dataset(batch_size, is_training=True):
split = 'train' if is_training else 'test'
dataset, info = tfds.load(name='mnist', split=split, with_info=True,
as_supervised=True, try_gcs=True)
# Normalize the input data.
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255.0
return image, label
dataset = dataset.map(scale)
# Only shuffle and repeat the dataset in training. The advantage of having an
# infinite dataset for training is to avoid the potential last partial batch
# in each epoch, so that you don't need to think about scaling the gradients
# based on the actual batch size.
if is_training:
dataset = dataset.shuffle(10000)
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)
return dataset
使用 Keras 高级 API 训练模型#
可以使用 Keras Model.fit
和 Model.compile
API 训练模型。在此步骤中没有特定于 TPU 的内容,可以像使用多个 GPU 和 MirroredStrategy
而不是 TPUStrategy
一样编写代码。可以在使用 Keras 进行分布式训练教程中了解详情。
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size
train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
为了减少 Python 开销,同时最大限度提升 TPU 的性能,请将 steps_per_execution
参数传入 Keras Model.compile
。在本例中,它可以将吞吐量提升约 50%:
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
# Anything between 2 and `steps_per_epoch` could help here.
steps_per_execution = 50,
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
使用自定义训练循环训练模型#
还可以直接使用 tf.function
和 tf.distribute
API 创建和训练模型。可以使用 Strategy.experimental_distribute_datasets_from_function
API 通过给定的数据集函数分布 tf.data.Dataset
。请注意,在下面的示例中,传递给 Dataset
的批次大小是每个副本的批次大小,而非全局批次大小。要了解详情,请查阅使用 tf.distribute.Strategy
进行自定义训练教程。
首先,创建模型、数据集和 tf.function
:
# Create the model, optimizer and metrics inside the `tf.distribute.Strategy`
# scope, so that the variables can be mirrored on each device.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
'training_accuracy', dtype=tf.float32)
# Calculate per replica batch size, and distribute the `tf.data.Dataset`s
# on each TPU worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync
train_dataset = strategy.experimental_distribute_datasets_from_function(
lambda _: get_dataset(per_replica_batch_size, is_training=True))
@tf.function
def train_step(iterator):
"""The step function for one training step."""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
per_example_loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(per_example_loss)
model_losses = model.losses
if model_losses:
loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
strategy.run(step_fn, args=(next(iterator),))
然后,运行训练循环:
steps_per_eval = 10000 // batch_size
train_iterator = iter(train_dataset)
for epoch in range(5):
print('Epoch: {}/5'.format(epoch))
for step in range(steps_per_epoch):
train_step(train_iterator)
print('Current step: {}, training loss: {}, training accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
training_loss.reset_states()
training_accuracy.reset_states()
在 tf.function
中利用多步法提升性能#
您可以通过在 tf.function
. 中运行多步以提升性能。在 tf.function
内使用 tf.range
包装 Strategy.run
调用即可实现此目的,在 TPU 工作进程上,AutoGraph 会将其转换为 tf.while_loop
。可以在使用 tf.function
升性能指南中详细了解 tf.function
。
在 tf.function
中,虽然多步法的性能更高,但是与单步法相比,可谓各有利弊。在 tf.function
中运行多个步骤不够灵活,您无法以 Eager 方式运行,也不能运行任意 Python 代码。
@tf.function
def train_multiple_steps(iterator, steps):
"""The step function for one training step."""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
per_example_loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(per_example_loss)
model_losses = model.losses
if model_losses:
loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
for _ in tf.range(steps):
strategy.run(step_fn, args=(next(iterator),))
# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))
print('Current step: {}, training loss: {}, training accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
后续步骤#
要详细了解 Cloud TPU 以及如何使用它们,请查看以下资源:
Google Cloud TPU:Google Cloud TPU 首页。
Google Cloud TPU 文档:Google Cloud TPU 文档,其中包括:
Cloud TPU 简介:使用 Cloud TPU 的概述。
Cloud TPU 快速入门:使用 TensorFlow 和其他主要机器学习框架利用 Cloud TPU VM 的快速入门简介。
Google Cloud TPU Colab 笔记本:端到端训练示例。
Google Cloud TPU 性能指南:通过为应用调整 Cloud TPU 配置参数来进一步增强 Cloud TPU 性能。
Distributed training with TensorFlow: How to use distribution strategies—including
tf.distribute.TPUStrategy
—with examples showing best practices.TPU 嵌入向量:TensorFlow 包括通过
tf.tpu.experimental.embedding
在 TPU 上训练嵌入向量的专门支持。此外,TensorFlow Recommenders 还具有tfrs.layers.embedding.TPUEmbedding
。嵌入向量提供高效和密集的表示,捕捉特征之间的复杂相似度和关系。TensorFlow 的 TPU 特定嵌入向量支持允许您训练大于单个 TPU 设备内存的嵌入向量,并在 TPU 上使用稀疏和不规则输入。TPU Research Cloud (TRC):TRC 让研究人员能够申请访问由超过 1,000 个 Cloud TPU 设备组成的集群。