##### Copyright 2022 The TensorFlow Compression Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
可扩展的模型压缩#
在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 Github 上查看源代码 | 下载笔记本 |
概述#
本笔记本展示了如何使用 TensorFlow Compression 压缩模型。
在下面的示例中,我们将 MNIST 分类器的权重压缩到比其浮点表示小得多的大小,同时保持分类准确率。这是通过基于论文 Scalable Model Compression by Entropy Penalized Reparameterization 的两步过程完成的:
在训练期间使用显式熵惩罚来训练“可压缩”模型,这鼓励了模型参数的可压缩性。此惩罚的权重 \(\lambda\),能够持续控制压缩模型大小和其准确率之间的权衡。
使用与惩罚相匹配的编码方案将可压缩模型编码为压缩模型,这意味着惩罚是对模型大小的良好预测指标。这确保了该方法不需要多次迭代训练、压缩和重新训练模型以进行微调。
这种方法会严格考虑压缩模型的大小,而不是计算复杂度。它可以与模型剪枝等技术相结合,以减少大小和复杂度。
各种模型的压缩结果示例:
模型(数据集) |
模型大小 |
压缩率 |
Top-1 错误压缩(解压缩) |
---|---|---|---|
LeNet300-100 (MNIST) |
8.56 KB |
124x |
1.9% (1.6%) |
LeNet5-Caffe (MNIST) |
2.84 KB |
606x |
1.0% (0.7%) |
VGG-16 (CIFAR-10) |
101 KB |
590x |
10.0% (6.6%) |
ResNet-20-4 (CIFAR-10) |
128 KB |
134x |
8.8% (5.0%) |
ResNet-18 (ImageNet) |
1.97 MB |
24x |
30.0% (30.0%) |
ResNet-50 (ImageNet) |
5.49 MB |
19x |
26.0% (25.0%) |
应用包括:
大规模部署/广播模型到边缘设备,节省传输带宽。
在联合学习中向客户端传达全局模型状态。模型架构(隐藏单元的数量等)相较于初始模型没有变化,客户端可以在解压缩的模型上继续学习。
在内存极其有限的客户端上执行推断。在推断过程中,可以按顺序解压缩每一层的权重,并在计算激活后立即丢弃。
设置#
通过 pip
安装 TensorFlow Compression。
%%bash
# Installs the latest version of TFC compatible with the installed TF version.
read MAJOR MINOR <<< "$(pip show tensorflow | perl -p -0777 -e 's/.*Version: (\d+)\.(\d+).*/\1 \2/sg')"
pip install "tensorflow-compression<$MAJOR.$(($MINOR+1))"
导入库依赖项。
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_compression as tfc
import tensorflow_datasets as tfds
定义和训练一个基本的 MNIST 分类器#
为了高效压缩密集层和卷积层,我们需要定义自定义层类。这些类似于 tf.keras.layers
下的层,但我们稍后将对它们进行子类化以高效实现熵惩罚重参数化 (EPR)。为此,我们还添加了一个复制构造函数。
首先,我们定义一个标准的密集层:
class CustomDense(tf.keras.layers.Layer):
def __init__(self, filters, name="dense"):
super().__init__(name=name)
self.filters = filters
@classmethod
def copy(cls, other, **kwargs):
"""Returns an instantiated and built layer, initialized from `other`."""
self = cls(filters=other.filters, name=other.name, **kwargs)
self.build(None, other=other)
return self
def build(self, input_shape, other=None):
"""Instantiates weights, optionally initializing them from `other`."""
if other is None:
kernel_shape = (input_shape[-1], self.filters)
kernel = tf.keras.initializers.GlorotUniform()(shape=kernel_shape)
bias = tf.keras.initializers.Zeros()(shape=(self.filters,))
else:
kernel, bias = other.kernel, other.bias
self.kernel = tf.Variable(
tf.cast(kernel, self.variable_dtype), name="kernel")
self.bias = tf.Variable(
tf.cast(bias, self.variable_dtype), name="bias")
self.built = True
def call(self, inputs):
outputs = tf.linalg.matvec(self.kernel, inputs, transpose_a=True)
outputs = tf.nn.bias_add(outputs, self.bias)
return tf.nn.leaky_relu(outputs)
类似地,定义一个 2D 卷积层:
class CustomConv2D(tf.keras.layers.Layer):
def __init__(self, filters, kernel_size,
strides=1, padding="SAME", name="conv2d"):
super().__init__(name=name)
self.filters = filters
self.kernel_size = kernel_size
self.strides = strides
self.padding = padding
@classmethod
def copy(cls, other, **kwargs):
"""Returns an instantiated and built layer, initialized from `other`."""
self = cls(filters=other.filters, kernel_size=other.kernel_size,
strides=other.strides, padding=other.padding, name=other.name,
**kwargs)
self.build(None, other=other)
return self
def build(self, input_shape, other=None):
"""Instantiates weights, optionally initializing them from `other`."""
if other is None:
kernel_shape = 2 * (self.kernel_size,) + (input_shape[-1], self.filters)
kernel = tf.keras.initializers.GlorotUniform()(shape=kernel_shape)
bias = tf.keras.initializers.Zeros()(shape=(self.filters,))
else:
kernel, bias = other.kernel, other.bias
self.kernel = tf.Variable(
tf.cast(kernel, self.variable_dtype), name="kernel")
self.bias = tf.Variable(
tf.cast(bias, self.variable_dtype), name="bias")
self.built = True
def call(self, inputs):
outputs = tf.nn.convolution(
inputs, self.kernel, strides=self.strides, padding=self.padding)
outputs = tf.nn.bias_add(outputs, self.bias)
return tf.nn.leaky_relu(outputs)
在继续模型压缩之前,我们来检查一下是否可以成功地训练一个常规分类器。
定义模型架构:
classifier = tf.keras.Sequential([
CustomConv2D(20, 5, strides=2, name="conv_1"),
CustomConv2D(50, 5, strides=2, name="conv_2"),
tf.keras.layers.Flatten(),
CustomDense(500, name="fc_1"),
CustomDense(10, name="fc_2"),
], name="classifier")
加载训练数据:
def normalize_img(image, label):
"""Normalizes images: `uint8` -> `float32`."""
return tf.cast(image, tf.float32) / 255., label
training_dataset, validation_dataset = tfds.load(
"mnist",
split=["train", "test"],
shuffle_files=True,
as_supervised=True,
with_info=False,
)
training_dataset = training_dataset.map(normalize_img)
validation_dataset = validation_dataset.map(normalize_img)
最后,训练模型:
def train_model(model, training_data, validation_data, **kwargs):
model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
# Uncomment this to ease debugging:
# run_eagerly=True,
)
kwargs.setdefault("epochs", 5)
kwargs.setdefault("verbose", 1)
log = model.fit(
training_data.batch(128).prefetch(8),
validation_data=validation_data.batch(128).cache(),
validation_freq=1,
**kwargs,
)
return log.history["val_sparse_categorical_accuracy"][-1]
classifier_accuracy = train_model(
classifier, training_dataset, validation_dataset)
print(f"Accuracy: {classifier_accuracy:0.4f}")
成功!该模型训练良好,在 5 个周期内的验证集上的准确率达到了 98% 以上。
训练可压缩分类器#
熵惩罚重参数化(EPR)有两个主要组成部分:
在训练期间对模型权重施加惩罚,该惩罚对应于概率模型下的熵,并与权重的编码方案相匹配。下面,我们定义一个实现此惩罚的 Keras
Regularizer
。重新参数化权重,即将它们带入更具可压缩性的潜在表示中(在可压缩性和模型性能之间达成更好的权衡)。对于卷积核,已经证明傅里叶域是一个很好的表示。对于其他参数,以下示例仅使用具有不同量化步长的标量量化(舍入)。
首先,定义惩罚。
下面的示例使用在 tfc.PowerLawEntropyModel
类中实现的代码/概率模型,灵感来自论文 Optimizing the Communication-Accuracy Trade-off in Federated Learning with Rate-Distortion Theory。惩罚定义为:$\( \log \Bigl(\frac {|x| + \alpha} \alpha\Bigr),\)\( 其中 \)x\( 是模型参数或其潜在表示的一个元素,\)\alpha$ 是一个数值稳定性在 0 附近小常量。
_ = tf.linspace(-5., 5., 501)
plt.plot(_, tfc.PowerLawEntropyModel(0).penalty(_));
这种惩罚实际上是一种正则化损失(有时称为“权重损失”)。它是凹形的,顶点为零,这一事实鼓励权重稀疏。用于压缩权重的编码方案是 Elias gamma 码,它为元素大小产生长度为 \( 1 + \lfloor \log_2 |x| \rfloor \) 比特的编码。也就是说,它与惩罚相匹配,并应用惩罚从而最小化预期的代码长度。
class PowerLawRegularizer(tf.keras.regularizers.Regularizer):
def __init__(self, lmbda):
super().__init__()
self.lmbda = lmbda
def __call__(self, variable):
em = tfc.PowerLawEntropyModel(coding_rank=variable.shape.rank)
return self.lmbda * em.penalty(variable)
# Normalizing the weight of the penalty by the number of model parameters is a
# good rule of thumb to produce comparable results across models.
regularizer = PowerLawRegularizer(lmbda=2./classifier.count_params())
其次,定义 CustomDense
和 CustomConv2D
的子类,它们具有以下附加功能:
它们接受上述 Regularizer 的一个实例,并将其应用于训练期间的内核和偏差。
它们将内核和偏差定义为
@property
,每当访问变量时,它们都会使用直通梯度执行量化。这准确地反映了稍后在压缩模型中执行的计算。它们定义了额外的
log_step
变量,代表量化步长的对数。量化越粗,模型越小,但准确率越低。每个模型参数的量化步长都可训练,因此对惩罚损失函数执行优化将确定最佳量化步长。
量化步长定义如下:
def quantize(latent, log_step):
step = tf.exp(log_step)
return tfc.round_st(latent / step) * step
有了它,我们可以定义密集层:
class CompressibleDense(CustomDense):
def __init__(self, regularizer, *args, **kwargs):
super().__init__(*args, **kwargs)
self.regularizer = regularizer
def build(self, input_shape, other=None):
"""Instantiates weights, optionally initializing them from `other`."""
super().build(input_shape, other=other)
if other is not None and hasattr(other, "kernel_log_step"):
kernel_log_step = other.kernel_log_step
bias_log_step = other.bias_log_step
else:
kernel_log_step = bias_log_step = -4.
self.kernel_log_step = tf.Variable(
tf.cast(kernel_log_step, self.variable_dtype), name="kernel_log_step")
self.bias_log_step = tf.Variable(
tf.cast(bias_log_step, self.variable_dtype), name="bias_log_step")
self.add_loss(lambda: self.regularizer(
self.kernel_latent / tf.exp(self.kernel_log_step)))
self.add_loss(lambda: self.regularizer(
self.bias_latent / tf.exp(self.bias_log_step)))
@property
def kernel(self):
return quantize(self.kernel_latent, self.kernel_log_step)
@kernel.setter
def kernel(self, kernel):
self.kernel_latent = tf.Variable(kernel, name="kernel_latent")
@property
def bias(self):
return quantize(self.bias_latent, self.bias_log_step)
@bias.setter
def bias(self, bias):
self.bias_latent = tf.Variable(bias, name="bias_latent")
卷积层类似。此外,只要设置了卷积核,就会将卷积核作为其实值离散傅里叶变换 (RDFT) 存储,并且每当使用该核时,变换都会被反转。由于内核的不同频率分量往往或多或少是可压缩的,因此其中的每个分量都被分配了自己的量化步长。
按如下方式定义傅里叶变换及其逆变换:
def to_rdft(kernel, kernel_size):
# The kernel has shape (H, W, I, O) -> transpose to take DFT over last two
# dimensions.
kernel = tf.transpose(kernel, (2, 3, 0, 1))
# The RDFT has type complex64 and shape (I, O, FH, FW).
kernel_rdft = tf.signal.rfft2d(kernel)
# Map real and imaginary parts into regular floats. The result is float32
# and has shape (I, O, FH, FW, 2).
kernel_rdft = tf.stack(
[tf.math.real(kernel_rdft), tf.math.imag(kernel_rdft)], axis=-1)
# Divide by kernel size to make the DFT orthonormal (length-preserving).
return kernel_rdft / kernel_size
def from_rdft(kernel_rdft, kernel_size):
# Undoes the transformations in to_rdft.
kernel_rdft *= kernel_size
kernel_rdft = tf.dtypes.complex(*tf.unstack(kernel_rdft, axis=-1))
kernel = tf.signal.irfft2d(kernel_rdft, fft_length=2 * (kernel_size,))
return tf.transpose(kernel, (2, 3, 0, 1))
这样,将卷积层定义为:
class CompressibleConv2D(CustomConv2D):
def __init__(self, regularizer, *args, **kwargs):
super().__init__(*args, **kwargs)
self.regularizer = regularizer
def build(self, input_shape, other=None):
"""Instantiates weights, optionally initializing them from `other`."""
super().build(input_shape, other=other)
if other is not None and hasattr(other, "kernel_log_step"):
kernel_log_step = other.kernel_log_step
bias_log_step = other.bias_log_step
else:
kernel_log_step = tf.fill(self.kernel_latent.shape[2:], -4.)
bias_log_step = -4.
self.kernel_log_step = tf.Variable(
tf.cast(kernel_log_step, self.variable_dtype), name="kernel_log_step")
self.bias_log_step = tf.Variable(
tf.cast(bias_log_step, self.variable_dtype), name="bias_log_step")
self.add_loss(lambda: self.regularizer(
self.kernel_latent / tf.exp(self.kernel_log_step)))
self.add_loss(lambda: self.regularizer(
self.bias_latent / tf.exp(self.bias_log_step)))
@property
def kernel(self):
kernel_rdft = quantize(self.kernel_latent, self.kernel_log_step)
return from_rdft(kernel_rdft, self.kernel_size)
@kernel.setter
def kernel(self, kernel):
kernel_rdft = to_rdft(kernel, self.kernel_size)
self.kernel_latent = tf.Variable(kernel_rdft, name="kernel_latent")
@property
def bias(self):
return quantize(self.bias_latent, self.bias_log_step)
@bias.setter
def bias(self, bias):
self.bias_latent = tf.Variable(bias, name="bias_latent")
使用与上面相同的架构定义分类器模型,但使用以下修改后的层:
def make_mnist_classifier(regularizer):
return tf.keras.Sequential([
CompressibleConv2D(regularizer, 20, 5, strides=2, name="conv_1"),
CompressibleConv2D(regularizer, 50, 5, strides=2, name="conv_2"),
tf.keras.layers.Flatten(),
CompressibleDense(regularizer, 500, name="fc_1"),
CompressibleDense(regularizer, 10, name="fc_2"),
], name="classifier")
compressible_classifier = make_mnist_classifier(regularizer)
并训练模型:
penalized_accuracy = train_model(
compressible_classifier, training_dataset, validation_dataset)
print(f"Accuracy: {penalized_accuracy:0.4f}")
可压缩模型已达到与普通分类器相似的准确率。
但是,该模型实际上还没有被压缩。为此,我们定义了另一组子类,它们以压缩形式存储内核和偏差(作为位序列)。
压缩分类器#
下面定义的 CustomDense
和 CustomConv2D
的子类将可压缩密集层的权重转换为二进制字符串。此外,它们以半精度存储量化步长的对数以节省空间。每当通过 @property
访问内核或偏差时,它们就会从其字符串表示中解压缩并去量化。
首先,定义函数来压缩和解压缩模型参数:
def compress_latent(latent, log_step, name):
em = tfc.PowerLawEntropyModel(latent.shape.rank)
compressed = em.compress(latent / tf.exp(log_step))
compressed = tf.Variable(compressed, name=f"{name}_compressed")
log_step = tf.cast(log_step, tf.float16)
log_step = tf.Variable(log_step, name=f"{name}_log_step")
return compressed, log_step
def decompress_latent(compressed, shape, log_step):
latent = tfc.PowerLawEntropyModel(len(shape)).decompress(compressed, shape)
step = tf.exp(tf.cast(log_step, latent.dtype))
return latent * step
有了这些,我们可以定义 CompressedDense
:
class CompressedDense(CustomDense):
def build(self, input_shape, other=None):
assert isinstance(other, CompressibleDense)
self.input_channels = other.kernel.shape[0]
self.kernel_compressed, self.kernel_log_step = compress_latent(
other.kernel_latent, other.kernel_log_step, "kernel")
self.bias_compressed, self.bias_log_step = compress_latent(
other.bias_latent, other.bias_log_step, "bias")
self.built = True
@property
def kernel(self):
kernel_shape = (self.input_channels, self.filters)
return decompress_latent(
self.kernel_compressed, kernel_shape, self.kernel_log_step)
@property
def bias(self):
bias_shape = (self.filters,)
return decompress_latent(
self.bias_compressed, bias_shape, self.bias_log_step)
卷积层类与上面类似。
class CompressedConv2D(CustomConv2D):
def build(self, input_shape, other=None):
assert isinstance(other, CompressibleConv2D)
self.input_channels = other.kernel.shape[2]
self.kernel_compressed, self.kernel_log_step = compress_latent(
other.kernel_latent, other.kernel_log_step, "kernel")
self.bias_compressed, self.bias_log_step = compress_latent(
other.bias_latent, other.bias_log_step, "bias")
self.built = True
@property
def kernel(self):
rdft_shape = (self.input_channels, self.filters,
self.kernel_size, self.kernel_size // 2 + 1, 2)
kernel_rdft = decompress_latent(
self.kernel_compressed, rdft_shape, self.kernel_log_step)
return from_rdft(kernel_rdft, self.kernel_size)
@property
def bias(self):
bias_shape = (self.filters,)
return decompress_latent(
self.bias_compressed, bias_shape, self.bias_log_step)
要将可压缩模型转换为压缩模型,我们可以方便地使用 clone_model
函数。compress_layer
可以将任何可压缩层转换为压缩层,并简单地传递给任何其他类型的层(例如 Flatten
等)。
def compress_layer(layer):
if isinstance(layer, CompressibleDense):
return CompressedDense.copy(layer)
if isinstance(layer, CompressibleConv2D):
return CompressedConv2D.copy(layer)
return type(layer).from_config(layer.get_config())
compressed_classifier = tf.keras.models.clone_model(
compressible_classifier, clone_function=compress_layer)
现在,我们来验证压缩模型是否仍按预期执行:
compressed_classifier.compile(metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
_, compressed_accuracy = compressed_classifier.evaluate(validation_dataset.batch(128))
print(f"Accuracy of the compressible classifier: {penalized_accuracy:0.4f}")
print(f"Accuracy of the compressed classifier: {compressed_accuracy:0.4f}")
压缩模型的分类准确率与训练期间达到的分类准确率相同!
此外,压缩后的模型权重的大小远小于原始模型的大小:
def get_weight_size_in_bytes(weight):
if weight.dtype == tf.string:
return tf.reduce_sum(tf.strings.length(weight, unit="BYTE"))
else:
return tf.size(weight) * weight.dtype.size
original_size = sum(map(get_weight_size_in_bytes, classifier.weights))
compressed_size = sum(map(get_weight_size_in_bytes, compressed_classifier.weights))
print(f"Size of original model weights: {original_size} bytes")
print(f"Size of compressed model weights: {compressed_size} bytes")
print(f"Compression ratio: {(original_size/compressed_size):0.0f}x")
将模型存储在磁盘上需要一些开销来存储模型架构、函数图等。
ZIP 等无损压缩方法擅长压缩此类数据,但不擅长压缩权重本身。这就是为什么在应用了 ZIP 压缩之后,当计算模型大小(包括开销)时,EPR 仍然具有显著优势:
import os
import shutil
def get_disk_size(model, path):
model.save(path)
zip_path = shutil.make_archive(path, "zip", path)
return os.path.getsize(zip_path)
original_zip_size = get_disk_size(classifier, "/tmp/classifier")
compressed_zip_size = get_disk_size(
compressed_classifier, "/tmp/compressed_classifier")
print(f"Original on-disk size (ZIP compressed): {original_zip_size} bytes")
print(f"Compressed on-disk size (ZIP compressed): {compressed_zip_size} bytes")
print(f"Compression ratio: {(original_zip_size/compressed_zip_size):0.0f}x")
正则化效果和大小-准确度权衡#
上面,\(\lambda\) 超参数被设置为 2(通过模型中的参数数量进行标准化)。随着我们增加 \(\lambda\),模型权重的可压缩性受到越来越严重的惩罚。
对于较低的值,惩罚可以起到权重调节器的作用。它实际上对分类器的泛化性能有有益的影响,并且可以在验证数据集上产生略高的准确率:
#@title
print(f"Accuracy of the vanilla classifier: {classifier_accuracy:0.4f}")
print(f"Accuracy of the penalized classifier: {penalized_accuracy:0.4f}")
对于更高的值,我们看到模型大小越来越小,但准确率也在逐渐降低。为了看到这一点,我们来训练几个模型,并绘制它们的大小与准确率之间的关系图:
def compress_and_evaluate_model(lmbda):
print(f"lambda={lmbda:0.0f}: training...", flush=True)
regularizer = PowerLawRegularizer(lmbda=lmbda/classifier.count_params())
compressible_classifier = make_mnist_classifier(regularizer)
train_model(
compressible_classifier, training_dataset, validation_dataset, verbose=0)
print("compressing...", flush=True)
compressed_classifier = tf.keras.models.clone_model(
compressible_classifier, clone_function=compress_layer)
compressed_size = sum(map(
get_weight_size_in_bytes, compressed_classifier.weights))
compressed_zip_size = float(get_disk_size(
compressed_classifier, "/tmp/compressed_classifier"))
print("evaluating...", flush=True)
compressed_classifier = tf.keras.models.load_model(
"/tmp/compressed_classifier")
compressed_classifier.compile(
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
_, compressed_accuracy = compressed_classifier.evaluate(
validation_dataset.batch(128), verbose=0)
print()
return compressed_size, compressed_zip_size, compressed_accuracy
lambdas = (2., 5., 10., 20., 50.)
metrics = [compress_and_evaluate_model(l) for l in lambdas]
metrics = tf.convert_to_tensor(metrics, tf.float32)
#@title
def plot_broken_xaxis(ax, compressed_sizes, original_size, original_accuracy):
xticks = list(range(
int(tf.math.floor(min(compressed_sizes) / 5) * 5),
int(tf.math.ceil(max(compressed_sizes) / 5) * 5) + 1,
5))
xticks.append(xticks[-1] + 10)
ax.set_xlim(xticks[0], xticks[-1] + 2)
ax.set_xticks(xticks[1:])
ax.set_xticklabels(xticks[1:-1] + [f"{original_size:0.2f}"])
ax.plot(xticks[-1], original_accuracy, "o", label="float32")
sizes, zip_sizes, accuracies = tf.transpose(metrics)
sizes /= 1024
zip_sizes /= 1024
fig, (axl, axr) = plt.subplots(1, 2, sharey=True, figsize=(10, 4))
axl.plot(sizes, accuracies, "o-", label="EPR compressed")
axr.plot(zip_sizes, accuracies, "o-", label="EPR compressed")
plot_broken_xaxis(axl, sizes, original_size/1024, classifier_accuracy)
plot_broken_xaxis(axr, zip_sizes, original_zip_size/1024, classifier_accuracy)
axl.set_xlabel("size of model weights [kbytes]")
axr.set_xlabel("ZIP compressed on-disk model size [kbytes]")
axl.set_ylabel("accuracy")
axl.legend(loc="lower right")
axr.legend(loc="lower right")
axl.grid()
axr.grid()
for i in range(len(lambdas)):
axl.annotate(f"$\lambda = {lambdas[i]:0.0f}$", (sizes[i], accuracies[i]),
xytext=(10, -5), xycoords="data", textcoords="offset points")
axr.annotate(f"$\lambda = {lambdas[i]:0.0f}$", (zip_sizes[i], accuracies[i]),
xytext=(10, -5), xycoords="data", textcoords="offset points")
plt.tight_layout()
理想情况下,该图应显示肘形大小-准确率权衡,但准确率指标有些噪声也正常。根据初始化的不同,曲线可能会出现一些曲折。
由于正则化效应,对于较小的 \(\lambda\) 值,EPR 压缩模型在测试集上比原始模型更准确。即使我们比较附加 ZIP 压缩后的大小,EPR 压缩模型也要小很多倍。
解压缩分类器#
CompressedDense
和 CompressedConv2D
在每次前向传递时会解压缩它们的权重。这使得它们非常适合内存有限的设备,但解压缩的计算成本可能很高,尤其是对于小批次。
要将模型解压缩一次,并将其用于进一步的训练或推断,我们可以使用常规层或可压缩层将其转换回模型。这在模型部署或联合学习场景中很有用。
首先,转换回普通模型,我们可以进行推断,和/或继续进行常规训练,而不会有压缩惩罚:
def decompress_layer(layer):
if isinstance(layer, CompressedDense):
return CustomDense.copy(layer)
if isinstance(layer, CompressedConv2D):
return CustomConv2D.copy(layer)
return type(layer).from_config(layer.get_config())
decompressed_classifier = tf.keras.models.clone_model(
compressed_classifier, clone_function=decompress_layer)
decompressed_accuracy = train_model(
decompressed_classifier, training_dataset, validation_dataset, epochs=1)
print(f"Accuracy of the compressed classifier: {compressed_accuracy:0.4f}")
print(f"Accuracy of the decompressed classifier after one more epoch of training: {decompressed_accuracy:0.4f}")
请注意,在训练额外的周期后验证准确率会下降,因为训练是在没有正则化的情况下完成的。
或者,我们可以将模型转换回“可压缩”模型,以进行推断和/或进一步训练,并带有压缩惩罚:
def decompress_layer_with_penalty(layer):
if isinstance(layer, CompressedDense):
return CompressibleDense.copy(layer, regularizer=regularizer)
if isinstance(layer, CompressedConv2D):
return CompressibleConv2D.copy(layer, regularizer=regularizer)
return type(layer).from_config(layer.get_config())
decompressed_classifier = tf.keras.models.clone_model(
compressed_classifier, clone_function=decompress_layer_with_penalty)
decompressed_accuracy = train_model(
decompressed_classifier, training_dataset, validation_dataset, epochs=1)
print(f"Accuracy of the compressed classifier: {compressed_accuracy:0.4f}")
print(f"Accuracy of the decompressed classifier after one more epoch of training: {decompressed_accuracy:0.4f}")
在这里,在训练一个额外的周期后,准确率会提高。