{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "id": "Ndo4ERqnwQOU" }, "outputs": [], "source": [ "##### Copyright 2020 The TensorFlow Authors." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "MTKwbguKwT4R", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "xfNT-mlFwxVM" }, "source": [ "# 卷积变分自编码器" ] }, { "cell_type": "markdown", "metadata": { "id": "0TD5ZrvEMbhZ" }, "source": [ "\n", " \n", " \n", " \n", " \n", "
在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本
" ] }, { "cell_type": "markdown", "metadata": { "id": "ITZuApL56Mny" }, "source": [ "此笔记本演示了如何基于 MNIST 数据集训练变分自编码器 (VAE) ([1](https://arxiv.org/abs/1312.6114), [2](https://arxiv.org/abs/1401.4082))。VAE 是一种自编码器取值的概率分布,该模型会获取高维输入数据并将其压缩为较小的表示。与将输入映射到隐向量的传统自编码器不同,VAE 会将输入数据映射到概率分布的参数中,例如高斯分布的均值和方差。这种方式可以生成一个连续、结构化的隐空间,对于图像生成而言十分适用。\n", "\n", "![CVAE 图像隐空间](https://gitlocalize.com/repo/4592/zh-cn/site/en-snapshot/tutorials/generative/images/cvae_latent_space.jpg)" ] }, { "cell_type": "markdown", "metadata": { "id": "e1_Y75QXJS6h" }, "source": [ "## 导入 Tensorflow 与其他库" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "P-JuIu2N_SQf", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "!pip install tensorflow-probability\n", "\n", "# to generate gifs\n", "!pip install imageio\n", "!pip install git+https://github.com/tensorflow/docs" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "YfIk2es3hJEd", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "from IPython import display\n", "\n", "import glob\n", "import imageio\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import PIL\n", "import tensorflow as tf\n", "import tensorflow_probability as tfp\n", "import time" ] }, { "cell_type": "markdown", "metadata": { "id": "iYn4MdZnKCey" }, "source": [ "## 加载 MNIST 数据集\n", "\n", "每个 MNIST 图像最初都是一个由 784 个整数组成的向量,每个整数在 0-255 之间,代表一个像素的强度。在我们的模型中使用伯努利分布对每个像素进行建模,并对数据集进行静态二值化。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "a4fYMGxGhrna", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "(train_images, _), (test_images, _) = tf.keras.datasets.mnist.load_data()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NFC2ghIdiZYE", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def preprocess_images(images):\n", " images = images.reshape((images.shape[0], 28, 28, 1)) / 255.\n", " return np.where(images > .5, 1.0, 0.0).astype('float32')\n", "\n", "train_images = preprocess_images(train_images)\n", "test_images = preprocess_images(test_images)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "S4PIDhoDLbsZ", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "train_size = 60000\n", "batch_size = 32\n", "test_size = 10000" ] }, { "cell_type": "markdown", "metadata": { "id": "PIGN6ouoQxt3" }, "source": [ "## 使用 *tf.data* 来将数据分批和打乱" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "-yKCCQOoJ7cn", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "train_dataset = (tf.data.Dataset.from_tensor_slices(train_images)\n", " .shuffle(train_size).batch(batch_size))\n", "test_dataset = (tf.data.Dataset.from_tensor_slices(test_images)\n", " .shuffle(test_size).batch(batch_size))" ] }, { "cell_type": "markdown", "metadata": { "id": "THY-sZMiQ4UV" }, "source": [ "## 通过 *tf.keras.Sequential* 连接生成网络与推理网络\n", "\n", "在此 VAE 示例中,对编码器和解码器网络使用两个小型 ConvNet。在文献中,这些网络也分别称为推断/识别和生成模型。使用 `tf.keras.Sequential` 来简化实现。在下面的描述中,使 $x$ 和 $z$ 分别表示观测值和隐变量。\n", "\n", "### 生成网络\n", "\n", "这定义了近似后验分布 $q(z|x)$,它会将输入取作观测值并输出一组参数,用于指定隐变量表示 $z$ 的条件分布。在本例中,简单地将分布建模为对角高斯分布,网络会输出分解高斯分布的均值和对数方差参数。输出对数方差而不是直接用于数值稳定性的方差。\n", "\n", "### 推理网络\n", "\n", "这定义了观测值的条件分布 $p(x|z)$,它会将隐变量样本 $z$ 取作输入并输出观测值条件分布的参数。将隐变量先验分布 $p(z)$ 建模为单位高斯分布。\n", "\n", "### 重参数化技巧\n", "\n", "要在训练期间为解码器生成样本 $z$,您可以在给定输入观测值 $x$ 的情况下从编码器输出的参数所定义的隐变量分布中采样。然而,这种采样操作会产生瓶颈,因为反向传播不能流经随机节点。\n", "\n", "要解决这个问题,请使用重参数化技巧。在我们的示例中,使用解码器参数和另一个参数 $\\epsilon$ 来逼近 $z$,如下所示:\n", "\n", "$$z = \\mu + \\sigma \\odot \\epsilon$$\n", "\n", "其中 $\\mu$ 和 $\\sigma$ 分别代表高斯分布的均值和标准差。它们可通过解码器输出推导得出。$\\epsilon$ 可被认为是用于保持 $z$ 的随机性的随机噪声。从标准正态分布生成 $\\epsilon$。\n", "\n", "隐变量 $z$ 现在由 $\\mu$、$\\sigma$ 和 $\\epsilon$ 的函数生成,这将使模型能够分别通过 $\\mu$ 和 $\\sigma$ 在编码器中反向传播梯度,同时通过 $\\epsilon$ 保持随机性。\n", "\n", "### 网络架构\n", "\n", "对于编码器网络,使用两个卷积层后接一个全连接层。在解码器网络中,通过使用一个全连接层后接三个卷积转置层(在某些背景下也称为反卷积层)来镜像此架构。请注意,通常的做法是在训练 VAE 时避免使用批量归一化,因为使用 mini-batch 导致的额外随机性可能会在提高采样随机性的同时加剧不稳定性。\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "VGLbvBEmjK0a", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "class CVAE(tf.keras.Model):\n", " \"\"\"Convolutional variational autoencoder.\"\"\"\n", "\n", " def __init__(self, latent_dim):\n", " super(CVAE, self).__init__()\n", " self.latent_dim = latent_dim\n", " self.encoder = tf.keras.Sequential(\n", " [\n", " tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),\n", " tf.keras.layers.Conv2D(\n", " filters=32, kernel_size=3, strides=(2, 2), activation='relu'),\n", " tf.keras.layers.Conv2D(\n", " filters=64, kernel_size=3, strides=(2, 2), activation='relu'),\n", " tf.keras.layers.Flatten(),\n", " # No activation\n", " tf.keras.layers.Dense(latent_dim + latent_dim),\n", " ]\n", " )\n", "\n", " self.decoder = tf.keras.Sequential(\n", " [\n", " tf.keras.layers.InputLayer(input_shape=(latent_dim,)),\n", " tf.keras.layers.Dense(units=7*7*32, activation=tf.nn.relu),\n", " tf.keras.layers.Reshape(target_shape=(7, 7, 32)),\n", " tf.keras.layers.Conv2DTranspose(\n", " filters=64, kernel_size=3, strides=2, padding='same',\n", " activation='relu'),\n", " tf.keras.layers.Conv2DTranspose(\n", " filters=32, kernel_size=3, strides=2, padding='same',\n", " activation='relu'),\n", " # No activation\n", " tf.keras.layers.Conv2DTranspose(\n", " filters=1, kernel_size=3, strides=1, padding='same'),\n", " ]\n", " )\n", "\n", " @tf.function\n", " def sample(self, eps=None):\n", " if eps is None:\n", " eps = tf.random.normal(shape=(100, self.latent_dim))\n", " return self.decode(eps, apply_sigmoid=True)\n", "\n", " def encode(self, x):\n", " mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)\n", " return mean, logvar\n", "\n", " def reparameterize(self, mean, logvar):\n", " eps = tf.random.normal(shape=mean.shape)\n", " return eps * tf.exp(logvar * .5) + mean\n", "\n", " def decode(self, z, apply_sigmoid=False):\n", " logits = self.decoder(z)\n", " if apply_sigmoid:\n", " probs = tf.sigmoid(logits)\n", " return probs\n", " return logits" ] }, { "cell_type": "markdown", "metadata": { "id": "0FMYgY_mPfTi" }, "source": [ "## 定义损失函数和优化器\n", "\n", "VAE 通过最大化边际对数似然的证据下界(ELBO)进行训练:\n", "\n", "$$\\log p(x) \\ge \\text{ELBO} = \\mathbb{E}_{q(z|x)}\\left[\\log \\frac{p(x, z)}{q(z|x)}\\right].$$\n", "\n", "在实践中,优化此期望的单样本蒙特卡罗估值:\n", "\n", "$$\\log p(x| z) + \\log p(z) - \\log q(z|x)$$,其中 $z$ 从 $q(z|x)$ 中采样。\n", "\n", "注:您也可以分析计算 KL 项,但为了简单起见,您在此处会将三个项全部应用到蒙特卡罗 Estimator 中。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "iWCn_PVdEJZ7", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "optimizer = tf.keras.optimizers.Adam(1e-4)\n", "\n", "\n", "def log_normal_pdf(sample, mean, logvar, raxis=1):\n", " log2pi = tf.math.log(2. * np.pi)\n", " return tf.reduce_sum(\n", " -.5 * ((sample - mean) ** 2. * tf.exp(-logvar) + logvar + log2pi),\n", " axis=raxis)\n", "\n", "\n", "def compute_loss(model, x):\n", " mean, logvar = model.encode(x)\n", " z = model.reparameterize(mean, logvar)\n", " x_logit = model.decode(z)\n", " cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)\n", " logpx_z = -tf.reduce_sum(cross_ent, axis=[1, 2, 3])\n", " logpz = log_normal_pdf(z, 0., 0.)\n", " logqz_x = log_normal_pdf(z, mean, logvar)\n", " return -tf.reduce_mean(logpx_z + logpz - logqz_x)\n", "\n", "\n", "@tf.function\n", "def train_step(model, x, optimizer):\n", " \"\"\"Executes one training step and returns the loss.\n", "\n", " This function computes the loss and gradients, and uses the latter to\n", " update the model's parameters.\n", " \"\"\"\n", " with tf.GradientTape() as tape:\n", " loss = compute_loss(model, x)\n", " gradients = tape.gradient(loss, model.trainable_variables)\n", " optimizer.apply_gradients(zip(gradients, model.trainable_variables))" ] }, { "cell_type": "markdown", "metadata": { "id": "Rw1fkAczTQYh" }, "source": [ "## 训练\n", "\n", "- 首先迭代数据集\n", "- 在每次迭代期间,将图像传递给编码器以获取近似后验分布 $q(z|x)$ 的一组均值和对数方差参数\n", "- 然后,应用*重参数化技巧*以从 $q(z|x)$ 中采样\n", "- 最后,将重参数化的样本传递给解码器以获取生成分布 $p(x|z)$ 的 logit\n", "- 注:由于您使用由 Keras 加载的数据集,训练集中有 6 万个数据点,测试集中有 1 万个数据点,因此我们基于测试集得出的 ELBO 会略高于使用 Larochelle 的 MNIST 动态二值化的文献中报告的结果。\n", "\n", "### 生成图像\n", "\n", "- 进行训练后,可以生成一些图片了\n", "- 首先从单位高斯先验分布 $p(z)$ 中采样一组隐向量\n", "- 随后生成器将潜在样本 $z$ 转换为观测值的 logit,得到分布 $p(x|z)$\n", "- 在此处,绘制伯努利分布的概率分布图\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "NS2GWywBbAWo", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "epochs = 10\n", "# set the dimensionality of the latent space to a plane for visualization later\n", "latent_dim = 2\n", "num_examples_to_generate = 16\n", "\n", "# keeping the random vector constant for generation (prediction) so\n", "# it will be easier to see the improvement.\n", "random_vector_for_generation = tf.random.normal(\n", " shape=[num_examples_to_generate, latent_dim])\n", "model = CVAE(latent_dim)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "RmdVsmvhPxyy", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def generate_and_save_images(model, epoch, test_sample):\n", " mean, logvar = model.encode(test_sample)\n", " z = model.reparameterize(mean, logvar)\n", " predictions = model.sample(z)\n", " fig = plt.figure(figsize=(4, 4))\n", "\n", " for i in range(predictions.shape[0]):\n", " plt.subplot(4, 4, i + 1)\n", " plt.imshow(predictions[i, :, :, 0], cmap='gray')\n", " plt.axis('off')\n", "\n", " # tight_layout minimizes the overlap between 2 sub-plots\n", " plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))\n", " plt.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "swCyrbqQQ-Ri", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "# Pick a sample of the test set for generating output images\n", "assert batch_size >= num_examples_to_generate\n", "for test_batch in test_dataset.take(1):\n", " test_sample = test_batch[0:num_examples_to_generate, :, :, :]" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "2M7LmLtGEMQJ", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "generate_and_save_images(model, 0, test_sample)\n", "\n", "for epoch in range(1, epochs + 1):\n", " start_time = time.time()\n", " for train_x in train_dataset:\n", " train_step(model, train_x, optimizer)\n", " end_time = time.time()\n", "\n", " loss = tf.keras.metrics.Mean()\n", " for test_x in test_dataset:\n", " loss(compute_loss(model, test_x))\n", " elbo = -loss.result()\n", " display.clear_output(wait=False)\n", " print('Epoch: {}, Test set ELBO: {}, time elapse for current epoch: {}'\n", " .format(epoch, elbo, end_time - start_time))\n", " generate_and_save_images(model, epoch, test_sample)" ] }, { "cell_type": "markdown", "metadata": { "id": "P4M_vIbUi7c0" }, "source": [ "### 使用 epoch 编号显示图片" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "WfO5wCdclHGL", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def display_image(epoch_no):\n", " return PIL.Image.open('image_at_epoch_{:04d}.png'.format(epoch_no))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "5x3q9_Oe5q0A", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "plt.imshow(display_image(epoch))\n", "plt.axis('off') # Display images" ] }, { "cell_type": "markdown", "metadata": { "id": "NywiH3nL8guF" }, "source": [ "### 生成所有保存图片的 GIF" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "IGKQgENQ8lEI", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "anim_file = 'cvae.gif'\n", "\n", "with imageio.get_writer(anim_file, mode='I') as writer:\n", " filenames = glob.glob('image*.png')\n", " filenames = sorted(filenames)\n", " for filename in filenames:\n", " image = imageio.imread(filename)\n", " writer.append_data(image)\n", " image = imageio.imread(filename)\n", " writer.append_data(image)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "2ZqAEtdqUmJF", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "import tensorflow_docs.vis.embed as embed\n", "embed.embed_file(anim_file)" ] }, { "cell_type": "markdown", "metadata": { "id": "PeunRU6TSumT" }, "source": [ "### 显示隐空间中数字的二维流形\n", "\n", "运行下面的代码将显示不同数字类的连续分布,每个数字都会在二维隐空间中变形为另一数字。使用 [TensorFlow Probability](https://tensorflow.google.cn/probability) 为隐空间生成标准正态分布。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "code", "id": "mNcaaYPBS3mj", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "def plot_latent_images(model, n, digit_size=28):\n", " \"\"\"Plots n x n digit images decoded from the latent space.\"\"\"\n", "\n", " norm = tfp.distributions.Normal(0, 1)\n", " grid_x = norm.quantile(np.linspace(0.05, 0.95, n))\n", " grid_y = norm.quantile(np.linspace(0.05, 0.95, n))\n", " image_width = digit_size*n\n", " image_height = image_width\n", " image = np.zeros((image_height, image_width))\n", "\n", " for i, yi in enumerate(grid_x):\n", " for j, xi in enumerate(grid_y):\n", " z = np.array([[xi, yi]])\n", " x_decoded = model.sample(z)\n", " digit = tf.reshape(x_decoded[0], (digit_size, digit_size))\n", " image[i * digit_size: (i + 1) * digit_size,\n", " j * digit_size: (j + 1) * digit_size] = digit.numpy()\n", "\n", " plt.figure(figsize=(10, 10))\n", " plt.imshow(image, cmap='Greys_r')\n", " plt.axis('Off')\n", " plt.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "F-ZG69QCZnGY", "vscode": { "languageId": "python" } }, "outputs": [], "source": [ "plot_latent_images(model, 20)" ] }, { "cell_type": "markdown", "metadata": { "id": "HrJRef8Ln945" }, "source": [ "## 后续步骤\n", "\n", "本教程演示了使用 TensorFlow 实现卷积变分自编码器的方式。\n", "\n", "下一步,您可以尝试通过增大网络来改进模型输出。例如,您可以尝试将每个 `Conv2D` 和 `Conv2DTranspose` 层的 `filter` 参数设置为 512。请注意,为了生成最终的二维隐空间图像,您需要将 `latent_dim` 保持为 2。此外,训练时间会随网络的增大而延长。\n", "\n", "您还可以尝试使用不同的数据集实现 VAE,例如 CIFAR-10。\n", "\n", "VAE 支持以多种不同的风格和不同的复杂性实现。您可以从以下资源中找到其他实现:\n", "\n", "- [变分自编码器 (keras.io)](https://keras.io/examples/generative/vae/)\n", "- [“编写自定义层和模型”指南中的 VAE 示例 (tensorflow.org)](https://tensorflow.google.cn/guide/keras/custom_layers_and_models#putting_it_all_together_an_end-to-end_example)\n", "- [TFP 概率层:变分自动编码器](https://tensorflow.google.cn/probability/examples/Probabilistic_Layers_VAE)\n", "\n", "如果您想了解有关 VAE 的更多详细信息,请参阅[变分自编码器简介](https://arxiv.org/abs/1906.02691)。" ] } ], "metadata": { "accelerator": "GPU", "colab": { "collapsed_sections": [], "name": "cvae.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }