##### Copyright 2019 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

将 DTensor 与 Keras 一起使用#

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

概述#

在本教程中,您将学习如何将 DTensor 与 Keras 一起使用。

通过将 DTensor 与 Keras 集成,您可以重用现有的 Keras 层和模型来构建和训练分布式机器学习模型。

您将使用 MNIST 数据训练多层分类模型。本文将演示如何设置子类化模型、序贯模型和函数式模型的布局。

本教程假设您已经阅读了 DTensor 编程指南,并且熟悉基本的 DTensor 概念,例如 MeshLayout

本教程基于 https://tensorflow.google.cn/datasets/keras_example。

安装#

DTensor 是 TensorFlow 2.9.0 版本的一部分。

!pip install --quiet --upgrade --pre tensorflow tensorflow-datasets

接下来,导入 tensorflowtensorflow.experimental.dtensor,并将 TensorFlow 配置为使用 8 个虚拟 CPU。

尽管本示例使用了 CPU,但 DTensor 在 CPU、GPU 或 TPU 设备上的工作方式相同。

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.experimental import dtensor
def configure_virtual_cpus(ncpu):
  phy_devices = tf.config.list_physical_devices('CPU')
  tf.config.set_logical_device_configuration(
        phy_devices[0], 
        [tf.config.LogicalDeviceConfiguration()] * ncpu)
  
configure_virtual_cpus(8)
tf.config.list_logical_devices('CPU')

devices = [f'CPU:{i}' for i in range(8)]

确定性伪随机数生成器#

您应当注意的一件事是 DTensor API 要求每个正在运行的客户端具有相同的随机种子,以便它可以具有用于初始化权重的确定性行为。可以通过 tf.keras.utils.set_random_seed() 在 Keras 中设置全局种子来实现此目的。

tf.keras.backend.experimental.enable_tf_random_generator()
tf.keras.utils.set_random_seed(1337)

创建数据并行网格#

本教程演示数据并行训练。适应模型并行训练和空间并行训练可以像切换到一组不同的 Layout 对象一样简单。有关数据并行之外的分布式训练的更多信息,请参阅 DTensor 深入机器学习教程

数据并行训练是一种常用的并行训练方案,也被诸如 tf.distribute.MirroredStrategy 等使用。

使用 DTensor,数据并行训练循环使用由单个“批次”维度组成的 Mesh,其中每个设备都会运行模型的副本,从全局批次接收分片。

mesh = dtensor.create_mesh([("batch", 8)], devices=devices)

由于每个设备都运行模型的完整副本,模型变量应在网格中完全复制(不分片)。例如,此 Mesh 上 2 秩权重的完全复制布局如下:

example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh)  # or
example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)

Mesh 上 2 秩数据张量的布局将沿第一个维度进行分片(有时称为 batch_sharded),

example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh)  # or
example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)

使用布局创建 Keras 层#

在数据并行方案中,您通常使用完全复制的布局创建模型权重,以便模型的每个副本都可以使用分片输入数据进行计算。

为了为您的层权重配置布局信息,Keras 在层构造函数中为大多数内置层公开了一个额外的参数。

以下示例使用完全复制的权重布局构建了一个小型图像分类模型。您可以通过参数 kernel_layoutbias_layouttf.keras.layers.Dense 中指定布局信息 kernelbias。大多数内置 Keras 层都可以显式地指定层权重的 Layout

unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)
unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)
model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, 
                        activation='relu',
                        name='d1',
                        kernel_layout=unsharded_layout_2d, 
                        bias_layout=unsharded_layout_1d),
  tf.keras.layers.Dense(10,
                        name='d2',
                        kernel_layout=unsharded_layout_2d, 
                        bias_layout=unsharded_layout_1d)
])

您可以通过检查权重的 layout 属性来查看布局信息。

for weight in model.weights:
  print(f'Weight name: {weight.name} with layout: {weight.layout}')
  break

加载数据集并构建输入流水线#

加载一个 MNIST 数据集并为其配置一些预处理输入流水线。数据集本身与任何 DTensor 布局信息不关联。我们计划在未来的 TensorFlow 版本中改进 DTensor Keras 与 tf.data 的集成。

(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label
batch_size = 128

ds_train = ds_train.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)

定义模型的训练逻辑#

接下来,定义模型的训练和评估逻辑。

从 TensorFlow 2.9 开始,您必须为启用 DTensor 的 Keras 模型编写自定义训练循环。这是为了用适当的布局信息打包输入数据,这些信息未与 Keras 中的标准 tf.keras.Model.fit()tf.keras.Model.eval() 函数集成。您将在即将发布的版本中获得更多 tf.data 支持。

@tf.function
def train_step(model, x, y, optimizer, metrics):
  with tf.GradientTape() as tape:
    logits = model(x, training=True)
    # tf.reduce_sum sums the batch sharded per-example loss to a replicated
    # global loss (scalar).
    loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
        y, logits, from_logits=True))
    
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  for metric in metrics.values():
    metric.update_state(y_true=y, y_pred=logits)

  loss_per_sample = loss / len(x)
  results = {'loss': loss_per_sample}
  return results
@tf.function
def eval_step(model, x, y, metrics):
  logits = model(x, training=False)
  loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
        y, logits, from_logits=True))

  for metric in metrics.values():
    metric.update_state(y_true=y, y_pred=logits)

  loss_per_sample = loss / len(x)
  results = {'eval_loss': loss_per_sample}
  return results
def pack_dtensor_inputs(images, labels, image_layout, label_layout):
  num_local_devices = image_layout.mesh.num_local_devices()
  images = tf.split(images, num_local_devices)
  labels = tf.split(labels, num_local_devices)
  images = dtensor.pack(images, image_layout)
  labels = dtensor.pack(labels, label_layout)
  return  images, labels

指标和优化器#

将 DTensor API 与 Keras MetricOptimizer 一起使用时,您需要提供额外的网格信息,以便任何内部状态变量和张量都可以使用模型中的变量。

  • 对于优化器,DTensor 引入了一个新的实验性命名空间 keras.dtensor.experimental.optimizers,其中扩展了许多现有的 Keras 优化器以接收额外的 mesh 参数。在未来的版本中,它可能会与 Keras 核心优化器合并。

  • 对于指标,可以直接将 mesh 作为参数指定给构造函数,使其成为兼容 DTensor 的 Metric

optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)
metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}

训练模型#

以下示例在批次维度上对来自输入流水线的数据进行分片,并使用具有完全复制权重的模型进行训练。

经过 3 个周期后,模型应当达到大约 97% 的准确率。

num_epochs = 3

image_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=4)
label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)

for epoch in range(num_epochs):
  print("============================") 
  print("Epoch: ", epoch)
  for metric in metrics.values():
    metric.reset_state()
  step = 0
  results = {}
  pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])
  for input in ds_train:
    images, labels = input[0], input[1]
    images, labels = pack_dtensor_inputs(
        images, labels, image_layout, label_layout)

    results.update(train_step(model, images, labels, optimizer, metrics))
    for metric_name, metric in metrics.items():
      results[metric_name] = metric.result()

    pbar.update(step, values=results.items(), finalize=False)
    step += 1
  pbar.update(step, values=results.items(), finalize=True)

  for metric in eval_metrics.values():
    metric.reset_state()
  for input in ds_test:
    images, labels = input[0], input[1]
    images, labels = pack_dtensor_inputs(
        images, labels, image_layout, label_layout)
    results.update(eval_step(model, images, labels, eval_metrics))

  for metric_name, metric in eval_metrics.items():
    results[metric_name] = metric.result()
  
  for metric_name, metric in results.items():
    print(f"{metric_name}: {metric.numpy()}")

为现有模型代码指定布局#

通常,您的模型非常适合您的用例。为模型中的每个单独层指定 Layout 信息将是一项需要大量编辑的工作。

为了帮助您轻松地将现有 Keras 模型转换为使用 DTensor API,可以使用新的 dtensor.LayoutMap API,它允许您从全局角度指定 Layout

首先,您需要创建一个 LayoutMap 实例,它是一个类似字典的对象,其中包含您要为模型权重指定的所有 Layout

LayoutMap 在初始化时需要一个 Mesh 实例,该实例可用于为任何未配置布局的权重提供默认的复制 Layout。如果您希望完全复制所有模型权重,则可以提供空的 LayoutMap,默认网格将用于创建复制的 Layout

LayoutMap 使用字符串作为键,使用 Layout 作为值。普通的 Python 字典与此类之间存在行为差异。检索值时,字符串键将被视为正则表达式

子类化模型#

考虑使用 Keras 子类化模型语法定义的以下模型。

class SubclassedModel(tf.keras.Model):

  def __init__(self, name=None):
    super().__init__(name=name)
    self.feature = tf.keras.layers.Dense(16)
    self.feature_2 = tf.keras.layers.Dense(24)
    self.dropout = tf.keras.layers.Dropout(0.1)

  def call(self, inputs, training=None):
    x = self.feature(inputs)
    x = self.dropout(x, training=training)
    return self.feature_2(x)

此模型中有 4 个权重,分别是两个 Dense 层的 kernelbias。它们中的每一个都基于对象路径进行映射:

  • model.feature.kernel

  • model.feature.bias

  • model.feature_2.kernel

  • model.feature_2.bias

注:对于子类化模型,特性名称而不是层的 .name 特性用作从映射中检索布局的键。这与 tf.Module 检查点遵循的约定一致。对于具有多个层的复杂模型,您可以手动检查检查点来查看特性映射。

现在,定义以下 LayoutMap 并将其应用于模型。

layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)

layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)

with layout_map.scope():
  subclassed_model = SubclassedModel()

模型权重是在第一次调用时创建的,因此使用 DTensor 输入调用模型并确认权重具有预期的布局。

dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)
# Trigger the weights creation for subclass model
subclassed_model(dtensor_input)

print(subclassed_model.feature.kernel.layout)

这样一来,您就可以快速将 Layout 映射到您的模型,而无需更新任何现有代码。

序贯模型和函数式模型#

对于 Keras 序贯和函数式模型,您也可以使用 LayoutMap

注:对于序贯模型和函数式模型,映射略有不同。模型中的层没有附加到模型的公共特性(尽管可以通过 model.layers 作为列表访问它们)。在这种情况下,使用字符串名称作为键。字符串名称保证在模型中是唯一的。

layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)

layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
  inputs = tf.keras.Input((16,), batch_size=16)
  x = tf.keras.layers.Dense(16, name='feature')(inputs)
  x = tf.keras.layers.Dropout(0.1)(x)
  output = tf.keras.layers.Dense(32, name='feature_2')(x)
  model = tf.keras.Model(inputs, output)

print(model.layers[1].kernel.layout)
with layout_map.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),
      tf.keras.layers.Dropout(0.1),
      tf.keras.layers.Dense(32, name='feature_2')
  ])

print(model.layers[2].kernel.layout)