##### Copyright 2022 The TensorFlow Compression Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

学习的数据压缩#

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

概述#

此笔记本展示了如何使用神经网络和 TensorFlow Compression 进行有损数据压缩。

有损压缩涉及在速率、编码样本所需的预期比特数以及失真、样本重建中的预期误差之间进行权衡。

下面的示例使用类似自动编码器的模型来压缩来自 MNIST 数据集的图像。这种方式基于端到端优化图像压缩这篇论文。

有关学习的数据压缩的更多背景信息,请参阅面向熟悉经典数据压缩的读者的这篇论文,或者面向机器学习受众的这份调查

安装#

通过 pip 安装 Tensorflow Compression。

%%bash
# Installs the latest version of TFC compatible with the installed TF version.

read MAJOR MINOR <<< "$(pip show tensorflow | perl -p -0777 -e 's/.*Version: (\d+)\.(\d+).*/\1 \2/sg')"
pip install "tensorflow-compression<$MAJOR.$(($MINOR+1))"

导入库依赖项。

import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_compression as tfc
import tensorflow_datasets as tfds

定义训练器模型#

由于该模型类似于自动编码器,并且我们需要在训练和推断期间执行一组不同的功函数,设置与分类器略有不同。

训练模型由三个部分组成:

  • 分析(或编码器)转换,将图像转换为隐空间,

  • 合成(或解码器)转换,从隐空间转换回图像空间,以及

  • 先验和熵模型,对隐空间的边际概率进行建模。

首先,定义转换:

def make_analysis_transform(latent_dims):
  """Creates the analysis (encoder) transform."""
  return tf.keras.Sequential([
      tf.keras.layers.Conv2D(
          20, 5, use_bias=True, strides=2, padding="same",
          activation="leaky_relu", name="conv_1"),
      tf.keras.layers.Conv2D(
          50, 5, use_bias=True, strides=2, padding="same",
          activation="leaky_relu", name="conv_2"),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(
          500, use_bias=True, activation="leaky_relu", name="fc_1"),
      tf.keras.layers.Dense(
          latent_dims, use_bias=True, activation=None, name="fc_2"),
  ], name="analysis_transform")
def make_synthesis_transform():
  """Creates the synthesis (decoder) transform."""
  return tf.keras.Sequential([
      tf.keras.layers.Dense(
          500, use_bias=True, activation="leaky_relu", name="fc_1"),
      tf.keras.layers.Dense(
          2450, use_bias=True, activation="leaky_relu", name="fc_2"),
      tf.keras.layers.Reshape((7, 7, 50)),
      tf.keras.layers.Conv2DTranspose(
          20, 5, use_bias=True, strides=2, padding="same",
          activation="leaky_relu", name="conv_1"),
      tf.keras.layers.Conv2DTranspose(
          1, 5, use_bias=True, strides=2, padding="same",
          activation="leaky_relu", name="conv_2"),
  ], name="synthesis_transform")

训练器拥有两个转换的实例,以及先验的参数。

它的 call 方法设置为计算如下参数:

  • 速率,估计表示该批次数字所需的位数,以及

  • 失真,原始数字的像素与其重建之间的平均绝对差。

class MNISTCompressionTrainer(tf.keras.Model):
  """Model that trains a compressor/decompressor for MNIST."""

  def __init__(self, latent_dims):
    super().__init__()
    self.analysis_transform = make_analysis_transform(latent_dims)
    self.synthesis_transform = make_synthesis_transform()
    self.prior_log_scales = tf.Variable(tf.zeros((latent_dims,)))

  @property
  def prior(self):
    return tfc.NoisyLogistic(loc=0., scale=tf.exp(self.prior_log_scales))

  def call(self, x, training):
    """Computes rate and distortion losses."""
    # Ensure inputs are floats in the range (0, 1).
    x = tf.cast(x, self.compute_dtype) / 255.
    x = tf.reshape(x, (-1, 28, 28, 1))

    # Compute latent space representation y, perturb it and model its entropy,
    # then compute the reconstructed pixel-level representation x_hat.
    y = self.analysis_transform(x)
    entropy_model = tfc.ContinuousBatchedEntropyModel(
        self.prior, coding_rank=1, compression=False)
    y_tilde, rate = entropy_model(y, training=training)
    x_tilde = self.synthesis_transform(y_tilde)

    # Average number of bits per MNIST digit.
    rate = tf.reduce_mean(rate)

    # Mean absolute difference across pixels.
    distortion = tf.reduce_mean(abs(x - x_tilde))

    return dict(rate=rate, distortion=distortion)

计算速率和失真#

我们使用训练集中的一张图像逐步完成此操作。加载 MNIST 数据集进行训练和验证:

training_dataset, validation_dataset = tfds.load(
    "mnist",
    split=["train", "test"],
    shuffle_files=True,
    as_supervised=True,
    with_info=False,
)

接着提取一张图像 \(x\)

(x, _), = validation_dataset.take(1)

plt.imshow(tf.squeeze(x))
print(f"Data type: {x.dtype}")
print(f"Shape: {x.shape}")

要获得隐空间表示 \(y\),我们需要将其转换为 float32,添加一个批次维度,并将其传递给分析转换。

x = tf.cast(x, tf.float32) / 255.
x = tf.reshape(x, (-1, 28, 28, 1))
y = make_analysis_transform(10)(x)

print("y:", y)

隐空间将在测试时被量化。为了在训练期间以可微的方式对此进行建模,我们在区间 \((-.5, .5)\) 中添加均匀噪声,并将结果称为 \(\tilde y\)。这与论文端到端优化图像压缩中使用的术语相同。

y_tilde = y + tf.random.uniform(y.shape, -.5, .5)

print("y_tilde:", y_tilde)

“先验”是一个概率密度,我们训练它来模拟噪声隐空间的边缘分布。例如,它可以是一组独立的逻辑分布,每个隐空间维度具有不同的尺度。tfc.NoisyLogistic 说明了隐空间具有加性噪声的事实。随着尺度接近零,逻辑分布接近狄拉克增量(尖峰),但添加的噪声导致“嘈杂”分布改为接近均匀分布。

prior = tfc.NoisyLogistic(loc=0., scale=tf.linspace(.01, 2., 10))

_ = tf.linspace(-6., 6., 501)[:, None]
plt.plot(_, prior.prob(_));

在训练期间,tfc.ContinuousBatchedEntropyModel 会添加均匀噪声,并使用噪声和先验来计算速率的(可微分)上限(编码隐空间表示所需的平均位数)。此界限可以作为损失最小化。

entropy_model = tfc.ContinuousBatchedEntropyModel(
    prior, coding_rank=1, compression=False)
y_tilde, rate = entropy_model(y, training=True)

print("rate:", rate)
print("y_tilde:", y_tilde)

最后,噪声隐空间通过合成转换向回传递以产生图像重建 \(\tilde x\)。失真是原始图像与重建之间的误差。显然,使用未训练的转换时,重建不太有用。

x_tilde = make_synthesis_transform()(y_tilde)

# Mean absolute difference across pixels.
distortion = tf.reduce_mean(abs(x - x_tilde))
print("distortion:", distortion)

x_tilde = tf.saturate_cast(x_tilde[0] * 255, tf.uint8)
plt.imshow(tf.squeeze(x_tilde))
print(f"Data type: {x_tilde.dtype}")
print(f"Shape: {x_tilde.shape}")

对于每个批次的数字,调用 MNISTCompressionTrainer 会产生该批次的平均速率和失真:

(example_batch, _), = validation_dataset.batch(32).take(1)
trainer = MNISTCompressionTrainer(10)
example_output = trainer(example_batch)

print("rate: ", example_output["rate"])
print("distortion: ", example_output["distortion"])

在下一部分中,我们建立模型来对这两个损失执行梯度下降。

训练模型#

我们以优化速率–失真拉格朗日的方式编译训练器,即速率和失真的总和,其中一项由拉格朗日参数 \(\lambda\) 加权。

此损失函数对模型的不同部分有着不同的影响:

  • 对分析转换进行训练以产生隐空间表示,该表示会在速率和失真之间实现所需的权衡。

  • 给定隐空间表示,训练合成转换以将失真最小化。

  • 训练先验参数以将给定隐空间表示的速率最小化。这与在最大似然意义上拟合隐空间的边缘分布的先验相同。

def pass_through_loss(_, x):
  # Since rate and distortion are unsupervised, the loss doesn't need a target.
  return x

def make_mnist_compression_trainer(lmbda, latent_dims=50):
  trainer = MNISTCompressionTrainer(latent_dims)
  trainer.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    # Just pass through rate and distortion as losses/metrics.
    loss=dict(rate=pass_through_loss, distortion=pass_through_loss),
    metrics=dict(rate=pass_through_loss, distortion=pass_through_loss),
    loss_weights=dict(rate=1., distortion=lmbda),
  )
  return trainer

接下来训练模型。此处不需要人工注释,因为我们只想压缩图像,所以我们使用 map 将它们丢弃,取而代之的是为速率和失真添加“虚拟”目标。

def add_rd_targets(image, label):
  # Training is unsupervised, so labels aren't necessary here. However, we
  # need to add "dummy" targets for rate and distortion.
  return image, dict(rate=0., distortion=0.)

def train_mnist_model(lmbda):
  trainer = make_mnist_compression_trainer(lmbda)
  trainer.fit(
      training_dataset.map(add_rd_targets).batch(128).prefetch(8),
      epochs=15,
      validation_data=validation_dataset.map(add_rd_targets).batch(128).cache(),
      validation_freq=1,
      verbose=1,
  )
  return trainer

trainer = train_mnist_model(lmbda=2000)

压缩一些 MNIST 图像#

对于测试时的压缩和解压缩,我们将训练好的模型分成两部分:

  • 编码器端由分析转换和熵模型组成。

  • 解码端由合成转换和相同的熵模型组成。

测试时,隐空间没有加性噪声,但它们会被量化并随后无损压缩,因此我们给它们提供新的名称。我们将它们和图像重建分别称为 \(\hat x\)\(\hat y\)(按照端到端优化图像压缩)。

class MNISTCompressor(tf.keras.Model):
  """Compresses MNIST images to strings."""

  def __init__(self, analysis_transform, entropy_model):
    super().__init__()
    self.analysis_transform = analysis_transform
    self.entropy_model = entropy_model

  def call(self, x):
    # Ensure inputs are floats in the range (0, 1).
    x = tf.cast(x, self.compute_dtype) / 255.
    y = self.analysis_transform(x)
    # Also return the exact information content of each digit.
    _, bits = self.entropy_model(y, training=False)
    return self.entropy_model.compress(y), bits
class MNISTDecompressor(tf.keras.Model):
  """Decompresses MNIST images from strings."""

  def __init__(self, entropy_model, synthesis_transform):
    super().__init__()
    self.entropy_model = entropy_model
    self.synthesis_transform = synthesis_transform

  def call(self, string):
    y_hat = self.entropy_model.decompress(string, ())
    x_hat = self.synthesis_transform(y_hat)
    # Scale and cast back to 8-bit integer.
    return tf.saturate_cast(tf.round(x_hat * 255.), tf.uint8)

当使用 compression=True 实例化时,熵模型将学习的先验转换为范围编码算法的表。调用 compress() 时,会调用此算法以将隐空间向量转换为位序列。每个二进制字符串的长度近似于隐空间的信息内容(先验下隐空间的负对数似然值)。

压缩和解压缩的熵模型必须是相同的实例,因为范围编码表需要在两端完全相同。否则,可能会出现解码错误。

def make_mnist_codec(trainer, **kwargs):
  # The entropy model must be created with `compression=True` and the same
  # instance must be shared between compressor and decompressor.
  entropy_model = tfc.ContinuousBatchedEntropyModel(
      trainer.prior, coding_rank=1, compression=True, **kwargs)
  compressor = MNISTCompressor(trainer.analysis_transform, entropy_model)
  decompressor = MNISTDecompressor(entropy_model, trainer.synthesis_transform)
  return compressor, decompressor

compressor, decompressor = make_mnist_codec(trainer)

从验证数据集中抓取 16 个图像。您可以通过将参数更改为 skip 来选择不同的子集。

(originals, _), = validation_dataset.batch(16).skip(3).take(1)

将它们压缩为字符串,并以位为单位跟踪它们的每个信息内容。

strings, entropies = compressor(originals)

print(f"String representation of first digit in hexadecimal: 0x{strings[0].numpy().hex()}")
print(f"Number of bits actually needed to represent it: {entropies[0]:0.2f}")

从字符串中将图像解压缩回来。

reconstructions = decompressor(strings)

显示 16 个原始数字中的每一个及其压缩二进制表示,以及重建的数字。

#@title

def display_digits(originals, strings, entropies, reconstructions):
  """Visualizes 16 digits together with their reconstructions."""
  fig, axes = plt.subplots(4, 4, sharex=True, sharey=True, figsize=(12.5, 5))
  axes = axes.ravel()
  for i in range(len(axes)):
    image = tf.concat([
        tf.squeeze(originals[i]),
        tf.zeros((28, 14), tf.uint8),
        tf.squeeze(reconstructions[i]),
    ], 1)
    axes[i].imshow(image)
    axes[i].text(
        .5, .5, f"→ 0x{strings[i].numpy().hex()} →\n{entropies[i]:0.2f} bits",
        ha="center", va="top", color="white", fontsize="small",
        transform=axes[i].transAxes)
    axes[i].axis("off")
  plt.subplots_adjust(wspace=0, hspace=0, left=0, right=1, bottom=0, top=1)
display_digits(originals, strings, entropies, reconstructions)

请注意,编码字符串的长度与每个数字的信息内容不同。

这是因为范围编码流程使用离散概率,并且具有少量开销。因此,特别是对于短字符串,这种对应关系只是近似的。不过,范围编码是渐近最优的:在极限情况下,期望的比特数将接近交叉熵(期望的信息内容),训练模型中的速率项是一个上限。

速率–失真权衡#

在上面,该模型经过训练以在用于表示每个数字的平均位数与重建中产生的错误之间进行特定权衡(由 lmbda=2000 给出)。

当我们用不同的值重复实验时,会发生什么?

我们首先将 \(\lambda\) 减少到 500。

def train_and_visualize_model(lmbda):
  trainer = train_mnist_model(lmbda=lmbda)
  compressor, decompressor = make_mnist_codec(trainer)
  strings, entropies = compressor(originals)
  reconstructions = decompressor(strings)
  display_digits(originals, strings, entropies, reconstructions)

train_and_visualize_model(lmbda=500)

代码的比特率下降了,数字的保真度也随之降低。但是,大多数数字仍然可以识别。

我们进一步减少 \(\lambda\)

train_and_visualize_model(lmbda=300)

字符串现在开始变得更短,大约每个数字一个字节。然而,这是有代价的。越来越多的数字变得无法辨认。

这表明此模型与人类对错误的感知无关,它只是根据像素值测量绝对偏差。为了获得更好的感知图像质量,我们需要用感知损失来代替像素损失。

使用解码器作为生成模型#

如果我们向解码器提供随机位,这将有效地从模型学习表示数字的分布中采样。

首先,重新实例化压缩器/解压缩器而不进行完整性检查,该检查将检测输入字符串是否未完全解码。

compressor, decompressor = make_mnist_codec(trainer, decode_sanity_check=False)

现在,将足够长的随机字符串输入解压缩器,以便它可以从中解码/采样数字。

import os

strings = tf.constant([os.urandom(8) for _ in range(16)])
samples = decompressor(strings)

fig, axes = plt.subplots(4, 4, sharex=True, sharey=True, figsize=(5, 5))
axes = axes.ravel()
for i in range(len(axes)):
  axes[i].imshow(tf.squeeze(samples[i]))
  axes[i].axis("off")
plt.subplots_adjust(wspace=0, hspace=0, left=0, right=1, bottom=0, top=1)