{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "wJcYs_ERTnnI" }, "outputs": [], "source": [ "##### Copyright 2021 The TensorFlow Authors." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "HMUDt0CiUJk9", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "77z2OchJTk0l" }, "source": [ "# 迁移容错机制\n", "\n", "\n", " \n", " \n", " \n", " \n", "
在 TensorFlow.org 上查看 在 Google Colab 运行\n", " 在 Github 上查看源代码\n", " 下载笔记本
" ] }, { "cell_type": "markdown", "metadata": { "id": "n4O6fPyYTxZv" }, "source": [ "容错是指定期保存参数和模型等可跟踪对象的状态的机制。这样,您便能够在训练期间出现程序/机器故障时恢复它们。\n", "\n", "本指南首先演示了如何通过使用 `tf.estimator.RunConfig` 指定指标保存以在 TensorFlow 1 中使用 `tf.estimator.Estimator` 向训练添加容错。随后,您将学习如何通过以下两种方式在 Tensorflow 2 中实现容错训练:\n", "\n", "- 如果您使用 Keras `Model.fit` API,则可以将 `tf.keras.callbacks.BackupAndRestore` 回调传递给它。\n", "- 如果您使用自定义训练循环(使用 `tf.GradientTape`),则可以使用 `tf.train.Checkpoint` 和 `tf.train.CheckpointManager` API 任意保存检查点。\n", "\n", "这两种方式都会备份和恢复[检查点](../../guide/checkpoint.ipynb)文件中的训练状态。\n" ] }, { "cell_type": "markdown", "metadata": { "id": "pHJfmkCFUhQf" }, "source": [ "## 安装" ] }, { "cell_type": "markdown", "metadata": { "id": "TOVQubuDzdmA" }, "source": [ "安装 `tf-nightly`,因为使用 `tf.keras.callbacks.BackupAndRestore` 中的 `save_freq` 参数设置特定步骤保存检查点的频率是从 TensorFlow 2.10 引入的:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "pGW0XhXkxY_q", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "!pip install tf-nightly" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "VXnPvQi8Ui1F", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import tensorflow.compat.v1 as tf1\n", "import tensorflow as tf\n", "import numpy as np\n", "import tempfile\n", "import time" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Tww-uIoiUlsT", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "mnist = tf.keras.datasets.mnist\n", "\n", "(x_train, y_train),(x_test, y_test) = mnist.load_data()\n", "x_train, x_test = x_train / 255.0, x_test / 255.0" ] }, { "cell_type": "markdown", "metadata": { "id": "TtlucRG_Uro_" }, "source": [ "## TensorFlow 1:使用 tf.estimator.RunConfig 保存检查点\n", "\n", "在 TensorFlow 1 中,可以配置 `tf.estimator`,随后通过配置 `tf.estimator.RunConfig` 在每一步保存检查点。\n", "\n", "在此示例中,首先编写一个在第五个检查点期间人为抛出错误的钩子:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Q8shCkV2jKcc", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "class InterruptHook(tf1.train.SessionRunHook):\n", " # A hook for artificially interrupting training.\n", " def begin(self):\n", " self._step = -1\n", "\n", " def before_run(self, run_context):\n", " self._step += 1\n", "\n", " def after_run(self, run_context, run_values):\n", " if self._step == 5:\n", " raise RuntimeError('Interruption')" ] }, { "cell_type": "markdown", "metadata": { "id": "ZXbQ6cFlkoIM" }, "source": [ "接下来,配置 `tf.estimator.Estimator` 以保存每个检查点并使用 MNIST 数据集:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1EKXzi4Qj2Eb", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "feature_columns = [tf1.feature_column.numeric_column(\"x\", shape=[28, 28])]\n", "config = tf1.estimator.RunConfig(save_summary_steps=1,\n", " save_checkpoints_steps=1)\n", "\n", "path = tempfile.mkdtemp()\n", "\n", "classifier = tf1.estimator.DNNClassifier(\n", " feature_columns=feature_columns,\n", " hidden_units=[256, 32],\n", " optimizer=tf1.train.AdamOptimizer(0.001),\n", " n_classes=10,\n", " dropout=0.2,\n", " model_dir=path,\n", " config = config\n", ")\n", "\n", "train_input_fn = tf1.estimator.inputs.numpy_input_fn(\n", " x={\"x\": x_train},\n", " y=y_train.astype(np.int32),\n", " num_epochs=10,\n", " batch_size=50,\n", " shuffle=True,\n", ")" ] }, { "cell_type": "markdown", "metadata": { "id": "sGP7Nyenk1gr" }, "source": [ "开始训练模型。您之前定义的钩子将引发人为异常。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "xWKMsmt6jYSL", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "try:\n", " classifier.train(input_fn=train_input_fn,\n", " hooks=[InterruptHook()],\n", " max_steps=10)\n", "except Exception as e:\n", " print(f'{type(e).__name__}:{e}')" ] }, { "cell_type": "markdown", "metadata": { "id": "DekxJkgWk-4N" }, "source": [ "使用最后保存的检查点重新构建 `tf.estimator.Estimator` 并继续训练:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "vqMVTiJMjcH7", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "classifier = tf1.estimator.DNNClassifier(\n", " feature_columns=feature_columns,\n", " hidden_units=[256, 32],\n", " optimizer=tf1.train.AdamOptimizer(0.001),\n", " n_classes=10,\n", " dropout=0.2,\n", " model_dir=path,\n", " config = config\n", ")\n", "classifier.train(input_fn=train_input_fn,\n", " max_steps = 10)" ] }, { "cell_type": "markdown", "metadata": { "id": "T5LtVtmvYx7J" }, "source": [ "## TensorFlow 2:使用回调和 Model.fit 备份和恢复\n", "\n", "在 TensorFlow 2 中,如果使用 Keras `Model.fit` API 进行训练,则可以提供 `tf.keras.callbacks.BackupAndRestore` 回调来添加容错功能。\n", "\n", "为了帮助演示这一点,首先定义一个 Keras `Callback` 类,该类会在第四个周期检查点期间人为抛出错误:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Ci3yB6A5lwJu", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "class InterruptAtEpoch(tf.keras.callbacks.Callback):\n", " # A callback for artificially interrupting training.\n", " def __init__(self, interrupting_epoch=3):\n", " self.interrupting_epoch = interrupting_epoch\n", "\n", " def on_epoch_end(self, epoch, log=None):\n", " if epoch == self.interrupting_epoch:\n", " raise RuntimeError('Interruption')" ] }, { "cell_type": "markdown", "metadata": { "id": "AhU3VTYZoDh-" }, "source": [ "然后,定义并实例化一个简单的 Keras 模型,定义损失函数,调用 `Model.compile` 并设置一个 `tf.keras.callbacks.BackupAndRestore` 回调,它会将检查点保存在周期边界的临时目录中:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1VOQLDNkl2bl", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def create_model():\n", " return tf.keras.models.Sequential([\n", " tf.keras.layers.Flatten(input_shape=(28, 28)),\n", " tf.keras.layers.Dense(512, activation='relu'),\n", " tf.keras.layers.Dropout(0.2),\n", " tf.keras.layers.Dense(10)\n", " ])\n", "loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)\n", "model = create_model()\n", "model.compile(optimizer='adam',\n", " loss=loss,\n", " metrics=['accuracy'])\n", "log_dir = tempfile.mkdtemp()\n", "backup_restore_callback = tf.keras.callbacks.BackupAndRestore(\n", " backup_dir = log_dir)" ] }, { "cell_type": "markdown", "metadata": { "id": "LRRWmZqsvMrq" }, "source": [ "开始使用 `Model.fit` 训练模型。在训练期间,由于上面实例化的 `tf.keras.callbacks.BackupAndRestore` 将保存检查点,而 `InterruptAtEpoch` 类将引发人为异常来模拟第四个周期后的失败。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "8bVO79qWl4Uv", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "try:\n", " model.fit(x=x_train,\n", " y=y_train,\n", " epochs=10,\n", " steps_per_epoch=100,\n", " validation_data=(x_test, y_test),\n", " callbacks=[backup_restore_callback, InterruptAtEpoch()])\n", "except Exception as e:\n", " print(f'{type(e).__name__}:{e}')" ] }, { "cell_type": "markdown", "metadata": { "id": "EWidh234vcRf" }, "source": [ "接下来,实例化 Keras 模型,调用 `Model.compile`,并从之前保存的检查点继续使用 `Model.fit` 训练模型:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "3IWPH0Cmn2wi", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "model = create_model()\n", "model.compile(optimizer='adam',\n", " loss=loss,\n", " metrics=['accuracy'],\n", " steps_per_execution=10)\n", "model.fit(x=x_train,\n", " y=y_train,\n", " epochs=10,\n", " steps_per_epoch=100,\n", " validation_data=(x_test, y_test),\n", " callbacks=[backup_restore_callback])" ] }, { "cell_type": "markdown", "metadata": { "id": "nP2dnpMPxtYj" }, "source": [ "定义另一个 `Callback` 类,该类会在第 140 步期间人为抛出错误:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YardkAaBxr-c", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "class InterruptAtStep(tf.keras.callbacks.Callback):\n", " # A callback for artificially interrupting training.\n", " def __init__(self, interrupting_step=140):\n", " self.total_step_count = 0\n", " self.interrupting_step = interrupting_step\n", "\n", " def on_batch_begin(self, batch, logs=None):\n", " self.total_step_count += 1\n", "\n", " def on_batch_end(self, batch, logs=None):\n", " if self.total_step_count == self.interrupting_step:\n", " print(\"\\nInterrupting at step count\", self.total_step_count)\n", " raise RuntimeError('Interruption')" ] }, { "cell_type": "markdown", "metadata": { "id": "Af3VpehxyTpb" }, "source": [ "注:本部分使用了仅在 Tensorflow 2.10 发布后才能在 `tf-nightly` 中可用的功能。\n", "\n", "要确保检查点每 30 个步骤保存一次,请将 `BackupAndRestore` 回调中的 `save_freq` 设置为 `30`。`InterruptAtStep` 将引发一个人为的异常来模拟周期 1 和步骤 40 的失败(总步数为 140)。最后会在周期 1 和步骤 20 保存检查点。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dHHCENDPyUHS", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "log_dir_2 = tempfile.mkdtemp()\n", "\n", "backup_restore_callback = tf.keras.callbacks.BackupAndRestore(\n", " backup_dir = log_dir_2, save_freq=30\n", ")\n", "model = create_model()\n", "model.compile(optimizer='adam',\n", " loss=loss,\n", " metrics=['accuracy'])\n", "try:\n", " model.fit(x=x_train,\n", " y=y_train,\n", " epochs=10,\n", " steps_per_epoch=100,\n", " validation_data=(x_test, y_test),\n", " callbacks=[backup_restore_callback, InterruptAtStep()])\n", "except Exception as e:\n", " print(f'{type(e).__name__}:{e}')" ] }, { "cell_type": "markdown", "metadata": { "id": "2-ggMFEHynMR" }, "source": [ "接下来,实例化 Keras 模型,调用 `Model.compile`,并从之前保存的检查点继续使用 `Model.fit` 训练模型。请注意,训练从周期 2 和步骤 21 开始。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "vT7Kx30NEqly", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "model = create_model()\n", "model.compile(optimizer='adam',\n", " loss=loss,\n", " metrics=['accuracy'],\n", " steps_per_execution=10)\n", "model.fit(x=x_train,\n", " y=y_train,\n", " epochs=10,\n", " steps_per_epoch=100,\n", " validation_data=(x_test, y_test),\n", " callbacks=[backup_restore_callback])" ] }, { "cell_type": "markdown", "metadata": { "id": "OdWexHUUaEB6" }, "source": [ "## TensorFlow 2:使用自定义训练循环编写手动检查点\n", "\n", "如果您在 TensorFlow 2 中使用自定义训练循环,则可以使用 `tf.train.Checkpoint` 和 `tf.train.CheckpointManager` API 实现容错机制。\n", "\n", "此示例演示了如何执行以下操作:\n", "\n", "- 使用 `tf.train.Checkpoint` 对象手动创建一个检查点,其中要保存的可跟踪对象设置为特性。\n", "- 使用 `tf.train.CheckpointManager` 管理多个检查点。\n", "\n", "首先,定义和实例化 Keras 模型、优化器和损失函数。然后,创建一个 `Checkpoint` 来管理两个具有可跟踪状态的对象(模型和优化器),以及一个 `CheckpointManager` 来记录多个检查点并将它们保存在临时目录中。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "hPnIRKC8aDwE", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "model = create_model()\n", "optimizer = tf.keras.optimizers.SGD(learning_rate=0.001)\n", "loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)\n", "log_dir = tempfile.mkdtemp()\n", "epochs = 5\n", "steps_per_epoch = 5\n", "\n", "checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)\n", "checkpoint_manager = tf.train.CheckpointManager(\n", " checkpoint, log_dir, max_to_keep=2)" ] }, { "cell_type": "markdown", "metadata": { "id": "L2tK4fm6xNkJ" }, "source": [ "现在,实现一个自定义训练循环,在第一个周期之后,每次新周期开始时都会加载最后一个检查点:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "GhQphF5jxPWU", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "for epoch in range(epochs):\n", " if epoch > 0:\n", " tf.train.load_checkpoint(save_path)\n", " print(f\"\\nStart of epoch {epoch}\")\n", "\n", " for step in range(steps_per_epoch):\n", " with tf.GradientTape() as tape:\n", "\n", " logits = model(x_train, training=True)\n", " loss_value = loss_fn(y_train, logits)\n", "\n", " grads = tape.gradient(loss_value, model.trainable_weights)\n", " optimizer.apply_gradients(zip(grads, model.trainable_weights))\n", "\n", " save_path = checkpoint_manager.save()\n", " print(f\"Checkpoint saved to {save_path}\")\n", " print(f\"Training loss at step {step}: {loss_value}\")" ] }, { "cell_type": "markdown", "metadata": { "id": "rQUS8nO9FZlH" }, "source": [ "## 后续步骤\n", "\n", "要详细了解 TensorFlow 2 中的容错和检查点,请查看以下文档:\n", "\n", "- `tf.keras.callbacks.BackupAndRestore` 回调 API 文档。\n", "- `tf.train.Checkpoint` 和 `tf.train.CheckpointManager` API 文档。\n", "- [训练检查点](../../guide/checkpoint.ipynb)指南,包括*编写检查点*部分。\n", "\n", "此外,您可能还会发现下列与[分布式训练](../..guide/distributed_training.ipynb)相关的材料十分有用:\n", "\n", "- [使用 Keras 进行多工作进程训练](../../tutorials/distribute/multi_worker_with_keras.ipynb)教程中的*容错*部分。\n", "- [参数服务器训练](../../tutorials/distribute/parameter_server_training.ipynb)教程中的*处理任务失败*部分。" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "fault_tolerance.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }