{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "Tce3stUlHN0L" }, "outputs": [], "source": [ "##### Copyright 2019 The TensorFlow Authors.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "tuOe1ymfHZPu" }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "MT-LkFOl2axM" }, "source": [ "# 将 DTensor 与 Keras 一起使用" ] }, { "cell_type": "markdown", "metadata": { "id": "r6P32iYYV27b" }, "source": [ "\n", " \n", " \n", " \n", " \n", "
" ] }, { "cell_type": "markdown", "metadata": { "id": "vTe9dcbUAwqx" }, "source": [ "## 概述\n", "\n", "在本教程中,您将学习如何将 DTensor 与 Keras 一起使用。\n", "\n", "通过将 DTensor 与 Keras 集成,您可以重用现有的 Keras 层和模型来构建和训练分布式机器学习模型。\n", "\n", "您将使用 MNIST 数据训练多层分类模型。本文将演示如何设置子类化模型、序贯模型和函数式模型的布局。\n", "\n", "本教程假设您已经阅读了 [DTensor 编程指南](/guide/dtensor_overview),并且熟悉基本的 DTensor 概念,例如 `Mesh` 和 `Layout`。\n", "\n", "本教程基于 https://tensorflow.google.cn/datasets/keras_example。" ] }, { "cell_type": "markdown", "metadata": { "id": "keIyP3IoA1o4" }, "source": [ "## 安装\n", "\n", "DTensor 是 TensorFlow 2.9.0 版本的一部分。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "4dHik7NYA5vm" }, "outputs": [], "source": [ "!pip install --quiet --upgrade --pre tensorflow tensorflow-datasets" ] }, { "cell_type": "markdown", "metadata": { "id": "VttBMZngDx8x" }, "source": [ "接下来,导入 `tensorflow` 和 `tensorflow.experimental.dtensor`,并将 TensorFlow 配置为使用 8 个虚拟 CPU。\n", "\n", "尽管本示例使用了 CPU,但 DTensor 在 CPU、GPU 或 TPU 设备上的工作方式相同。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "CodX6idGBGSm" }, "outputs": [], "source": [ "import tensorflow as tf\n", "import tensorflow_datasets as tfds\n", "from tensorflow.experimental import dtensor" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "aAtvrpasDpDD" }, "outputs": [], "source": [ "def configure_virtual_cpus(ncpu):\n", " phy_devices = tf.config.list_physical_devices('CPU')\n", " tf.config.set_logical_device_configuration(\n", " phy_devices[0], \n", " [tf.config.LogicalDeviceConfiguration()] * ncpu)\n", " \n", "configure_virtual_cpus(8)\n", "tf.config.list_logical_devices('CPU')\n", "\n", "devices = [f'CPU:{i}' for i in range(8)]" ] }, { "cell_type": "markdown", "metadata": { "id": "ogULE1OHtyd9" }, "source": [ "## 确定性伪随机数生成器\n", "\n", "您应当注意的一件事是 DTensor API 要求每个正在运行的客户端具有相同的随机种子,以便它可以具有用于初始化权重的确定性行为。可以通过 `tf.keras.utils.set_random_seed()` 在 Keras 中设置全局种子来实现此目的。" ] }, { "cell_type": "code", "execution_count": 
null, "metadata": { "id": "9u85YypguL8N" }, "outputs": [], "source": [ "tf.keras.backend.experimental.enable_tf_random_generator()\n", "tf.keras.utils.set_random_seed(1337)" ] }, { "cell_type": "markdown", "metadata": { "id": "tO11XvPDAu3_" }, "source": [ "## Creating a data parallel mesh\n", "\n", "This tutorial demonstrates data parallel training. Adapting to model parallel training and spatial parallel training can be as simple as switching to a different set of `Layout` objects. Refer to the [DTensor in-depth ML tutorial](https://tensorflow.google.cn/tutorials/distribute/dtensor_ml_tutorial) for more information on distributed training beyond data parallelism.\n", "\n", "Data parallel training is a commonly used parallel training scheme, also used by, for example, `tf.distribute.MirroredStrategy`.\n", "\n", "With DTensor, a data parallel training loop uses a `Mesh` that consists of a single 'batch' dimension, where each device runs a replica of the model that receives a shard of the global batch.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6sT6s6z4j9H-" }, "outputs": [], "source": [ "mesh = dtensor.create_mesh([(\"batch\", 8)], devices=devices)" ] }, { "cell_type": "markdown", "metadata": { "id": "rouFcF6FE0aF" }, "source": [ "Because each device runs a full replica of the model, the model variables should be fully replicated across the mesh (unsharded). For example, a fully replicated layout for a rank-2 weight on this `Mesh` looks as follows:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "U8OxvkDKE1Nu" }, "outputs": [], "source": [ "example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh) # or\n", "example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)" ] }, { "cell_type": "markdown", "metadata": { "id": "6Bnic98RE0xi" }, "source": [ "The layout for a rank-2 data tensor on this `Mesh` is sharded along the first dimension (sometimes known as `batch_sharded`):" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "PhYp0EKBFfxt" }, "outputs": [], "source": [ "example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh) # or\n", "example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)" ] }, { "cell_type": "markdown", "metadata": { "id": "4U-6n0DericV" }, "source": [ "## Create Keras layers with layout\n", "\n", "In the data parallel scheme, you usually create your model weights with a fully replicated layout, so that each replica of the model can do calculations with the sharded input data.\n", "\n", "To configure the layout information for your layers' weights, Keras exposes an extra parameter in the layer constructor for most of the built-in layers.\n", "\n", "The following example builds a small image classification model with a fully replicated weight layout. You can specify the layout information for `kernel` and `bias` in `tf.keras.layers.Dense` via the arguments `kernel_layout` and `bias_layout`. Most built-in Keras layers let you explicitly specify the `Layout` for the layer weights." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Koc5GlA1tFXY" }, "outputs": [], "source": [ "unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)\n", "unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "GfOGTIxGs5Ql" }, "outputs": [], "source": [ "model = tf.keras.models.Sequential([\n", "  tf.keras.layers.Flatten(input_shape=(28, 28)),\n", "  tf.keras.layers.Dense(128,\n", "                        activation='relu',\n", "                        name='d1',\n", "                        kernel_layout=unsharded_layout_2d,\n", "                        bias_layout=unsharded_layout_1d),\n", "  tf.keras.layers.Dense(10,\n", "                        name='d2',\n", "                        kernel_layout=unsharded_layout_2d,\n", "                        bias_layout=unsharded_layout_1d)\n", "])" ] }, { "cell_type": "markdown", "metadata": { "id": "0frf3jsVtx_n" }, "source": [ "You can check the layout information by examining the `layout` property on the weights." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Z_nqv_VdwcXo" }, "outputs": [], "source": [ "for weight in model.weights:\n", "  print(f'Weight name: {weight.name} with layout: {weight.layout}')\n", "  break" ] }, { "cell_type": "markdown", "metadata": { "id": "6FMGB-QsxPtU" }, "source": [ "## Load a dataset and build input pipeline\n", "\n", "Load an MNIST dataset and configure a pre-processing input pipeline for it. The dataset itself is not associated with any DTensor layout information. There are plans to improve DTensor Keras integration with `tf.data` in future TensorFlow releases.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "zGt4kwltxOt4" }, "outputs": [], "source": [ "(ds_train, ds_test), ds_info = tfds.load(\n", "    'mnist',\n", "    split=['train', 'test'],\n", "    shuffle_files=True,\n", "    as_supervised=True,\n", "    with_info=True,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "HkUaOB_ryaLH" }, "outputs": [], "source": [ "def normalize_img(image, label):\n", "  \"\"\"Normalizes images: `uint8` -> `float32`.\"\"\"\n", "  return tf.cast(image, tf.float32) / 255., label" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Efm2H1iqydan" }, 
"outputs": [], "source": [ "batch_size = 128\n", "\n", "ds_train = ds_train.map(\n", " normalize_img, num_parallel_calls=tf.data.AUTOTUNE)\n", "ds_train = ds_train.cache()\n", "ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)\n", "ds_train = ds_train.batch(batch_size)\n", "ds_train = ds_train.prefetch(tf.data.AUTOTUNE)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Lcrg6QAtyis4" }, "outputs": [], "source": [ "ds_test = ds_test.map(\n", " normalize_img, num_parallel_calls=tf.data.AUTOTUNE)\n", "ds_test = ds_test.batch(batch_size)\n", "ds_test = ds_test.cache()\n", "ds_test = ds_test.prefetch(tf.data.AUTOTUNE)" ] }, { "cell_type": "markdown", "metadata": { "id": "fHEZwib7lhqn" }, "source": [ "## 定义模型的训练逻辑\n", "\n", "接下来,定义模型的训练和评估逻辑。\n", "\n", "从 TensorFlow 2.9 开始,您必须为启用 DTensor 的 Keras 模型编写自定义训练循环。这是为了用适当的布局信息打包输入数据,这些信息未与 Keras 中的标准 `tf.keras.Model.fit()` 或 `tf.keras.Model.eval()` 函数集成。您将在即将发布的版本中获得更多 `tf.data` 支持。 " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "CAx11gMjzzjs" }, "outputs": [], "source": [ "@tf.function\n", "def train_step(model, x, y, optimizer, metrics):\n", " with tf.GradientTape() as tape:\n", " logits = model(x, training=True)\n", " # tf.reduce_sum sums the batch sharded per-example loss to a replicated\n", " # global loss (scalar).\n", " loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(\n", " y, logits, from_logits=True))\n", " \n", " gradients = tape.gradient(loss, model.trainable_variables)\n", " optimizer.apply_gradients(zip(gradients, model.trainable_variables))\n", "\n", " for metric in metrics.values():\n", " metric.update_state(y_true=y, y_pred=logits)\n", "\n", " loss_per_sample = loss / len(x)\n", " results = {'loss': loss_per_sample}\n", " return results" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "maSTWeRemO0P" }, "outputs": [], "source": [ "@tf.function\n", "def eval_step(model, x, y, metrics):\n", " logits = 
model(x, training=False)\n", " loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(\n", " y, logits, from_logits=True))\n", "\n", " for metric in metrics.values():\n", " metric.update_state(y_true=y, y_pred=logits)\n", "\n", " loss_per_sample = loss / len(x)\n", " results = {'eval_loss': loss_per_sample}\n", " return results" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dt00axcLmvLr" }, "outputs": [], "source": [ "def pack_dtensor_inputs(images, labels, image_layout, label_layout):\n", " num_local_devices = image_layout.mesh.num_local_devices()\n", " images = tf.split(images, num_local_devices)\n", " labels = tf.split(labels, num_local_devices)\n", " images = dtensor.pack(images, image_layout)\n", " labels = dtensor.pack(labels, label_layout)\n", " return images, labels" ] }, { "cell_type": "markdown", "metadata": { "id": "9Eb-qIJGrxB9" }, "source": [ "## 指标和优化器\n", "\n", "将 DTensor API 与 Keras `Metric` 和 `Optimizer` 一起使用时,您需要提供额外的网格信息,以便任何内部状态变量和张量都可以使用模型中的变量。\n", "\n", "- 对于优化器,DTensor 引入了一个新的实验性命名空间 `keras.dtensor.experimental.optimizers`,其中扩展了许多现有的 Keras 优化器以接收额外的 `mesh` 参数。在未来的版本中,它可能会与 Keras 核心优化器合并。\n", "\n", "- 对于指标,可以直接将 `mesh` 作为参数指定给构造函数,使其成为兼容 DTensor 的 `Metric`。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1lu_0mz1sxrl" }, "outputs": [], "source": [ "optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)\n", "metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}\n", "eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}" ] }, { "cell_type": "markdown", "metadata": { "id": "QzufrkistELx" }, "source": [ "## 训练模型\n", "\n", "以下示例在批次维度上对来自输入流水线的数据进行分片,并使用具有完全复制权重的模型进行训练。\n", "\n", "经过 3 个周期后,模型应当达到大约 97% 的准确率。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "kZW568Dk0vvL" }, "outputs": [], "source": [ "num_epochs = 3\n", "\n", "image_layout = dtensor.Layout.batch_sharded(mesh, 
'batch', rank=4)\n", "label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)\n", "\n", "for epoch in range(num_epochs):\n", "  print(\"============================\")\n", "  print(\"Epoch: \", epoch)\n", "  for metric in metrics.values():\n", "    metric.reset_state()\n", "  step = 0\n", "  results = {}\n", "  pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])\n", "  for input in ds_train:\n", "    images, labels = input[0], input[1]\n", "    images, labels = pack_dtensor_inputs(\n", "        images, labels, image_layout, label_layout)\n", "\n", "    results.update(train_step(model, images, labels, optimizer, metrics))\n", "    for metric_name, metric in metrics.items():\n", "      results[metric_name] = metric.result()\n", "\n", "    pbar.update(step, values=results.items(), finalize=False)\n", "    step += 1\n", "  pbar.update(step, values=results.items(), finalize=True)\n", "\n", "  for metric in eval_metrics.values():\n", "    metric.reset_state()\n", "  for input in ds_test:\n", "    images, labels = input[0], input[1]\n", "    images, labels = pack_dtensor_inputs(\n", "        images, labels, image_layout, label_layout)\n", "    results.update(eval_step(model, images, labels, eval_metrics))\n", "\n", "  for metric_name, metric in eval_metrics.items():\n", "    results[metric_name] = metric.result()\n", "\n", "  for metric_name, metric in results.items():\n", "    print(f\"{metric_name}: {metric.numpy()}\")\n" ] }, { "cell_type": "markdown", "metadata": { "id": "HYEXF6qCuoSr" }, "source": [ "## Specify layout for existing model code\n", "\n", "Often you already have models that work well for your use case. Specifying `Layout` information for each individual layer within such a model would require a large amount of editing.\n", "\n", "To help you convert an existing Keras model to work with the DTensor API, you can use the new `tf.keras.dtensor.experimental.LayoutMap` API, which lets you specify the `Layout` from a global point of view.\n", "\n", "First, you need to create a `LayoutMap` instance, which is a dictionary-like object that contains all the `Layout` objects you would like to specify for your model weights.\n", "\n", "`LayoutMap` needs a `Mesh` instance at initialization, which is used to provide a default replicated `Layout` for any weight that does not have a layout configured. If you would like all your model weights to be fully replicated, you can provide an empty `LayoutMap`, and the default mesh will be used to create a replicated `Layout`.\n", "\n", "`LayoutMap` uses a string as the key and a `Layout` as the value. Its behavior differs from a normal Python dict in one respect: the string key is treated as a regular expression when retrieving the value." ] }, { "cell_type": "markdown", 
"metadata": { "id": "SCq5Nl-UP_dS" }, "source": [ "### 子类化模型\n", "\n", "考虑使用 Keras 子类化模型语法定义的以下模型。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "LZ0hRFs8unu0" }, "outputs": [], "source": [ "class SubclassedModel(tf.keras.Model):\n", "\n", " def __init__(self, name=None):\n", " super().__init__(name=name)\n", " self.feature = tf.keras.layers.Dense(16)\n", " self.feature_2 = tf.keras.layers.Dense(24)\n", " self.dropout = tf.keras.layers.Dropout(0.1)\n", "\n", " def call(self, inputs, training=None):\n", " x = self.feature(inputs)\n", " x = self.dropout(x, training=training)\n", " return self.feature_2(x)" ] }, { "cell_type": "markdown", "metadata": { "id": "1njxqPB-yS97" }, "source": [ "此模型中有 4 个权重,分别是两个 `Dense` 层的 `kernel` 和 `bias`。它们中的每一个都基于对象路径进行映射:\n", "\n", "- `model.feature.kernel`\n", "- `model.feature.bias`\n", "- `model.feature_2.kernel`\n", "- `model.feature_2.bias`\n", "\n", "注:对于子类化模型,特性名称而不是层的 `.name` 特性用作从映射中检索布局的键。这与 `tf.Module` 检查点遵循的约定一致。对于具有多个层的复杂模型,您可以[手动检查检查点](https://tensorflow.google.cn/guide/checkpoint#manually_inspecting_checkpoints)来查看特性映射。\n", "\n", "现在,定义以下 `LayoutMap` 并将其应用于模型。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "goVX6iIZw468" }, "outputs": [], "source": [ "layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)\n", "\n", "layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)\n", "layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)\n", "\n", "with layout_map.scope():\n", " subclassed_model = SubclassedModel()" ] }, { "cell_type": "markdown", "metadata": { "id": "M32HcSp_PyWs" }, "source": [ "模型权重是在第一次调用时创建的,因此使用 DTensor 输入调用模型并确认权重具有预期的布局。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "c3CbD9l7qUNq" }, "outputs": [], "source": [ "dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)\n", "# Trigger the weights creation for subclass model\n", 
"subclassed_model(dtensor_input)\n", "\n", "print(subclassed_model.feature.kernel.layout)" ] }, { "cell_type": "markdown", "metadata": { "id": "ZyCnfd-4Q2jk" }, "source": [ "这样一来,您就可以快速将 `Layout` 映射到您的模型,而无需更新任何现有代码。 " ] }, { "cell_type": "markdown", "metadata": { "id": "6GliUdWTQnKC" }, "source": [ "### 序贯模型和函数式模型" ] }, { "cell_type": "markdown", "metadata": { "id": "6zzvTqAR2Teu" }, "source": [ "对于 Keras 序贯和函数式模型,您也可以使用 `LayoutMap`。\n", "\n", "注:对于序贯模型和函数式模型,映射略有不同。模型中的层没有附加到模型的公共特性(尽管可以通过 `model.layers` 作为列表访问它们)。在这种情况下,使用字符串名称作为键。字符串名称保证在模型中是唯一的。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "gXK2EquIRJCC" }, "outputs": [], "source": [ "layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)\n", "\n", "layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)\n", "layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "cBzwJqrg2TH3" }, "outputs": [], "source": [ "with layout_map.scope():\n", " inputs = tf.keras.Input((16,), batch_size=16)\n", " x = tf.keras.layers.Dense(16, name='feature')(inputs)\n", " x = tf.keras.layers.Dropout(0.1)(x)\n", " output = tf.keras.layers.Dense(32, name='feature_2')(x)\n", " model = tf.keras.Model(inputs, output)\n", "\n", "print(model.layers[1].kernel.layout)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "pPuh1NlE3-wO" }, "outputs": [], "source": [ "with layout_map.scope():\n", " model = tf.keras.Sequential([\n", " tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),\n", " tf.keras.layers.Dropout(0.1),\n", " tf.keras.layers.Dense(32, name='feature_2')\n", " ])\n", "\n", "print(model.layers[2].kernel.layout)" ] } ], "metadata": { "colab": { "name": "dtensor_keras_tutorial.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "xxx", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": 
"3.12.2" } }, "nbformat": 4, "nbformat_minor": 0 }