Estimator#
在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
本文档介绍了 tf.estimator
,它是一种高级 TensorFlow API。Estimator 封装了以下操作:
训练
评估
预测
导出以供使用
您可以使用我们提供的预制 Estimator 或编写您自己的自定义 Estimator。所有 Estimator(无论是预制还是自定义)都是基于 tf.estimator.Estimator
类的类。
有关简单示例,请查看 Estimator 教程。有关 API 设计概述,请参阅白皮书。
设置#
from set_env import temp_dir
!pip install -U tensorflow_datasets
import tempfile
import os
import tensorflow as tf
import tensorflow_datasets as tfds
优势#
与 tf.keras.Model
类似,estimator
是模型级别的抽象。tf.estimator
提供了一些目前仍在为 tf.keras
开发中的功能。包括:
基于参数服务器的训练
完整的 TFX 集成
Estimator 功能#
Estimator 提供了以下优势:
您可以在本地主机上或分布式多服务器环境中运行基于 Estimator 的模型,而无需更改模型。此外,您还可以在 CPU、GPU 或 TPU 上运行基于 Estimator 的模型,而无需重新编码模型。
Estimator 提供了安全的分布式训练循环,可控制如何以及何时进行以下操作:
加载数据
处理异常
创建检查点文件并从故障中恢复
保存 TensorBoard 摘要
在用 Estimator 编写应用时,您必须将数据输入流水线与模型分离。这种分离简化了使用不同数据集进行的实验。
预制 Estimator 程序结构#
使用预制 Estimator,您能够在比基础 TensorFlow API 高很多的概念层面上工作。您无需再担心创建计算图或会话,因为 Estimator 会替您完成所有“基础工作”。此外,使用预制 Estimator,您只需改动较少代码就能试验不同的模型架构。例如,tf.estimator.DNNClassifier
是一个预制 Estimator 类,可基于密集的前馈神经网络对分类模型进行训练。
依赖于预制 Estimator 的 TensorFlow 程序通常包括以下四个步骤:
1. 编写一个或多个数据集导入函数。#
例如,您可以创建一个函数来导入训练集,创建另一个函数来导入测试集。每个数据集导入函数必须返回以下两个对象:
字典,其中键是特征名称,值是包含相应特征数据的张量(或 SparseTensor)
包含一个或多个标签的张量
input_fn
应当返回一个 tf.data.Dataset
以产生该格式的对。
例如,以下代码展示了输入函数的基本框架:
def train_input_fn():
titanic_file = tf.keras.utils.get_file("train.csv", "https://storage.googleapis.com/tf-datasets/titanic/train.csv")
titanic = tf.data.experimental.make_csv_dataset(
titanic_file, batch_size=32,
label_name="survived")
titanic_batches = (
titanic.cache().repeat().shuffle(500)
.prefetch(tf.data.AUTOTUNE))
return titanic_batches
input_fn
在 tf.Graph
中执行,也可以直接返回包含计算图张量的 (features_dics, labels)
对,但这在返回常量等简单情况之外很容易出错。
2. 定义特征列。#
每个 tf.feature_column
标识了特征名称、特征类型,以及任何输入预处理。例如,以下代码段创建了三个包含整数或浮点数据的特征列。前两个特征列仅标识了特征的名称和类型。第三个特征列还指定了一个会被程序调用以缩放原始数据的 lambda:
例如,以下代码段会创建三个特征列。
第一个直接使用
age
特征作为浮点输入。第二个使用
class
特征作为分类输入。第三个使用
embark_town
作为分类输入,但使用hashing trick
来避免枚举选项并设置选项数量的需要。
有关详细信息,请参阅特征列教程。
# Define three numeric feature columns. population = tf.feature_column.numeric_column('population') crime_rate = tf.feature_column.numeric_column('crime_rate') median_education = tf.feature_column.numeric_column( 'median_education', normalizer_fn=lambda x: x - global_education_mean)
3. 实例化相关预制 Estimator。#
例如,下面是对名为 LinearClassifier
的预制 Estimator 进行实例化的示例:
# Instantiate an estimator, passing the feature columns. estimator = tf.estimator.LinearClassifier( feature_columns=[population, crime_rate, median_education])
有关详细信息,请参阅线性分类器教程。
4. 调用训练、评估或推断方法。#
所有 Estimator 都提供 train
、 evaluate
和 predict
方法。
# `input_fn` is the function created in Step 1 estimator.train(input_fn=my_training_set, steps=2000)
result = model.evaluate(train_input_fn, steps=10)
for key, value in result.items():
print(key, ":", value)
您可以在下面看到与此相关的示例。
预制 Estimator 的优势#
预制 Estimator 对最佳做法进行了编码,具有以下优势:
确定计算图不同部分的运行位置,以及在单台机器或集群上实施策略的最佳做法。
事件(摘要)编写和通用摘要的最佳做法。
如果不使用预制 Estimator,则您必须自己实现上述功能。
自定义 Estimator#
每个 Estimator(无论预制还是自定义)的核心是其模型函数,这是一种为训练、评估和预测构建计算图的方法。当您使用预制 Estimator 时,已经有人为您实现了模型函数。当使用自定义 Estimator 时,您必须自己编写模型函数。
注:自定义
model_fn
仍将在 1.x 样式的计算图模式下运行。这意味着没有 Eager Execution,也没有自动控制依赖项。您应当计划使用自定义model_fn
从tf.estimator
迁移。替代 API 是tf.keras
和tf.distribute
。如果您的训练的某个部分仍需要Estimator
,则可以使用tf.keras.estimator.model_to_estimator
转换器从keras.Model
创建Estimator
。
从 Keras 模型创建 Estimator#
您可以使用 tf.keras.estimator.model_to_estimator
将现有的 Keras 模型转换为 Estimator。这样一来,您的 Keras 模型就可以利用 Estimator 的优势,例如分布式训练。
实例化 Keras MobileNet V2 模型并用训练中使用的优化器、损失和指标来编译模型:
keras_mobilenet_v2 = tf.keras.applications.MobileNetV2(
input_shape=(160, 160, 3), include_top=False)
keras_mobilenet_v2.trainable = False
estimator_model = tf.keras.Sequential([
keras_mobilenet_v2,
tf.keras.layers.GlobalAveragePooling2D(),
tf.keras.layers.Dense(1)
])
# Compile the model
estimator_model.compile(
optimizer='adam',
loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
metrics=['accuracy'])
从已编译的 Keras 模型创建 Estimator
。Keras 模型的初始模型状态会保留在已创建的 Estimator
中:
est_mobilenet_v2 = tf.keras.estimator.model_to_estimator(keras_model=estimator_model)
您可以像对待任何其他 Estimator
一样对待派生的 Estimator
。
IMG_SIZE = 160 # All images will be resized to 160x160
def preprocess(image, label):
image = tf.cast(image, tf.float32)
image = (image/127.5) - 1
image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
return image, label
def train_input_fn(batch_size):
data = tfds.load('cats_vs_dogs', as_supervised=True)
train_data = data['train']
train_data = train_data.map(preprocess).shuffle(500).batch(batch_size)
return train_data
要进行训练,可调用 Estimator 的训练函数:
est_mobilenet_v2.train(input_fn=lambda: train_input_fn(32), steps=50)
同样,要进行评估,可调用 Estimator 的评估函数:
est_mobilenet_v2.evaluate(input_fn=lambda: train_input_fn(32), steps=10)
有关详细信息,请参阅 tf.keras.estimator.model_to_estimator
文档。
从 Keras 模型创建 Estimator#
默认情况下,Estimator 使用变量名而不是检查点指南中介绍的对象计算图来保存检查点。tf.train.Checkpoint
将读取基于名称的检查点,但是在将模型的一部分移到 Estimator 的 model_fn
外部时,变量名称可能会更改。对于前向兼容性,保存基于对象的检查点可以更轻松地在 Estimator 内训练模型,然后在外部使用。
import tensorflow as tf
import tensorflow_datasets as tfds
tfds.disable_progress_bar()
class Net(tf.keras.Model):
"""A simple linear model."""
def __init__(self):
super(Net, self).__init__()
self.l1 = tf.keras.layers.Dense(5)
def call(self, x):
return self.l1(x)
def model_fn(features, labels, mode):
net = Net()
opt = tf.keras.optimizers.Adam(0.1)
ckpt = tf.train.Checkpoint(step=tf_compat.train.get_global_step(),
optimizer=opt, net=net)
with tf.GradientTape() as tape:
output = net(features['x'])
loss = tf.reduce_mean(tf.abs(output - features['y']))
variables = net.trainable_variables
gradients = tape.gradient(loss, variables)
return tf.estimator.EstimatorSpec(
mode,
loss=loss,
train_op=tf.group(opt.apply_gradients(zip(gradients, variables)),
ckpt.step.assign_add(1)),
# Tell the Estimator to save "ckpt" in an object-based format.
scaffold=tf_compat.train.Scaffold(saver=ckpt))
tf.keras.backend.clear_session()
est = tf.estimator.Estimator(model_fn, './tf_estimator_example/')
est.train(toy_dataset, steps=10)
随后,tf.train.Checkpoint
可以从其 model_dir
加载 Estimator 的检查点。
opt = tf.keras.optimizers.Adam(0.1)
net = Net()
ckpt = tf.train.Checkpoint(
step=tf.Variable(1, dtype=tf.int64), optimizer=opt, net=net)
ckpt.restore(tf.train.latest_checkpoint('./tf_estimator_example/'))
ckpt.step.numpy() # From est.train(..., steps=10)
Estimator 中的 SavedModel#
Estimator 通过 tf.Estimator.export_saved_model
导出 SavedModel。
input_column = tf.feature_column.numeric_column("x")
estimator = tf.estimator.LinearClassifier(feature_columns=[input_column])
def input_fn():
return tf.data.Dataset.from_tensor_slices(
({"x": [1., 2., 3., 4.]}, [1, 1, 0, 0])).repeat(200).shuffle(64).batch(16)
estimator.train(input_fn)
要保存 Estimator
,您需要创建 serving_input_receiver
。此函数构建 tf.Graph
的一部分,用于解析 SavedModel 接收到的原始数据。
tf.estimator.export
模块包含帮助构建这些 receivers
的函数。
下面的代码基于 feature_columns
构建一个接收器,它接受通常与 tf-serving 一起使用的序列化 tf.Example
协议缓冲区。
tmpdir = tempfile.mkdtemp()
serving_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
tf.feature_column.make_parse_example_spec([input_column]))
estimator_base_path = os.path.join(tmpdir, 'from_estimator')
estimator_path = estimator.export_saved_model(estimator_base_path, serving_input_fn)
您还可以从 Python 加载和运行该模型:
imported = tf.saved_model.load(estimator_path)
def predict(x):
example = tf.train.Example()
example.features.feature["x"].float_list.value.extend([x])
return imported.signatures["predict"](
examples=tf.constant([example.SerializeToString()]))
print(predict(1.5))
print(predict(3.5))
通过 tf.estimator.export.build_raw_serving_input_receiver_fn
可以创建输入函数,这些函数使用原始张量,而不是 tf.train.Example
。
在 Estimator 中使用 tf.distribute.Strategy
(有限支持)#
tf.estimator
是分布式训练 TensorFlow API,最初支持异步参数服务器方法。tf.estimator
现在支持 tf.distribute.Strategy
。如果您正在使用 tf.estimator
,那么您只需改动少量代码即可轻松转换为分布式训练。借助此功能,Estimator 用户现在可以在多个 GPU 和多个工作进程以及 TPU 上进行同步分布式训练。但是,Estimator 的这种支持是有限的。有关详细信息,请参阅下文目前支持的策略部分。
在 Estimator 中使用 tf.distribute.Strategy
的方法与在 Keras 中略有不同。现在我们不使用 strategy.scope
,而是将策略对象传递到 Estimator 的 RunConfig 中。
要了解更多信息,请参阅分布式训练指南。
以下代码段使用预制 Estimator LinearRegressor
和 MirroredStrategy
展示了这种情况:
mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
feature_columns=[tf.feature_column.numeric_column('feats')],
optimizer='SGD',
config=config)
我们在这里使用了预制 Estimator,但同样的代码也适用于自定义 Estimator。train_distribute
决定训练如何分布,eval_distribute
决定评估如何分布。这是与 Keras 的另一个区别,在 Keras 中,我们会对训练和评估使用相同的策略。
现在,我们可以使用输入函数来训练和评估这个 Estimator:
def input_fn(dataset): ... # manipulate dataset, extracting the feature dict and the label return feature_dict, label
需要在这里强调的 Estimator 和 Keras 的另一个区别是输入处理。在 Keras 中,数据集的每个批次都会在多个副本之间自动拆分。但在 Estimator 中,批次不会自动拆分,也不会在不同的工作进程之间自动对数据进行分片处理。您可以完全控制数据在工作进程和设备之间的分布方式,而且您必须提供 input_fn
来指定数据的分布方式。
每个工作进程都会调用一次 input_fn
,从而为每个工作进程提供一个数据集。然后数据集中的一个批次会被馈送到此工作进程上的一个副本,因此,1 个工作进程上的 N 个副本要使用 N 个批次。换句话说,input_fn
返回的数据集应提供大小为 PER_REPLICA_BATCH_SIZE
的批次。步骤的全局批次大小可通过 PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
获得。
在进行多工作进程训练时,您应该将数据拆分至各个工作进程,或者在每个工作进程上重排随机种子。您可以在使用 Estimator 进行多工作进程训练教程中查看有关此操作的示例。
同样,您也可以使用多工作进程和参数服务器策略。代码保持不变,但需要使用 tf.estimator.train_and_evaluate
,并为集群中运行的每个二进制文件设置 TF_CONFIG
环境变量。
目前支持的策略#
除 TPUStrategy
外,所有策略都对使用 Estimator 的训练提供有限支持。基本训练和评估应该可以正常运行,但如 v1.train.Scaffold
之类的许多高级功能尚不可用。此集成中可能还存在许多错误。目前,我们不打算主动改进此支持,而是专注于对 Keras 和自定义训练循环的支持。如果可能,您应该会更喜欢在这些 API 中使用 tf.distribute
。
训练 API |
MirroredStrategy |
TPUStrategy |
MultiWorkerMirroredStrategy |
CentralStorageStrategy |
ParameterServerStrategy |
---|---|---|---|---|---|
Estimator API |
有限支持 |
不支持 |
有限支持 |
有限支持 |
有限支持 |
示例和教程#
如果可能,您可以通过构建自己的自定义 Estimator 进一步改进模型。
使用 Estimator 进行多工作进程训练教程展示了如何在 MNIST 数据集上使用
MultiWorkerMirroredStrategy
在多个工作进程上一起训练。使用 Kubernetes 模板在
tensorflow/ecosystem
中使用分布策略运行多工作进程训练的端到端示例。它从 Keras 模型开始,然后使用tf.keras.estimator.model_to_estimator
API 将其转换为 Estimator。如果有其他合适的预制 Estimator,可通过运行实验确定哪个预制 Estimator 能够生成最佳结果。