{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "FhGuhbZ6M5tl" }, "outputs": [], "source": [ "##### Copyright 2022 The TensorFlow Authors." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "AwOEIRJC6Une", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "EIdT9iu_Z4Rb" }, "source": [ "# 使用 Core API 和 DTensor 进行分布式训练" ] }, { "cell_type": "markdown", "metadata": { "id": "bBIlTPscrIT9" }, "source": [ "\n", " \n", " \n", " \n", " \n", "
在 TensorFlow.org 上查看 在 Google Colab 中运行 在 Github 上查看源代码 下载笔记本
" ] }, { "cell_type": "markdown", "metadata": { "id": "SjAxxRpBzVYg" }, "source": [ "## 简介\n", "\n", "此笔记本使用 [TensorFlow Core 低级 API](https://tensorflow.google.cn/guide/core) 和 [DTensor](https://tensorflow.google.cn/guide/dtensor_overview) 来演示数据并行分布式训练示例。访问 [Core API 概述](https://tensorflow.google.cn/guide/core)以详细了解 TensorFlow Core 及其预期用例。请参阅 [DTensor 概述](https://tensorflow.google.cn/guide/dtensor_overview)指南和[使用 DTensor 进行分布式训练](https://tensorflow.google.cn/tutorials/distribute/dtensor_ml_tutorial)教程以详细了解 DTensor。\n", "\n", "本示例使用[多层感知器](https://tensorflow.google.cn/guide/core/mlp_core)教程中显示的相同模型和优化器。首先请参阅本教程,以熟悉使用 Core API 编写端到端机器学习工作流。\n", "\n", "注:DTensor 仍然是一个实验性 TensorFlow API,这意味着它的功能可用于测试,并且仅用于测试环境。" ] }, { "cell_type": "markdown", "metadata": { "id": "d_OFkG0dyWCp" }, "source": [ "## 使用 DTensor 进行数据并行训练的概述\n", "\n", "在构建支持分布的 MLP 之前,花点时间探索 DTensor 用于数据并行训练的基础知识。\n", "\n", "DTensor 允许您跨设备运行分布式训练,以提高效率、可靠性和可扩缩性。DTensor 通过称为单程序多数据 (SPMD) 扩展的过程根据分片指令分布程序和张量。`DTensor` 感知层的变量被创建为 `dtensor.DVariable`,除了通常的层参数外,`DTensor` 感知层对象的构造函数还接受额外的 `Layout` 输入。\n", "\n", "数据并行训练的主要思路如下:\n", "\n", "- 分别在 N 台设备上复制模型变量。\n", "- 将全局批次拆分成 N 个按副本批次。\n", "- 每个按副本批次都在副本设备上进行训练。\n", "- 在对所有副本共同执行数据加权之前进行梯度归约。\n", "- 数据并行训练能够根据设备数量提供近乎线性的速度。" ] }, { "cell_type": "markdown", "metadata": { "id": "nchsZfwEVtVs" }, "source": [ "## 安装\n", "\n", "DTensor 是 TensorFlow 2.9.0 版本的一部分。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "latuqlI_Yvoo", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "#!pip install --quiet --upgrade --pre tensorflow" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1rRo8oNqZ-Rj", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import matplotlib\n", "from matplotlib import pyplot as plt\n", "# Preset Matplotlib figure sizes.\n", "matplotlib.rcParams['figure.figsize'] = [9, 6]" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "9xQKvCJ85kCQ", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import tensorflow as tf\n", "import tensorflow_datasets as tfds\n", "from tensorflow.experimental import dtensor\n", "print(tf.__version__)\n", "# Set random seed for reproducible results \n", "tf.random.set_seed(22)" ] }, { "cell_type": "markdown", "metadata": { "id": "vDH9-sy4sfPf" }, "source": [ "为本实验配置 8 个虚拟 CPU。DTensor 也可以与 GPU 或 TPU 设备一起使用。鉴于此笔记本使用虚拟设备,分布式训练所获得的加速效果并不明显。 " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "H2iM-6J4s2D6", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def configure_virtual_cpus(ncpu):\n", " phy_devices = tf.config.list_physical_devices('CPU')\n", " tf.config.set_logical_device_configuration(phy_devices[0], [\n", " tf.config.LogicalDeviceConfiguration(),\n", " ] * ncpu)\n", "\n", "configure_virtual_cpus(8)\n", "\n", "DEVICES = [f'CPU:{i}' for i in range(8)]\n", "devices = tf.config.list_logical_devices('CPU')\n", "device_names = [d.name for d in devices]\n", "device_names" ] }, { "cell_type": "markdown", "metadata": { "id": "F_72b0LCNbjx" }, "source": [ "## MNIST 数据集\n", "\n", "可从 [TensorFlow Datasets](https://tensorflow.google.cn/datasets/catalog/mnist) 获得该数据集。将数据拆分为训练集和测试集。仅使用 5000 个样本进行训练和测试以节省时间。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "8h4fV_JCfPIX", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "train_data, test_data = tfds.load(\"mnist\", split=['train[:5000]', 'test[:5000]'], batch_size=128, as_supervised=True)" ] }, { "cell_type": "markdown", "metadata": { "id": "twkJ35YB6tSi" }, "source": [ "### 预处理数据\n", "\n", "通过将数据的形状改变为二维并重新缩放数据以适合单位区间 [0,1] 来预处理数据。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6Cmjhg0xCqbz", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def preprocess(x, y):\n", " # Reshaping the data\n", " x = tf.reshape(x, shape=[-1, 784])\n", " # Rescaling the data\n", " x = x/255\n", " return x, y\n", "\n", "train_data, test_data = train_data.map(preprocess), test_data.map(preprocess)" ] }, { "cell_type": "markdown", "metadata": { "id": "6o3CrycBXA2s" }, "source": [ "## 构建 MLP\n", "\n", "使用 DTensor 感知层构建 MLP 模型。" ] }, { "cell_type": "markdown", "metadata": { "id": "OHW6Yvg2yS6H" }, "source": [ "### 密集层\n", "\n", "首先创建一个支持 DTensor 的密集层模块。`dtensor.call_with_layout` 函数可用于调用接受 DTensor 输入并产生 DTensor 输出的函数。这对于使用 TensorFlow 支持的函数初始化 DTensor 变量 `dtensor.DVariable` 十分有用。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "IM0yJos25FG5", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "class DenseLayer(tf.Module):\n", "\n", " def __init__(self, in_dim, out_dim, weight_layout, activation=tf.identity):\n", " super().__init__()\n", " # Initialize dimensions and the activation function\n", " self.in_dim, self.out_dim = in_dim, out_dim\n", " self.activation = activation\n", "\n", " # Initialize the DTensor weights using the Xavier scheme\n", " uniform_initializer = tf.function(tf.random.stateless_uniform)\n", " xavier_lim = tf.sqrt(6.)/tf.sqrt(tf.cast(self.in_dim + self.out_dim, tf.float32))\n", " self.w = dtensor.DVariable(\n", " dtensor.call_with_layout(\n", " uniform_initializer, weight_layout,\n", " shape=(self.in_dim, self.out_dim), seed=(22, 23),\n", " minval=-xavier_lim, maxval=xavier_lim))\n", " \n", " # Initialize the bias with the zeros\n", " bias_layout = weight_layout.delete([0])\n", " self.b = dtensor.DVariable(\n", " dtensor.call_with_layout(tf.zeros, bias_layout, shape=[out_dim]))\n", "\n", " def __call__(self, x):\n", " # Compute the forward pass\n", " z = tf.add(tf.matmul(x, self.w), self.b)\n", " return self.activation(z)" ] }, { "cell_type": "markdown", "metadata": { "id": "X-7MzpjgyHg6" }, "source": [ "### MLP 序贯模型\n", "\n", "现在,创建一个按顺序执行密集层的 MLP 模块。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6XisRWiCyHAb", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "class MLP(tf.Module):\n", "\n", " def __init__(self, layers):\n", " self.layers = layers\n", " \n", " def __call__(self, x, preds=False): \n", " # Execute the model's layers sequentially\n", " for layer in self.layers:\n", " x = layer(x)\n", " return x" ] }, { "cell_type": "markdown", "metadata": { "id": "r5HZJ0kv-V3v" }, "source": [ "使用 DTensor 执行“数据并行”训练等效于 `tf.distribute.MirroredStrategy`。为此,每个设备将在数据批次的一个分片上运行相同的模型。因此,您将需要以下各项:\n", "\n", "- 具有单个 `\"batch\"` 维度的 `dtensor.Mesh`\n", "- 用于在网格中复制(对每个轴使用 `dtensor.UNSHARDED`)的所有权重的 `dtensor.Layout`\n", "- 用于在网格中拆分批次维度的数据的 `dtensor.Layout`\n", "\n", "创建一个包含单个批次维度的 DTensor 网格,其中每个设备都成为从全局批次接收分片的副本。使用此网格实例化具有以下架构的 MLP 模式:\n", "\n", "前向传递:ReLU(784 x 700) x ReLU(700 x 500) x Softmax(500 x 10)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "VmlACuki3oPi", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "mesh = dtensor.create_mesh([(\"batch\", 8)], devices=DEVICES)\n", "weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh)\n", "\n", "input_size = 784\n", "hidden_layer_1_size = 700\n", "hidden_layer_2_size = 500\n", "hidden_layer_2_size = 10\n", "\n", "mlp_model = MLP([\n", " DenseLayer(in_dim=input_size, out_dim=hidden_layer_1_size, \n", " weight_layout=weight_layout,\n", " activation=tf.nn.relu),\n", " DenseLayer(in_dim=hidden_layer_1_size , out_dim=hidden_layer_2_size,\n", " weight_layout=weight_layout,\n", " activation=tf.nn.relu),\n", " DenseLayer(in_dim=hidden_layer_2_size, out_dim=hidden_layer_2_size, \n", " weight_layout=weight_layout)])" ] }, { "cell_type": "markdown", "metadata": { "id": "tyBATDoRmDkg" }, "source": [ "### 训练指标\n", "\n", "使用交叉熵损失函数和准确率指标进行训练。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "rskOYA7FVCwg", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def cross_entropy_loss(y_pred, y):\n", " # Compute cross entropy loss with a sparse operation\n", " sparse_ce = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=y_pred)\n", " return tf.reduce_mean(sparse_ce)\n", "\n", "def accuracy(y_pred, y):\n", " # Compute accuracy after extracting class predictions\n", " class_preds = tf.argmax(y_pred, axis=1)\n", " is_equal = tf.equal(y, class_preds)\n", " return tf.reduce_mean(tf.cast(is_equal, tf.float32))" ] }, { "cell_type": "markdown", "metadata": { "id": "JSiNRhTOnKZr" }, "source": [ "### 优化器\n", "\n", "与标准梯度下降相比,使用优化器可以显著加快收敛速度​​。Adam 优化器在下面实现,并已配置为与 DTensor 兼容。要将 Keras 优化器与 DTensor 一起使用,请参阅实验性 `tf.keras.dtensor.experimental.optimizers` 模块。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "-9kIAI_lfXDS", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "class Adam(tf.Module):\n", "\n", " def __init__(self, model_vars, learning_rate=1e-3, beta_1=0.9, beta_2=0.999, ep=1e-7):\n", " # Initialize optimizer parameters and variable slots\n", " self.model_vars = model_vars\n", " self.beta_1 = beta_1\n", " self.beta_2 = beta_2\n", " self.learning_rate = learning_rate\n", " self.ep = ep\n", " self.t = 1.\n", " self.v_dvar, self.s_dvar = [], []\n", " # Initialize optimizer variable slots\n", " for var in model_vars:\n", " v = dtensor.DVariable(dtensor.call_with_layout(tf.zeros, var.layout, shape=var.shape))\n", " s = dtensor.DVariable(dtensor.call_with_layout(tf.zeros, var.layout, shape=var.shape))\n", " self.v_dvar.append(v)\n", " self.s_dvar.append(s)\n", "\n", " def apply_gradients(self, grads):\n", " # Update the model variables given their gradients\n", " for i, (d_var, var) in enumerate(zip(grads, self.model_vars)):\n", " self.v_dvar[i].assign(self.beta_1*self.v_dvar[i] + (1-self.beta_1)*d_var)\n", " self.s_dvar[i].assign(self.beta_2*self.s_dvar[i] + (1-self.beta_2)*tf.square(d_var))\n", " v_dvar_bc = self.v_dvar[i]/(1-(self.beta_1**self.t))\n", " s_dvar_bc = self.s_dvar[i]/(1-(self.beta_2**self.t))\n", " var.assign_sub(self.learning_rate*(v_dvar_bc/(tf.sqrt(s_dvar_bc) + self.ep)))\n", " self.t += 1.\n", " return " ] }, { "cell_type": "markdown", "metadata": { "id": "w54b7GtLfn1j" }, "source": [ "### 数据打包\n", "\n", "首先,编写一个用于将数据传输到设备的辅助函数。此函数应使用 `dtensor.pack` 将用于副本的全局批次的分片发送(并且仅发送)到支持副本的设备。为简单起见,假设采用一个单客户端应用。\n", "\n", "接下来,编写一个函数,它使用此辅助函数将训练数据批次打包到沿批次(第一个)轴分片的 DTensor 中。这可确保 DTensor 将训练数据均匀分布到“批次”网格维度。请注意,在 DTensor 中,批次大小始终指全局批次大小;因此,批次大小的选择应使其可以被批次网格维度的大小均匀拆分。计划使用其他 DTensor API 来简化 `tf.data` 集成,敬请期待。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "3Rx82djZ6ITm", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def repack_local_tensor(x, layout):\n", " # Repacks a local Tensor-like to a DTensor with layout\n", " # This function assumes a single-client application\n", " x = tf.convert_to_tensor(x)\n", " sharded_dims = []\n", "\n", " # For every sharded dimension, use tf.split to split the along the dimension.\n", " # The result is a nested list of split-tensors in queue[0].\n", " queue = [x]\n", " for axis, dim in enumerate(layout.sharding_specs):\n", " if dim == dtensor.UNSHARDED:\n", " continue\n", " num_splits = layout.shape[axis]\n", " queue = tf.nest.map_structure(lambda x: tf.split(x, num_splits, axis=axis), queue)\n", " sharded_dims.append(dim)\n", "\n", " # Now you can build the list of component tensors by looking up the location in\n", " # the nested list of split-tensors created in queue[0].\n", " components = []\n", " for locations in layout.mesh.local_device_locations():\n", " t = queue[0]\n", " for dim in sharded_dims:\n", " split_index = locations[dim] # Only valid on single-client mesh.\n", " t = t[split_index]\n", " components.append(t)\n", "\n", " return dtensor.pack(components, layout)\n", "\n", "def repack_batch(x, y, mesh):\n", " # Pack training data batches into DTensors along the batch axis\n", " x = repack_local_tensor(x, layout=dtensor.Layout(['batch', dtensor.UNSHARDED], mesh))\n", " y = repack_local_tensor(y, layout=dtensor.Layout(['batch'], mesh))\n", " return x, y" ] }, { "cell_type": "markdown", "metadata": { "id": "osEK3rqpYfKd" }, "source": [ "### 训练\n", "\n", "编写一个可跟踪的函数,在给定一批数据的情况下执行单个训练步骤。此函数不需要任何特殊的 DTensor 注解。此外,还要编写一个执行测试步骤并返回适当性能指标的函数。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "ZICEsDGuSbDD", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "@tf.function\n", "def train_step(model, x_batch, y_batch, loss, metric, optimizer):\n", " # Execute a single training step\n", " with tf.GradientTape() as tape:\n", " y_pred = model(x_batch)\n", " batch_loss = loss(y_pred, y_batch)\n", " # Compute gradients and update the model's parameters\n", " grads = tape.gradient(batch_loss, model.trainable_variables)\n", " optimizer.apply_gradients(grads)\n", " # Return batch loss and accuracy\n", " batch_acc = metric(y_pred, y_batch)\n", " return batch_loss, batch_acc\n", "\n", "@tf.function\n", "def test_step(model, x_batch, y_batch, loss, metric):\n", " # Execute a single testing step\n", " y_pred = model(x_batch)\n", " batch_loss = loss(y_pred, y_batch)\n", " batch_acc = metric(y_pred, y_batch)\n", " return batch_loss, batch_acc" ] }, { "cell_type": "markdown", "metadata": { "id": "RjIDVTwwX-Mr" }, "source": [ "现在,以 128 为批次大小对 MLP 模型训练 3 个周期。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "oC85kuZgmh3q", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Initialize the training loop parameters and structures\n", "epochs = 3\n", "batch_size = 128\n", "train_losses, test_losses = [], []\n", "train_accs, test_accs = [], []\n", "optimizer = Adam(mlp_model.trainable_variables)\n", "\n", "# Format training loop\n", "for epoch in range(epochs):\n", " batch_losses_train, batch_accs_train = [], []\n", " batch_losses_test, batch_accs_test = [], []\n", "\n", " # Iterate through training data\n", " for x_batch, y_batch in train_data:\n", " x_batch, y_batch = repack_batch(x_batch, y_batch, mesh)\n", " batch_loss, batch_acc = train_step(mlp_model, x_batch, y_batch, cross_entropy_loss, accuracy, optimizer)\n", " # Keep track of batch-level training performance\n", " batch_losses_train.append(batch_loss)\n", " batch_accs_train.append(batch_acc)\n", "\n", " # Iterate through testing data\n", " for x_batch, y_batch in test_data:\n", " x_batch, y_batch = repack_batch(x_batch, y_batch, mesh)\n", " batch_loss, batch_acc = test_step(mlp_model, x_batch, y_batch, cross_entropy_loss, accuracy)\n", " # Keep track of batch-level testing\n", " batch_losses_test.append(batch_loss)\n", " batch_accs_test.append(batch_acc)\n", "\n", "# Keep track of epoch-level model performance\n", " train_loss, train_acc = tf.reduce_mean(batch_losses_train), tf.reduce_mean(batch_accs_train)\n", " test_loss, test_acc = tf.reduce_mean(batch_losses_test), tf.reduce_mean(batch_accs_test)\n", " train_losses.append(train_loss)\n", " train_accs.append(train_acc)\n", " test_losses.append(test_loss)\n", " test_accs.append(test_acc)\n", " print(f\"Epoch: {epoch}\")\n", " print(f\"Training loss: {train_loss.numpy():.3f}, Training accuracy: {train_acc.numpy():.3f}\")\n", " print(f\"Testing loss: {test_loss.numpy():.3f}, Testing accuracy: {test_acc.numpy():.3f}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "j_RVmt43G12R" }, "source": [ "### 性能评估\n", "\n", "首先,编写一个绘图函数来呈现模型在训练期间的损失和准确率。 " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "VXTCYVtNDjAM", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def plot_metrics(train_metric, test_metric, metric_type):\n", " # Visualize metrics vs training Epochs\n", " plt.figure()\n", " plt.plot(range(len(train_metric)), train_metric, label = f\"Training {metric_type}\")\n", " plt.plot(range(len(test_metric)), test_metric, label = f\"Testing {metric_type}\")\n", " plt.xlabel(\"Epochs\")\n", " plt.ylabel(metric_type)\n", " plt.legend()\n", " plt.title(f\"{metric_type} vs Training Epochs\");" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "407qok7q2JIO", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "plot_metrics(train_losses, test_losses, \"Cross entropy loss\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "8H_TgxV92NfX", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "plot_metrics(train_accs, test_accs, \"Accuracy\")" ] }, { "cell_type": "markdown", "metadata": { "id": "DHO_u-3w4YRF" }, "source": [ "## 保存模型\n", "\n", "`tf.saved_model` 和 DTensor 的集成仍在开发中。从 TensorFlow 2.9.0 开始,tf.saved_model 仅接受具有完全复制变量的 DTensor 模型。作为替代方法,您可以通过重新加载检查点将 DTensor 模型转换为完全复制的模型。但是,保存模型后,所有 DTensor 注解都会丢失,保存的签名只能与常规张量一起使用。本教程将在固化后进行更新以展示集成。\n" ] }, { "cell_type": "markdown", "metadata": { "id": "VFLfEH4ManbW" }, "source": [ "## 结论\n", "\n", "此笔记本概括介绍了如何使用 DTensor 和 TensorFlow Core API 进行分布式训练。下面是一些可能有所帮助的提示:\n", "\n", "- [TensorFlow Core API](https://tensorflow.google.cn/guide/core) 可用于构建具有高度可配置性的机器学习工作流,并支持分布式训练。\n", "- [DTensor 概念](https://tensorflow.google.cn/guide/dtensor_overview)指南和[使用 DTensor 进行分布式训练](https://tensorflow.google.cn/tutorials/distribute/dtensor_ml_tutorial)教程包含有关 DTensor 及其集成的最新信息。\n", "\n", "有关使用 TensorFlow Core API 的更多示例,请查阅[指南](https://tensorflow.google.cn/guide/core)。如果您想详细了解如何加载和准备数据,请参阅有关[图像数据加载](https://tensorflow.google.cn/tutorials/load_data/images)或 [CSV 数据加载](https://tensorflow.google.cn/tutorials/load_data/csv)的教程。" ] } ], "metadata": { "colab": { "collapsed_sections": [ "FhGuhbZ6M5tl" ], "name": "distribution.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }