Estimator#

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

警告:不建议将 Estimator 用于新代码。Estimator 运行 v1.Session 风格的代码,此类代码更加难以正确编写,并且可能会出现意外行为,尤其是与 TF 2 代码结合使用时。Estimator 确实在我们的兼容性保证范围内,但除了安全漏洞之外不会得到任何修复。请参阅迁移指南以了解详情。

本文档介绍了 tf.estimator,它是一种高级 TensorFlow API。Estimator 封装了以下操作:

  • 训练

  • 评估

  • 预测

  • 导出以供使用

您可以使用我们提供的预制 Estimator 或编写您自己的自定义 Estimator。所有 Estimator(无论是预制还是自定义)都是基于 tf.estimator.Estimator 类的类。

有关简单示例,请查看 Estimator 教程。有关 API 设计概述,请参阅白皮书

设置#

from set_env import temp_dir
!pip install -U tensorflow_datasets
import tempfile
import os

import tensorflow as tf
import tensorflow_datasets as tfds

优势#

tf.keras.Model 类似,estimator 是模型级别的抽象。tf.estimator 提供了一些目前仍在为 tf.keras 开发中的功能。包括:

  • 基于参数服务器的训练

  • 完整的 TFX 集成

Estimator 功能#

Estimator 提供了以下优势:

  • 您可以在本地主机上或分布式多服务器环境中运行基于 Estimator 的模型,而无需更改模型。此外,您还可以在 CPU、GPU 或 TPU 上运行基于 Estimator 的模型,而无需重新编码模型。

  • Estimator 提供了安全的分布式训练循环,可控制如何以及何时进行以下操作:

    • 加载数据

    • 处理异常

    • 创建检查点文件并从故障中恢复

    • 保存 TensorBoard 摘要

在用 Estimator 编写应用时,您必须将数据输入流水线与模型分离。这种分离简化了使用不同数据集进行的实验。

预制 Estimator 程序结构#

使用预制 Estimator,您能够在比基础 TensorFlow API 高很多的概念层面上工作。您无需再担心创建计算图或会话,因为 Estimator 会替您完成所有“基础工作”。此外,使用预制 Estimator,您只需改动较少代码就能试验不同的模型架构。例如,tf.estimator.DNNClassifier 是一个预制 Estimator 类,可基于密集的前馈神经网络对分类模型进行训练。

依赖于预制 Estimator 的 TensorFlow 程序通常包括以下四个步骤:

1. 编写一个或多个数据集导入函数。#

例如,您可以创建一个函数来导入训练集,创建另一个函数来导入测试集。每个数据集导入函数必须返回以下两个对象:

  • 字典,其中键是特征名称,值是包含相应特征数据的张量(或 SparseTensor)

  • 包含一个或多个标签的张量

input_fn 应当返回一个 tf.data.Dataset 以产生该格式的对。

例如,以下代码展示了输入函数的基本框架:

def train_input_fn():
  titanic_file = tf.keras.utils.get_file("train.csv", "https://storage.googleapis.com/tf-datasets/titanic/train.csv")
  titanic = tf.data.experimental.make_csv_dataset(
      titanic_file, batch_size=32,
      label_name="survived")
  titanic_batches = (
      titanic.cache().repeat().shuffle(500)
      .prefetch(tf.data.AUTOTUNE))
  return titanic_batches

input_fntf.Graph 中执行,也可以直接返回包含计算图张量的 (features_dics, labels) 对,但这在返回常量等简单情况之外很容易出错。

2. 定义特征列。#

每个 tf.feature_column 标识了特征名称、特征类型,以及任何输入预处理。例如,以下代码段创建了三个包含整数或浮点数据的特征列。前两个特征列仅标识了特征的名称和类型。第三个特征列还指定了一个会被程序调用以缩放原始数据的 lambda:

例如,以下代码段会创建三个特征列。

  • 第一个直接使用 age 特征作为浮点输入。

  • 第二个使用 class 特征作为分类输入。

  • 第三个使用 embark_town 作为分类输入,但使用 hashing trick 来避免枚举选项并设置选项数量的需要。

有关详细信息,请参阅特征列教程

# Define three numeric feature columns. population = tf.feature_column.numeric_column('population') crime_rate = tf.feature_column.numeric_column('crime_rate') median_education = tf.feature_column.numeric_column(   'median_education',   normalizer_fn=lambda x: x - global_education_mean)

3. 实例化相关预制 Estimator。#

例如,下面是对名为 LinearClassifier 的预制 Estimator 进行实例化的示例:

# Instantiate an estimator, passing the feature columns. estimator = tf.estimator.LinearClassifier(   feature_columns=[population, crime_rate, median_education])

有关详细信息,请参阅线性分类器教程

4. 调用训练、评估或推断方法。#

所有 Estimator 都提供 trainevaluatepredict 方法。

# `input_fn` is the function created in Step 1 estimator.train(input_fn=my_training_set, steps=2000)
result = model.evaluate(train_input_fn, steps=10)

for key, value in result.items():
  print(key, ":", value)
您可以在下面看到与此相关的示例。

预制 Estimator 的优势#

预制 Estimator 对最佳做法进行了编码,具有以下优势:

  • 确定计算图不同部分的运行位置,以及在单台机器或集群上实施策略的最佳做法。

  • 事件(摘要)编写和通用摘要的最佳做法。

如果不使用预制 Estimator,则您必须自己实现上述功能。

自定义 Estimator#

每个 Estimator(无论预制还是自定义)的核心是其模型函数,这是一种为训练、评估和预测构建计算图的方法。当您使用预制 Estimator 时,已经有人为您实现了模型函数。当使用自定义 Estimator 时,您必须自己编写模型函数。

注:自定义 model_fn 仍将在 1.x 样式的计算图模式下运行。这意味着没有 Eager Execution,也没有自动控制依赖项。您应当计划使用自定义 model_fntf.estimator 迁移。替代 API 是 tf.kerastf.distribute。如果您的训练的某个部分仍需要 Estimator,则可以使用 tf.keras.estimator.model_to_estimator 转换器从 keras.Model 创建 Estimator

从 Keras 模型创建 Estimator#

您可以使用 tf.keras.estimator.model_to_estimator 将现有的 Keras 模型转换为 Estimator。这样一来,您的 Keras 模型就可以利用 Estimator 的优势,例如分布式训练。

实例化 Keras MobileNet V2 模型并用训练中使用的优化器、损失和指标来编译模型:

keras_mobilenet_v2 = tf.keras.applications.MobileNetV2(
    input_shape=(160, 160, 3), include_top=False)
keras_mobilenet_v2.trainable = False

estimator_model = tf.keras.Sequential([
    keras_mobilenet_v2,
    tf.keras.layers.GlobalAveragePooling2D(),
    tf.keras.layers.Dense(1)
])

# Compile the model
estimator_model.compile(
    optimizer='adam',
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
    metrics=['accuracy'])

从已编译的 Keras 模型创建 Estimator。Keras 模型的初始模型状态会保留在已创建的 Estimator中:

est_mobilenet_v2 = tf.keras.estimator.model_to_estimator(keras_model=estimator_model)

您可以像对待任何其他 Estimator 一样对待派生的 Estimator

IMG_SIZE = 160  # All images will be resized to 160x160

def preprocess(image, label):
  image = tf.cast(image, tf.float32)
  image = (image/127.5) - 1
  image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
  return image, label
def train_input_fn(batch_size):
  data = tfds.load('cats_vs_dogs', as_supervised=True)
  train_data = data['train']
  train_data = train_data.map(preprocess).shuffle(500).batch(batch_size)
  return train_data

要进行训练,可调用 Estimator 的训练函数:

est_mobilenet_v2.train(input_fn=lambda: train_input_fn(32), steps=50)

同样,要进行评估,可调用 Estimator 的评估函数:

est_mobilenet_v2.evaluate(input_fn=lambda: train_input_fn(32), steps=10)

有关详细信息,请参阅 tf.keras.estimator.model_to_estimator 文档。

从 Keras 模型创建 Estimator#

默认情况下,Estimator 使用变量名而不是检查点指南中介绍的对象计算图来保存检查点。tf.train.Checkpoint 将读取基于名称的检查点,但是在将模型的一部分移到 Estimator 的 model_fn 外部时,变量名称可能会更改。对于前向兼容性,保存基于对象的检查点可以更轻松地在 Estimator 内训练模型,然后在外部使用。

import tensorflow as tf
import tensorflow_datasets as tfds
tfds.disable_progress_bar()
class Net(tf.keras.Model):
  """A simple linear model."""

  def __init__(self):
    super(Net, self).__init__()
    self.l1 = tf.keras.layers.Dense(5)

  def call(self, x):
    return self.l1(x)
def model_fn(features, labels, mode):
  net = Net()
  opt = tf.keras.optimizers.Adam(0.1)
  ckpt = tf.train.Checkpoint(step=tf_compat.train.get_global_step(),
                             optimizer=opt, net=net)
  with tf.GradientTape() as tape:
    output = net(features['x'])
    loss = tf.reduce_mean(tf.abs(output - features['y']))
  variables = net.trainable_variables
  gradients = tape.gradient(loss, variables)
  return tf.estimator.EstimatorSpec(
    mode,
    loss=loss,
    train_op=tf.group(opt.apply_gradients(zip(gradients, variables)),
                      ckpt.step.assign_add(1)),
    # Tell the Estimator to save "ckpt" in an object-based format.
    scaffold=tf_compat.train.Scaffold(saver=ckpt))

tf.keras.backend.clear_session()
est = tf.estimator.Estimator(model_fn, './tf_estimator_example/')
est.train(toy_dataset, steps=10)

随后,tf.train.Checkpoint 可以从其 model_dir 加载 Estimator 的检查点。

opt = tf.keras.optimizers.Adam(0.1)
net = Net()
ckpt = tf.train.Checkpoint(
  step=tf.Variable(1, dtype=tf.int64), optimizer=opt, net=net)
ckpt.restore(tf.train.latest_checkpoint('./tf_estimator_example/'))
ckpt.step.numpy()  # From est.train(..., steps=10)

Estimator 中的 SavedModel#

Estimator 通过 tf.Estimator.export_saved_model 导出 SavedModel。

input_column = tf.feature_column.numeric_column("x")

estimator = tf.estimator.LinearClassifier(feature_columns=[input_column])

def input_fn():
  return tf.data.Dataset.from_tensor_slices(
    ({"x": [1., 2., 3., 4.]}, [1, 1, 0, 0])).repeat(200).shuffle(64).batch(16)
estimator.train(input_fn)

要保存 Estimator,您需要创建 serving_input_receiver。此函数构建 tf.Graph 的一部分,用于解析 SavedModel 接收到的原始数据。

tf.estimator.export 模块包含帮助构建这些 receivers 的函数。

下面的代码基于 feature_columns 构建一个接收器,它接受通常与 tf-serving 一起使用的序列化 tf.Example 协议缓冲区。

tmpdir = tempfile.mkdtemp()

serving_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
  tf.feature_column.make_parse_example_spec([input_column]))

estimator_base_path = os.path.join(tmpdir, 'from_estimator')
estimator_path = estimator.export_saved_model(estimator_base_path, serving_input_fn)

您还可以从 Python 加载和运行该模型:

imported = tf.saved_model.load(estimator_path)

def predict(x):
  example = tf.train.Example()
  example.features.feature["x"].float_list.value.extend([x])
  return imported.signatures["predict"](
    examples=tf.constant([example.SerializeToString()]))
print(predict(1.5))
print(predict(3.5))

通过 tf.estimator.export.build_raw_serving_input_receiver_fn 可以创建输入函数,这些函数使用原始张量,而不是 tf.train.Example

在 Estimator 中使用 tf.distribute.Strategy(有限支持)#

tf.estimator 是分布式训练 TensorFlow API,最初支持异步参数服务器方法。tf.estimator 现在支持 tf.distribute.Strategy。如果您正在使用 tf.estimator,那么您只需改动少量代码即可轻松转换为分布式训练。借助此功能,Estimator 用户现在可以在多个 GPU 和多个工作进程以及 TPU 上进行同步分布式训练。但是,Estimator 的这种支持是有限的。有关详细信息,请参阅下文目前支持的策略部分。

在 Estimator 中使用 tf.distribute.Strategy 的方法与在 Keras 中略有不同。现在我们不使用 strategy.scope,而是将策略对象传递到 Estimator 的 RunConfig 中。

要了解更多信息,请参阅分布式训练指南

以下代码段使用预制 Estimator LinearRegressorMirroredStrategy 展示了这种情况:

mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)

我们在这里使用了预制 Estimator,但同样的代码也适用于自定义 Estimator。train_distribute 决定训练如何分布,eval_distribute 决定评估如何分布。这是与 Keras 的另一个区别,在 Keras 中,我们会对训练和评估使用相同的策略。

现在,我们可以使用输入函数来训练和评估这个 Estimator:

def input_fn(dataset):     ...  # manipulate dataset, extracting the feature dict and the label     return feature_dict, label

需要在这里强调的 Estimator 和 Keras 的另一个区别是输入处理。在 Keras 中,数据集的每个批次都会在多个副本之间自动拆分。但在 Estimator 中,批次不会自动拆分,也不会在不同的工作进程之间自动对数据进行分片处理。您可以完全控制数据在工作进程和设备之间的分布方式,而且您必须提供 input_fn 来指定数据的分布方式。

每个工作进程都会调用一次 input_fn,从而为每个工作进程提供一个数据集。然后数据集中的一个批次会被馈送到此工作进程上的一个副本,因此,1 个工作进程上的 N 个副本要使用 N 个批次。换句话说,input_fn 返回的数据集应提供大小为 PER_REPLICA_BATCH_SIZE 的批次。步骤的全局批次大小可通过 PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync 获得。

在进行多工作进程训练时,您应该将数据拆分至各个工作进程,或者在每个工作进程上重排随机种子。您可以在使用 Estimator 进行多工作进程训练教程中查看有关此操作的示例。

同样,您也可以使用多工作进程和参数服务器策略。代码保持不变,但需要使用 tf.estimator.train_and_evaluate,并为集群中运行的每个二进制文件设置 TF_CONFIG 环境变量。

目前支持的策略#

TPUStrategy 外,所有策略都对使用 Estimator 的训练提供有限支持。基本训练和评估应该可以正常运行,但如 v1.train.Scaffold 之类的许多高级功能尚不可用。此集成中可能还存在许多错误。目前,我们不打算主动改进此支持,而是专注于对 Keras 和自定义训练循环的支持。如果可能,您应该会更喜欢在这些 API 中使用 tf.distribute

训练 API

MirroredStrategy

TPUStrategy

MultiWorkerMirroredStrategy

CentralStorageStrategy

ParameterServerStrategy

Estimator API

有限支持

不支持

有限支持

有限支持

有限支持

示例和教程#

如果可能,您可以通过构建自己的自定义 Estimator 进一步改进模型。

  1. 使用 Estimator 进行多工作进程训练教程展示了如何在 MNIST 数据集上使用 MultiWorkerMirroredStrategy 在多个工作进程上一起训练。

  2. 使用 Kubernetes 模板在 tensorflow/ecosystem使用分布策略运行多工作进程训练的端到端示例。它从 Keras 模型开始,然后使用 tf.keras.estimator.model_to_estimator API 将其转换为 Estimator。

  3. 如果有其他合适的预制 Estimator,可通过运行实验确定哪个预制 Estimator 能够生成最佳结果。