##### Copyright 2022 The TensorFlow Compression Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

可扩展的模型压缩#

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 Github 上查看源代码 下载笔记本

概述#

本笔记本展示了如何使用 TensorFlow Compression 压缩模型。

在下面的示例中,我们将 MNIST 分类器的权重压缩到比其浮点表示小得多的大小,同时保持分类准确率。这是通过基于论文 Scalable Model Compression by Entropy Penalized Reparameterization 的两步过程完成的:

  • 在训练期间使用显式熵惩罚来训练“可压缩”模型,这鼓励了模型参数的可压缩性。此惩罚的权重 \(\lambda\),能够持续控制压缩模型大小和其准确率之间的权衡。

  • 使用与惩罚相匹配的编码方案将可压缩模型编码为压缩模型,这意味着惩罚是对模型大小的良好预测指标。这确保了该方法不需要多次迭代训练、压缩和重新训练模型以进行微调。

这种方法会严格考虑压缩模型的大小,而不是计算复杂度。它可以与模型剪枝等技术相结合,以减少大小和复杂度。

各种模型的压缩结果示例:

模型(数据集)

模型大小

压缩率

Top-1 错误压缩(解压缩)

LeNet300-100 (MNIST)

8.56 KB

124x

1.9% (1.6%)

LeNet5-Caffe (MNIST)

2.84 KB

606x

1.0% (0.7%)

VGG-16 (CIFAR-10)

101 KB

590x

10.0% (6.6%)

ResNet-20-4 (CIFAR-10)

128 KB

134x

8.8% (5.0%)

ResNet-18 (ImageNet)

1.97 MB

24x

30.0% (30.0%)

ResNet-50 (ImageNet)

5.49 MB

19x

26.0% (25.0%)

应用包括:

  • 大规模部署/广播模型到边缘设备,节省传输带宽。

  • 在联合学习中向客户端传达全局模型状态。模型架构(隐藏单元的数量等)相较于初始模型没有变化,客户端可以在解压缩的模型上继续学习。

  • 在内存极其有限的客户端上执行推断。在推断过程中,可以按顺序解压缩每一层的权重,并在计算激活后立即丢弃。

设置#

通过 pip 安装 TensorFlow Compression。

%%bash
# Installs the latest version of TFC compatible with the installed TF version.

read MAJOR MINOR <<< "$(pip show tensorflow | perl -p -0777 -e 's/.*Version: (\d+)\.(\d+).*/\1 \2/sg')"
pip install "tensorflow-compression<$MAJOR.$(($MINOR+1))"

导入库依赖项。

import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_compression as tfc
import tensorflow_datasets as tfds

定义和训练一个基本的 MNIST 分类器#

为了高效压缩密集层和卷积层,我们需要定义自定义层类。这些类似于 tf.keras.layers 下的层,但我们稍后将对它们进行子类化以高效实现熵惩罚重参数化 (EPR)。为此,我们还添加了一个复制构造函数。

首先,我们定义一个标准的密集层:

class CustomDense(tf.keras.layers.Layer):

  def __init__(self, filters, name="dense"):
    super().__init__(name=name)
    self.filters = filters

  @classmethod
  def copy(cls, other, **kwargs):
    """Returns an instantiated and built layer, initialized from `other`."""
    self = cls(filters=other.filters, name=other.name, **kwargs)
    self.build(None, other=other)
    return self

  def build(self, input_shape, other=None):
    """Instantiates weights, optionally initializing them from `other`."""
    if other is None:
      kernel_shape = (input_shape[-1], self.filters)
      kernel = tf.keras.initializers.GlorotUniform()(shape=kernel_shape)
      bias = tf.keras.initializers.Zeros()(shape=(self.filters,))
    else:
      kernel, bias = other.kernel, other.bias
    self.kernel = tf.Variable(
        tf.cast(kernel, self.variable_dtype), name="kernel")
    self.bias = tf.Variable(
        tf.cast(bias, self.variable_dtype), name="bias")
    self.built = True

  def call(self, inputs):
    outputs = tf.linalg.matvec(self.kernel, inputs, transpose_a=True)
    outputs = tf.nn.bias_add(outputs, self.bias)
    return tf.nn.leaky_relu(outputs)

类似地,定义一个 2D 卷积层:

class CustomConv2D(tf.keras.layers.Layer):

  def __init__(self, filters, kernel_size,
               strides=1, padding="SAME", name="conv2d"):
    super().__init__(name=name)
    self.filters = filters
    self.kernel_size = kernel_size
    self.strides = strides
    self.padding = padding

  @classmethod
  def copy(cls, other, **kwargs):
    """Returns an instantiated and built layer, initialized from `other`."""
    self = cls(filters=other.filters, kernel_size=other.kernel_size,
               strides=other.strides, padding=other.padding, name=other.name,
               **kwargs)
    self.build(None, other=other)
    return self

  def build(self, input_shape, other=None):
    """Instantiates weights, optionally initializing them from `other`."""
    if other is None:
      kernel_shape = 2 * (self.kernel_size,) + (input_shape[-1], self.filters)
      kernel = tf.keras.initializers.GlorotUniform()(shape=kernel_shape)
      bias = tf.keras.initializers.Zeros()(shape=(self.filters,))
    else:
      kernel, bias = other.kernel, other.bias
    self.kernel = tf.Variable(
        tf.cast(kernel, self.variable_dtype), name="kernel")
    self.bias = tf.Variable(
        tf.cast(bias, self.variable_dtype), name="bias")
    self.built = True

  def call(self, inputs):
    outputs = tf.nn.convolution(
        inputs, self.kernel, strides=self.strides, padding=self.padding)
    outputs = tf.nn.bias_add(outputs, self.bias)
    return tf.nn.leaky_relu(outputs)

在继续模型压缩之前,我们来检查一下是否可以成功地训练一个常规分类器。

定义模型架构:

classifier = tf.keras.Sequential([
    CustomConv2D(20, 5, strides=2, name="conv_1"),
    CustomConv2D(50, 5, strides=2, name="conv_2"),
    tf.keras.layers.Flatten(),
    CustomDense(500, name="fc_1"),
    CustomDense(10, name="fc_2"),
], name="classifier")

加载训练数据:

def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label

training_dataset, validation_dataset = tfds.load(
    "mnist",
    split=["train", "test"],
    shuffle_files=True,
    as_supervised=True,
    with_info=False,
)
training_dataset = training_dataset.map(normalize_img)
validation_dataset = validation_dataset.map(normalize_img)

最后,训练模型:

def train_model(model, training_data, validation_data, **kwargs):
  model.compile(
      optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
      # Uncomment this to ease debugging:
      # run_eagerly=True,
  )
  kwargs.setdefault("epochs", 5)
  kwargs.setdefault("verbose", 1)
  log = model.fit(
      training_data.batch(128).prefetch(8),
      validation_data=validation_data.batch(128).cache(),
      validation_freq=1,
      **kwargs,
  )
  return log.history["val_sparse_categorical_accuracy"][-1]

classifier_accuracy = train_model(
    classifier, training_dataset, validation_dataset)

print(f"Accuracy: {classifier_accuracy:0.4f}")

成功!该模型训练良好,在 5 个周期内的验证集上的准确率达到了 98% 以上。

训练可压缩分类器#

熵惩罚重参数化(EPR)有两个主要组成部分:

  • 在训练期间对模型权重施加惩罚,该惩罚对应于概率模型下的熵,并与权重的编码方案相匹配。下面,我们定义一个实现此惩罚的 Keras Regularizer

  • 重新参数化权重,即将它们带入更具可压缩性的潜在表示中(在可压缩性和模型性能之间达成更好的权衡)。对于卷积核,已经证明傅里叶域是一个很好的表示。对于其他参数,以下示例仅使用具有不同量化步长的标量量化(舍入)。

首先,定义惩罚。

下面的示例使用在 tfc.PowerLawEntropyModel 类中实现的代码/概率模型,灵感来自论文 Optimizing the Communication-Accuracy Trade-off in Federated Learning with Rate-Distortion Theory。惩罚定义为:$\( \log \Bigl(\frac {|x| + \alpha} \alpha\Bigr),\)\( 其中 \)x\( 是模型参数或其潜在表示的一个元素,\)\alpha$ 是一个数值稳定性在 0 附近小常量。

_ = tf.linspace(-5., 5., 501)
plt.plot(_, tfc.PowerLawEntropyModel(0).penalty(_));

这种惩罚实际上是一种正则化损失(有时称为“权重损失”)。它是凹形的,顶点为零,这一事实鼓励权重稀疏。用于压缩权重的编码方案是 Elias gamma 码,它为元素大小产生长度为 \( 1 + \lfloor \log_2 |x| \rfloor \) 比特的编码。也就是说,它与惩罚相匹配,并应用惩罚从而最小化预期的代码长度。

class PowerLawRegularizer(tf.keras.regularizers.Regularizer):

  def __init__(self, lmbda):
    super().__init__()
    self.lmbda = lmbda

  def __call__(self, variable):
    em = tfc.PowerLawEntropyModel(coding_rank=variable.shape.rank)
    return self.lmbda * em.penalty(variable)

# Normalizing the weight of the penalty by the number of model parameters is a
# good rule of thumb to produce comparable results across models.
regularizer = PowerLawRegularizer(lmbda=2./classifier.count_params())

其次,定义 CustomDenseCustomConv2D 的子类,它们具有以下附加功能:

  • 它们接受上述 Regularizer 的一个实例,并将其应用于训练期间的内核和偏差。

  • 它们将内核和偏差定义为 @property,每当访问变量时,它们都会使用直通梯度执行量化。这准确地反映了稍后在压缩模型中执行的计算。

  • 它们定义了额外的 log_step 变量,代表量化步长的对数。量化越粗,模型越小,但准确率越低。每个模型参数的量化步长都可训练,因此对惩罚损失函数执行优化将确定最佳量化步长。

量化步长定义如下:

def quantize(latent, log_step):
  step = tf.exp(log_step)
  return tfc.round_st(latent / step) * step

有了它,我们可以定义密集层:

class CompressibleDense(CustomDense):

  def __init__(self, regularizer, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.regularizer = regularizer

  def build(self, input_shape, other=None):
    """Instantiates weights, optionally initializing them from `other`."""
    super().build(input_shape, other=other)
    if other is not None and hasattr(other, "kernel_log_step"):
      kernel_log_step = other.kernel_log_step
      bias_log_step = other.bias_log_step
    else:
      kernel_log_step = bias_log_step = -4.
    self.kernel_log_step = tf.Variable(
        tf.cast(kernel_log_step, self.variable_dtype), name="kernel_log_step")
    self.bias_log_step = tf.Variable(
        tf.cast(bias_log_step, self.variable_dtype), name="bias_log_step")
    self.add_loss(lambda: self.regularizer(
        self.kernel_latent / tf.exp(self.kernel_log_step)))
    self.add_loss(lambda: self.regularizer(
        self.bias_latent / tf.exp(self.bias_log_step)))

  @property
  def kernel(self):
    return quantize(self.kernel_latent, self.kernel_log_step)

  @kernel.setter
  def kernel(self, kernel):
    self.kernel_latent = tf.Variable(kernel, name="kernel_latent")

  @property
  def bias(self):
    return quantize(self.bias_latent, self.bias_log_step)

  @bias.setter
  def bias(self, bias):
    self.bias_latent = tf.Variable(bias, name="bias_latent")

卷积层类似。此外,只要设置了卷积核,就会将卷积核作为其实值离散傅里叶变换 (RDFT) 存储,并且每当使用该核时,变换都会被反转。由于内核的不同频率分量往往或多或少是可压缩的,因此其中的每个分量都被分配了自己的量化步长。

按如下方式定义傅里叶变换及其逆变换:

def to_rdft(kernel, kernel_size):
  # The kernel has shape (H, W, I, O) -> transpose to take DFT over last two
  # dimensions.
  kernel = tf.transpose(kernel, (2, 3, 0, 1))
  # The RDFT has type complex64 and shape (I, O, FH, FW).
  kernel_rdft = tf.signal.rfft2d(kernel)
  # Map real and imaginary parts into regular floats. The result is float32
  # and has shape (I, O, FH, FW, 2).
  kernel_rdft = tf.stack(
      [tf.math.real(kernel_rdft), tf.math.imag(kernel_rdft)], axis=-1)
  # Divide by kernel size to make the DFT orthonormal (length-preserving).
  return kernel_rdft / kernel_size

def from_rdft(kernel_rdft, kernel_size):
  # Undoes the transformations in to_rdft.
  kernel_rdft *= kernel_size
  kernel_rdft = tf.dtypes.complex(*tf.unstack(kernel_rdft, axis=-1))
  kernel = tf.signal.irfft2d(kernel_rdft, fft_length=2 * (kernel_size,))
  return tf.transpose(kernel, (2, 3, 0, 1))

这样,将卷积层定义为:

class CompressibleConv2D(CustomConv2D):

  def __init__(self, regularizer, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.regularizer = regularizer

  def build(self, input_shape, other=None):
    """Instantiates weights, optionally initializing them from `other`."""
    super().build(input_shape, other=other)
    if other is not None and hasattr(other, "kernel_log_step"):
      kernel_log_step = other.kernel_log_step
      bias_log_step = other.bias_log_step
    else:
      kernel_log_step = tf.fill(self.kernel_latent.shape[2:], -4.)
      bias_log_step = -4.
    self.kernel_log_step = tf.Variable(
        tf.cast(kernel_log_step, self.variable_dtype), name="kernel_log_step")
    self.bias_log_step = tf.Variable(
        tf.cast(bias_log_step, self.variable_dtype), name="bias_log_step")
    self.add_loss(lambda: self.regularizer(
        self.kernel_latent / tf.exp(self.kernel_log_step)))
    self.add_loss(lambda: self.regularizer(
        self.bias_latent / tf.exp(self.bias_log_step)))

  @property
  def kernel(self):
    kernel_rdft = quantize(self.kernel_latent, self.kernel_log_step)
    return from_rdft(kernel_rdft, self.kernel_size)

  @kernel.setter
  def kernel(self, kernel):
    kernel_rdft = to_rdft(kernel, self.kernel_size)
    self.kernel_latent = tf.Variable(kernel_rdft, name="kernel_latent")

  @property
  def bias(self):
    return quantize(self.bias_latent, self.bias_log_step)

  @bias.setter
  def bias(self, bias):
    self.bias_latent = tf.Variable(bias, name="bias_latent")

使用与上面相同的架构定义分类器模型,但使用以下修改后的层:

def make_mnist_classifier(regularizer):
  return tf.keras.Sequential([
      CompressibleConv2D(regularizer, 20, 5, strides=2, name="conv_1"),
      CompressibleConv2D(regularizer, 50, 5, strides=2, name="conv_2"),
      tf.keras.layers.Flatten(),
      CompressibleDense(regularizer, 500, name="fc_1"),
      CompressibleDense(regularizer, 10, name="fc_2"),
  ], name="classifier")

compressible_classifier = make_mnist_classifier(regularizer)

并训练模型:

penalized_accuracy = train_model(
    compressible_classifier, training_dataset, validation_dataset)

print(f"Accuracy: {penalized_accuracy:0.4f}")

可压缩模型已达到与普通分类器相似的准确率。

但是,该模型实际上还没有被压缩。为此,我们定义了另一组子类,它们以压缩形式存储内核和偏差(作为位序列)。

压缩分类器#

下面定义的 CustomDenseCustomConv2D 的子类将可压缩密集层的权重转换为二进制字符串。此外,它们以半精度存储量化步长的对数以节省空间。每当通过 @property 访问内核或偏差时,它们就会从其字符串表示中解压缩并去量化。

首先,定义函数来压缩和解压缩模型参数:

def compress_latent(latent, log_step, name):
  em = tfc.PowerLawEntropyModel(latent.shape.rank)
  compressed = em.compress(latent / tf.exp(log_step))
  compressed = tf.Variable(compressed, name=f"{name}_compressed")
  log_step = tf.cast(log_step, tf.float16)
  log_step = tf.Variable(log_step, name=f"{name}_log_step")
  return compressed, log_step

def decompress_latent(compressed, shape, log_step):
  latent = tfc.PowerLawEntropyModel(len(shape)).decompress(compressed, shape)
  step = tf.exp(tf.cast(log_step, latent.dtype))
  return latent * step

有了这些,我们可以定义 CompressedDense

class CompressedDense(CustomDense):

  def build(self, input_shape, other=None):
    assert isinstance(other, CompressibleDense)
    self.input_channels = other.kernel.shape[0]
    self.kernel_compressed, self.kernel_log_step = compress_latent(
        other.kernel_latent, other.kernel_log_step, "kernel")
    self.bias_compressed, self.bias_log_step = compress_latent(
        other.bias_latent, other.bias_log_step, "bias")
    self.built = True

  @property
  def kernel(self):
    kernel_shape = (self.input_channels, self.filters)
    return decompress_latent(
        self.kernel_compressed, kernel_shape, self.kernel_log_step)

  @property
  def bias(self):
    bias_shape = (self.filters,)
    return decompress_latent(
        self.bias_compressed, bias_shape, self.bias_log_step)

卷积层类与上面类似。

class CompressedConv2D(CustomConv2D):

  def build(self, input_shape, other=None):
    assert isinstance(other, CompressibleConv2D)
    self.input_channels = other.kernel.shape[2]
    self.kernel_compressed, self.kernel_log_step = compress_latent(
        other.kernel_latent, other.kernel_log_step, "kernel")
    self.bias_compressed, self.bias_log_step = compress_latent(
        other.bias_latent, other.bias_log_step, "bias")
    self.built = True

  @property
  def kernel(self):
    rdft_shape = (self.input_channels, self.filters,
                  self.kernel_size, self.kernel_size // 2 + 1, 2)
    kernel_rdft = decompress_latent(
        self.kernel_compressed, rdft_shape, self.kernel_log_step)
    return from_rdft(kernel_rdft, self.kernel_size)

  @property
  def bias(self):
    bias_shape = (self.filters,)
    return decompress_latent(
        self.bias_compressed, bias_shape, self.bias_log_step)

要将可压缩模型转换为压缩模型,我们可以方便地使用 clone_model 函数。compress_layer 可以将任何可压缩层转换为压缩层,并简单地传递给任何其他类型的层(例如 Flatten 等)。

def compress_layer(layer):
  if isinstance(layer, CompressibleDense):
    return CompressedDense.copy(layer)
  if isinstance(layer, CompressibleConv2D):
    return CompressedConv2D.copy(layer)
  return type(layer).from_config(layer.get_config())

compressed_classifier = tf.keras.models.clone_model(
    compressible_classifier, clone_function=compress_layer)

现在,我们来验证压缩模型是否仍按预期执行:

compressed_classifier.compile(metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
_, compressed_accuracy = compressed_classifier.evaluate(validation_dataset.batch(128))

print(f"Accuracy of the compressible classifier: {penalized_accuracy:0.4f}")
print(f"Accuracy of the compressed classifier: {compressed_accuracy:0.4f}")

压缩模型的分类准确率与训练期间达到的分类准确率相同!

此外,压缩后的模型权重的大小远小于原始模型的大小:

def get_weight_size_in_bytes(weight):
  if weight.dtype == tf.string:
    return tf.reduce_sum(tf.strings.length(weight, unit="BYTE"))
  else:
    return tf.size(weight) * weight.dtype.size

original_size = sum(map(get_weight_size_in_bytes, classifier.weights))
compressed_size = sum(map(get_weight_size_in_bytes, compressed_classifier.weights))

print(f"Size of original model weights: {original_size} bytes")
print(f"Size of compressed model weights: {compressed_size} bytes")
print(f"Compression ratio: {(original_size/compressed_size):0.0f}x")

将模型存储在磁盘上需要一些开销来存储模型架构、函数图等。

ZIP 等无损压缩方法擅长压缩此类数据,但不擅长压缩权重本身。这就是为什么在应用了 ZIP 压缩之后,当计算模型大小(包括开销)时,EPR 仍然具有显著优势:

import os
import shutil

def get_disk_size(model, path):
  model.save(path)
  zip_path = shutil.make_archive(path, "zip", path)
  return os.path.getsize(zip_path)

original_zip_size = get_disk_size(classifier, "/tmp/classifier")
compressed_zip_size = get_disk_size(
    compressed_classifier, "/tmp/compressed_classifier")

print(f"Original on-disk size (ZIP compressed): {original_zip_size} bytes")
print(f"Compressed on-disk size (ZIP compressed): {compressed_zip_size} bytes")
print(f"Compression ratio: {(original_zip_size/compressed_zip_size):0.0f}x")

正则化效果和大小-准确度权衡#

上面,\(\lambda\) 超参数被设置为 2(通过模型中的参数数量进行标准化)。随着我们增加 \(\lambda\),模型权重的可压缩性受到越来越严重的惩罚。

对于较低的值,惩罚可以起到权重调节器的作用。它实际上对分类器的泛化性能有有益的影响,并且可以在验证数据集上产生略高的准确率:

#@title

print(f"Accuracy of the vanilla classifier: {classifier_accuracy:0.4f}")
print(f"Accuracy of the penalized classifier: {penalized_accuracy:0.4f}")

对于更高的值,我们看到模型大小越来越小,但准确率也在逐渐降低。为了看到这一点,我们来训练几个模型,并绘制它们的大小与准确率之间的关系图:

def compress_and_evaluate_model(lmbda):
  print(f"lambda={lmbda:0.0f}: training...", flush=True)
  regularizer = PowerLawRegularizer(lmbda=lmbda/classifier.count_params())
  compressible_classifier = make_mnist_classifier(regularizer)
  train_model(
      compressible_classifier, training_dataset, validation_dataset, verbose=0)
  print("compressing...", flush=True)
  compressed_classifier = tf.keras.models.clone_model(
      compressible_classifier, clone_function=compress_layer)
  compressed_size = sum(map(
      get_weight_size_in_bytes, compressed_classifier.weights))
  compressed_zip_size = float(get_disk_size(
      compressed_classifier, "/tmp/compressed_classifier"))
  print("evaluating...", flush=True)
  compressed_classifier = tf.keras.models.load_model(
      "/tmp/compressed_classifier")
  compressed_classifier.compile(
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
  _, compressed_accuracy = compressed_classifier.evaluate(
      validation_dataset.batch(128), verbose=0)
  print()
  return compressed_size, compressed_zip_size, compressed_accuracy

lambdas = (2., 5., 10., 20., 50.)
metrics = [compress_and_evaluate_model(l) for l in lambdas]
metrics = tf.convert_to_tensor(metrics, tf.float32)
#@title

def plot_broken_xaxis(ax, compressed_sizes, original_size, original_accuracy):
  xticks = list(range(
      int(tf.math.floor(min(compressed_sizes) / 5) * 5),
      int(tf.math.ceil(max(compressed_sizes) / 5) * 5) + 1,
      5))
  xticks.append(xticks[-1] + 10)
  ax.set_xlim(xticks[0], xticks[-1] + 2)
  ax.set_xticks(xticks[1:])
  ax.set_xticklabels(xticks[1:-1] + [f"{original_size:0.2f}"])
  ax.plot(xticks[-1], original_accuracy, "o", label="float32")

sizes, zip_sizes, accuracies = tf.transpose(metrics)
sizes /= 1024
zip_sizes /= 1024

fig, (axl, axr) = plt.subplots(1, 2, sharey=True, figsize=(10, 4))
axl.plot(sizes, accuracies, "o-", label="EPR compressed")
axr.plot(zip_sizes, accuracies, "o-", label="EPR compressed")
plot_broken_xaxis(axl, sizes, original_size/1024, classifier_accuracy)
plot_broken_xaxis(axr, zip_sizes, original_zip_size/1024, classifier_accuracy)

axl.set_xlabel("size of model weights [kbytes]")
axr.set_xlabel("ZIP compressed on-disk model size [kbytes]")
axl.set_ylabel("accuracy")
axl.legend(loc="lower right")
axr.legend(loc="lower right")
axl.grid()
axr.grid()
for i in range(len(lambdas)):
  axl.annotate(f"$\lambda = {lambdas[i]:0.0f}$", (sizes[i], accuracies[i]),
               xytext=(10, -5), xycoords="data", textcoords="offset points")
  axr.annotate(f"$\lambda = {lambdas[i]:0.0f}$", (zip_sizes[i], accuracies[i]),
               xytext=(10, -5), xycoords="data", textcoords="offset points")
plt.tight_layout()

理想情况下,该图应显示肘形大小-准确率权衡,但准确率指标有些噪声也正常。根据初始化的不同,曲线可能会出现一些曲折。

由于正则化效应,对于较小的 \(\lambda\) 值,EPR 压缩模型在测试集上比原始模型更准确。即使我们比较附加 ZIP 压缩后的大小,EPR 压缩模型也要小很多倍。

解压缩分类器#

CompressedDenseCompressedConv2D 在每次前向传递时会解压缩它们的权重。这使得它们非常适合内存有限的设备,但解压缩的计算成本可能很高,尤其是对于小批次。

要将模型解压缩一次,并将其用于进一步的训练或推断,我们可以使用常规层或可压缩层将其转换回模型。这在模型部署或联合学习场景中很有用。

首先,转换回普通模型,我们可以进行推断,和/或继续进行常规训练,而不会有压缩惩罚:

def decompress_layer(layer):
  if isinstance(layer, CompressedDense):
    return CustomDense.copy(layer)
  if isinstance(layer, CompressedConv2D):
    return CustomConv2D.copy(layer)
  return type(layer).from_config(layer.get_config())

decompressed_classifier = tf.keras.models.clone_model(
    compressed_classifier, clone_function=decompress_layer)
decompressed_accuracy = train_model(
    decompressed_classifier, training_dataset, validation_dataset, epochs=1)

print(f"Accuracy of the compressed classifier: {compressed_accuracy:0.4f}")
print(f"Accuracy of the decompressed classifier after one more epoch of training: {decompressed_accuracy:0.4f}")

请注意,在训练额外的周期后验证准确率会下降,因为训练是在没有正则化的情况下完成的。

或者,我们可以将模型转换回“可压缩”模型,以进行推断和/或进一步训练,并带有压缩惩罚:

def decompress_layer_with_penalty(layer):
  if isinstance(layer, CompressedDense):
    return CompressibleDense.copy(layer, regularizer=regularizer)
  if isinstance(layer, CompressedConv2D):
    return CompressibleConv2D.copy(layer, regularizer=regularizer)
  return type(layer).from_config(layer.get_config())

decompressed_classifier = tf.keras.models.clone_model(
    compressed_classifier, clone_function=decompress_layer_with_penalty)
decompressed_accuracy = train_model(
    decompressed_classifier, training_dataset, validation_dataset, epochs=1)

print(f"Accuracy of the compressed classifier: {compressed_accuracy:0.4f}")
print(f"Accuracy of the decompressed classifier after one more epoch of training: {decompressed_accuracy:0.4f}")

在这里,在训练一个额外的周期后,准确率会提高。