##### Copyright 2021 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
从 TPU embedding_column 迁移到 TPUEmbedding 层#
在 TensorFlow.org 上查看 | 在 Google Colab 运行 | 在 Github 上查看源代码 | 下载笔记本 |
本指南演示了如何将 TPU 上的嵌入向量训练从 TensorFlow 1 带有 TPUEstimator
的 embedding_column
API 迁移到 TensorFlow 2 带有 TPUStrategy
的 TPUEmbedding
层 API。
嵌入向量是(大型)矩阵。它们是从稀疏特征空间映射到密集向量的查找表。嵌入向量提供高效和密集表示,可以捕捉特征之间复杂的相似度和关系。
TensorFlow 包括对在 TPU 上训练嵌入向量的专门支持。这种特定于 TPU 的嵌入向量支持可让您训练大于单个 TPU 设备内存的嵌入向量,并在 TPU 上使用稀疏和不规则输入。
在 TensorFlow 1 中,
tf.compat.v1.estimator.tpu.TPUEstimator
是一个高级 API,它封装了适用于 TPU 的训练、评估、预测和导出。它对tf.compat.v1.tpu.experimental.embedding_column
有特殊支持。要在 TensorFlow 2 中实现此目标,可以使用 TensorFlow Recommenders 的
tfrs.layers.embedding.TPUEmbedding
层。对于训练和评估,可以使用 TPU 分布策略tf.distribute.TPUStrategy
,例如,它与用于模型构建 (tf.keras.Model
)、优化器 (tf.keras.optimizers.Optimizer
) 的 Keras API 兼容,支持使用Model.fit
或者自定义训练循环(使用tf.function
和tf.GradientTape
)进行训练。
有关详情,请参阅 tfrs.layers.embedding.TPUEmbedding
层的 API 文档,以及 tf.tpu.experimental.embedding.TableConfig
和 tf.tpu.experimental.embedding.FeatureConfig
文档。有关 tf.distribute.TPUStrategy
的概述,请查看分布式训练指南和使用 TPU 指南。如果您要从 TPUEstimator
迁移到 TPUStrategy
,请查看 TPU 迁移指南。
安装#
首先,安装 TensorFlow Recommenders 并导入一些必要的软件包:
!pip install tensorflow-recommenders
import tensorflow as tf
import tensorflow.compat.v1 as tf1
# TPUEmbedding layer is not part of TensorFlow.
import tensorflow_recommenders as tfrs
然后,准备一个用于演示目的的简单数据集:
features = [[1., 1.5]]
embedding_features_indices = [[0, 0], [0, 1]]
embedding_features_values = [0, 5]
labels = [[0.3]]
eval_features = [[4., 4.5]]
eval_embedding_features_indices = [[0, 0], [0, 1]]
eval_embedding_features_values = [4, 3]
eval_labels = [[0.8]]
TensorFlow 1:使用 TPUEstimator 在 TPU 上训练嵌入#
在 TensorFlow 1 中,使用 tf.compat.v1.tpu.experimental.embedding_column
API 设置 TPU 嵌入向量,并使用 tf.compat.v1.estimator.tpu.TPUEstimator
在 TPU 上训练/评估模型。
输入是整数,范围从零到 TPU 嵌入向量表的词汇量大小。首先使用 tf.feature_column.categorical_column_with_identity
将输入编码为分类 ID。将 "sparse_feature"
用于 key
参数,因为输入特征是整数值,而 num_buckets
是嵌入向量表的词汇量大小 (10
)。
embedding_id_column = (
tf1.feature_column.categorical_column_with_identity(
key="sparse_feature", num_buckets=10))
接下来,使用 tpu.experimental.embedding_column
将稀疏分类输入转换为密集表示,其中 dimension
是嵌入向量表的宽度。它将为每个 num_buckets
存储一个嵌入向量。
embedding_column = tf1.tpu.experimental.embedding_column(
embedding_id_column, dimension=5)
现在,通过 tf.estimator.tpu.experimental.EmbeddingConfigSpec
定义特定于 TPU 的嵌入向量配置。稍后,您会将其作为 embedding_config_spec
参数传递给 tf.estimator.tpu.TPUEstimator
。
embedding_config_spec = tf1.estimator.tpu.experimental.EmbeddingConfigSpec(
feature_columns=(embedding_column,),
optimization_parameters=(
tf1.tpu.experimental.AdagradParameters(0.05)))
接下来,要使用 TPUEstimator
,请定义:
用于训练数据的输入函数
用于评估数据的评估输入函数
用于指示
TPUEstimator
如何使用特征和标签定义训练运算的模型函数
def _input_fn(params):
dataset = tf1.data.Dataset.from_tensor_slices((
{"dense_feature": features,
"sparse_feature": tf1.SparseTensor(
embedding_features_indices,
embedding_features_values, [1, 2])},
labels))
dataset = dataset.repeat()
return dataset.batch(params['batch_size'], drop_remainder=True)
def _eval_input_fn(params):
dataset = tf1.data.Dataset.from_tensor_slices((
{"dense_feature": eval_features,
"sparse_feature": tf1.SparseTensor(
eval_embedding_features_indices,
eval_embedding_features_values, [1, 2])},
eval_labels))
dataset = dataset.repeat()
return dataset.batch(params['batch_size'], drop_remainder=True)
def _model_fn(features, labels, mode, params):
embedding_features = tf1.keras.layers.DenseFeatures(embedding_column)(features)
concatenated_features = tf1.keras.layers.Concatenate(axis=1)(
[embedding_features, features["dense_feature"]])
logits = tf1.layers.Dense(1)(concatenated_features)
loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
optimizer = tf1.train.AdagradOptimizer(0.05)
optimizer = tf1.tpu.CrossShardOptimizer(optimizer)
train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
return tf1.estimator.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)
定义这些函数后,创建一个提供聚簇信息的 tf.distribute.cluster_resolver.TPUClusterResolver
和一个 tf.compat.v1.estimator.tpu.RunConfig
对象。
连同您已定义的模型函数,现在您可以创建一个 TPUEstimator
。在这里,您将通过跳过检查点保存来简化流程。随后,您将为 TPUEstimator
的训练和评估指定批次大小。
cluster_resolver = tf1.distribute.cluster_resolver.TPUClusterResolver(tpu='')
print("All devices: ", tf1.config.list_logical_devices('TPU'))
tpu_config = tf1.estimator.tpu.TPUConfig(
iterations_per_loop=10,
per_host_input_for_training=tf1.estimator.tpu.InputPipelineConfig
.PER_HOST_V2)
config = tf1.estimator.tpu.RunConfig(
cluster=cluster_resolver,
save_checkpoints_steps=None,
tpu_config=tpu_config)
estimator = tf1.estimator.tpu.TPUEstimator(
model_fn=_model_fn, config=config, train_batch_size=8, eval_batch_size=8,
embedding_config_spec=embedding_config_spec)
调用 TPUEstimator.train
以开始训练模型:
estimator.train(_input_fn, steps=1)
然后,调用 TPUEstimator.evaluate
以使用评估数据评估模型:
estimator.evaluate(_eval_input_fn, steps=1)
TensorFlow 2:使用 TPUStrategy 在 TPU 上训练嵌入向量#
在 TensorFlow 2 中,要在 TPU 工作进程上进行训练,请将 tf.distribute.TPUStrategy
与 Keras API 一起用于模型定义和训练/评估。(有关使用 Keras Model.fit 和自定义训练循环(使用 tf.function
和 tf.GradientTape
)进行训练的更多示例,请参阅使用 TPU 指南。)
您需要执行一些初始化工作以连接到远程聚簇并初始化 TPU 工作进程,因此首先创建一个 TPUClusterResolver
以提供聚簇信息并连接到聚簇。(在使用 TPU 指南的 TPU 初始化部分中了解详情。)
cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
接下来,准备您的数据。这类似于您在 TensorFlow 1 示例中创建数据集的方式,不同之处在于数据集函数现在传递一个 tf.distribute.InputContext
对象而不是 params
字典。可以使用此对象来确定本地批次大小(以及此流水线用于哪个主机,以便可以正确对数据分区)。
使用
tfrs.layers.embedding.TPUEmbedding
API 时,务必在使用Dataset.batch
对数据集进行批处理时包含drop_remainder=True
选项,因为TPUEmbedding
需要固定的批批次大小。此外,如果在同一组设备上进行评估和训练,则必须使用相同的批次大小。
最后,您应当将
tf.keras.utils.experimental.DatasetCreator
与tf.distribute.InputOptions
中的特殊输入选项experimental_fetch_to_device=False
(保存特定于策略的配置)一起使用。下面的代码对此进行了演示:
global_batch_size = 8
def _input_dataset(context: tf.distribute.InputContext):
dataset = tf.data.Dataset.from_tensor_slices((
{"dense_feature": features,
"sparse_feature": tf.SparseTensor(
embedding_features_indices,
embedding_features_values, [1, 2])},
labels))
dataset = dataset.shuffle(10).repeat()
dataset = dataset.batch(
context.get_per_replica_batch_size(global_batch_size),
drop_remainder=True)
return dataset.prefetch(2)
def _eval_dataset(context: tf.distribute.InputContext):
dataset = tf.data.Dataset.from_tensor_slices((
{"dense_feature": eval_features,
"sparse_feature": tf.SparseTensor(
eval_embedding_features_indices,
eval_embedding_features_values, [1, 2])},
eval_labels))
dataset = dataset.repeat()
dataset = dataset.batch(
context.get_per_replica_batch_size(global_batch_size),
drop_remainder=True)
return dataset.prefetch(2)
input_options = tf.distribute.InputOptions(
experimental_fetch_to_device=False)
input_dataset = tf.keras.utils.experimental.DatasetCreator(
_input_dataset, input_options=input_options)
eval_dataset = tf.keras.utils.experimental.DatasetCreator(
_eval_dataset, input_options=input_options)
接下来,准备好数据后,将创建一个 TPUStrategy
,然后在此策略的范围 (Strategy.scope
) 下定义模型、指标和优化器。
您应当为 Model.compile
中的 steps_per_execution
选择一个数字,因为它指定了每次 tf.function
调用期间要运行的批次数,并且对性能至关重要。此参数类似于 TPUEstimator
中使用的 iterations_per_loop
。
TensorFlow 1 中通过 tf.tpu.experimental.embedding_column
(和 tf.tpu.experimental.shared_embedding_column
)指定的特征和表配置可以通过一对配置对象在 TensorFlow 2 中直接指定:
tf.tpu.experimental.embedding.FeatureConfig
tf.tpu.experimental.embedding.TableConfig
(请参阅相关的 API 文档,了解更多详细信息。)
strategy = tf.distribute.TPUStrategy(cluster_resolver)
with strategy.scope():
if hasattr(tf.keras.optimizers, "legacy"):
optimizer = tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05)
else:
optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
dense_input = tf.keras.Input(shape=(2,), dtype=tf.float32, batch_size=global_batch_size)
sparse_input = tf.keras.Input(shape=(), dtype=tf.int32, batch_size=global_batch_size)
embedded_input = tfrs.layers.embedding.TPUEmbedding(
feature_config=tf.tpu.experimental.embedding.FeatureConfig(
table=tf.tpu.experimental.embedding.TableConfig(
vocabulary_size=10,
dim=5,
initializer=tf.initializers.TruncatedNormal(mean=0.0, stddev=1)),
name="sparse_input"),
optimizer=optimizer)(sparse_input)
input = tf.keras.layers.Concatenate(axis=1)([dense_input, embedded_input])
result = tf.keras.layers.Dense(1)(input)
model = tf.keras.Model(inputs={"dense_feature": dense_input, "sparse_feature": sparse_input}, outputs=result)
model.compile(optimizer, "mse", steps_per_execution=10)
这样,您就可以使用训练数据集训练模型了:
model.fit(input_dataset, epochs=5, steps_per_epoch=10)
最后,使用评估数据集评估模型:
model.evaluate(eval_dataset, steps=1, return_dict=True)
后续步骤#
在 API 文档中详细了解如何设置特定于 TPU 的嵌入向量:
tfrs.layers.embedding.TPUEmbedding
:特别介绍了特征和表配置、设置优化器、创建模型(使用 Keras 函数式 API 或通过子类化tf.keras.Model
)、训练/评估以及使用tf.saved_model
应用模型tf.tpu.experimental.embedding.TableConfig
tf.tpu.experimental.embedding.FeatureConfig
要详细了解 TensorFlow 2 中的 TPUStrategy
,请查看以下资源:
指南:使用 TPU(涵盖使用 Keras
Model.fit
进行训练/使用tf.distribute.TPUStrategy
进行自定义训练循环,以及使用tf.function
提升性能的技巧)
要详细了解如何自定义训练,请参阅:
指南:从头开始编写训练循环
TPUs(Google 用于机器学习的专用 ASIC)可通过 Google Colab、TPU Research Cloud 和 Cloud TPU 获得。