##### Copyright 2021 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

从 TPU embedding_column 迁移到 TPUEmbedding 层#

在 TensorFlow.org 上查看 在 Google Colab 运行 在 Github 上查看源代码 下载笔记本

本指南演示了如何将 TPU 上的嵌入向量训练从 TensorFlow 1 带有 TPUEstimatorembedding_column API 迁移到 TensorFlow 2 带有 TPUStrategyTPUEmbedding 层 API。

嵌入向量是(大型)矩阵。它们是从稀疏特征空间映射到密集向量的查找表。嵌入向量提供高效和密集表示,可以捕捉特征之间复杂的相似度和关系。

TensorFlow 包括对在 TPU 上训练嵌入向量的专门支持。这种特定于 TPU 的嵌入向量支持可让您训练大于单个 TPU 设备内存的嵌入向量,并在 TPU 上使用稀疏和不规则输入。

  • 在 TensorFlow 1 中,tf.compat.v1.estimator.tpu.TPUEstimator 是一个高级 API,它封装了适用于 TPU 的训练、评估、预测和导出。它对 tf.compat.v1.tpu.experimental.embedding_column 有特殊支持。

  • 要在 TensorFlow 2 中实现此目标,可以使用 TensorFlow Recommenders 的 tfrs.layers.embedding.TPUEmbedding 层。对于训练和评估,可以使用 TPU 分布策略 tf.distribute.TPUStrategy,例如,它与用于模型构建 (tf.keras.Model)、优化器 (tf.keras.optimizers.Optimizer) 的 Keras API 兼容,支持使用 Model.fit 或者自定义训练循环(使用 tf.functiontf.GradientTape)进行训练。

有关详情,请参阅 tfrs.layers.embedding.TPUEmbedding 层的 API 文档,以及 tf.tpu.experimental.embedding.TableConfigtf.tpu.experimental.embedding.FeatureConfig 文档。有关 tf.distribute.TPUStrategy 的概述,请查看分布式训练指南和使用 TPU 指南。如果您要从 TPUEstimator 迁移到 TPUStrategy,请查看 TPU 迁移指南

安装#

首先,安装 TensorFlow Recommenders 并导入一些必要的软件包:

!pip install tensorflow-recommenders
import tensorflow as tf
import tensorflow.compat.v1 as tf1

# TPUEmbedding layer is not part of TensorFlow.
import tensorflow_recommenders as tfrs

然后,准备一个用于演示目的的简单数据集:

features = [[1., 1.5]]
embedding_features_indices = [[0, 0], [0, 1]]
embedding_features_values = [0, 5]
labels = [[0.3]]
eval_features = [[4., 4.5]]
eval_embedding_features_indices = [[0, 0], [0, 1]]
eval_embedding_features_values = [4, 3]
eval_labels = [[0.8]]

TensorFlow 1:使用 TPUEstimator 在 TPU 上训练嵌入#

在 TensorFlow 1 中,使用 tf.compat.v1.tpu.experimental.embedding_column API 设置 TPU 嵌入向量,并使用 tf.compat.v1.estimator.tpu.TPUEstimator 在 TPU 上训练/评估模型。

输入是整数,范围从零到 TPU 嵌入向量表的词汇量大小。首先使用 tf.feature_column.categorical_column_with_identity 将输入编码为分类 ID。将 "sparse_feature" 用于 key 参数,因为输入特征是整数值,而 num_buckets 是嵌入向量表的词汇量大小 (10)。

embedding_id_column = (
      tf1.feature_column.categorical_column_with_identity(
          key="sparse_feature", num_buckets=10))

接下来,使用 tpu.experimental.embedding_column 将稀疏分类输入转换为密集表示,其中 dimension 是嵌入向量表的宽度。它将为每个 num_buckets 存储一个嵌入向量。

embedding_column = tf1.tpu.experimental.embedding_column(
    embedding_id_column, dimension=5)

现在,通过 tf.estimator.tpu.experimental.EmbeddingConfigSpec 定义特定于 TPU 的嵌入向量配置。稍后,您会将其作为 embedding_config_spec 参数传递给 tf.estimator.tpu.TPUEstimator

embedding_config_spec = tf1.estimator.tpu.experimental.EmbeddingConfigSpec(
    feature_columns=(embedding_column,),
    optimization_parameters=(
        tf1.tpu.experimental.AdagradParameters(0.05)))

接下来,要使用 TPUEstimator,请定义:

  • 用于训练数据的输入函数

  • 用于评估数据的评估输入函数

  • 用于指示 TPUEstimator 如何使用特征和标签定义训练运算的模型函数

def _input_fn(params):
  dataset = tf1.data.Dataset.from_tensor_slices((
      {"dense_feature": features,
       "sparse_feature": tf1.SparseTensor(
           embedding_features_indices,
           embedding_features_values, [1, 2])},
           labels))
  dataset = dataset.repeat()
  return dataset.batch(params['batch_size'], drop_remainder=True)

def _eval_input_fn(params):
  dataset = tf1.data.Dataset.from_tensor_slices((
      {"dense_feature": eval_features,
       "sparse_feature": tf1.SparseTensor(
           eval_embedding_features_indices,
           eval_embedding_features_values, [1, 2])},
           eval_labels))
  dataset = dataset.repeat()
  return dataset.batch(params['batch_size'], drop_remainder=True)

def _model_fn(features, labels, mode, params):
  embedding_features = tf1.keras.layers.DenseFeatures(embedding_column)(features)
  concatenated_features = tf1.keras.layers.Concatenate(axis=1)(
      [embedding_features, features["dense_feature"]])
  logits = tf1.layers.Dense(1)(concatenated_features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  optimizer = tf1.tpu.CrossShardOptimizer(optimizer)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)

定义这些函数后,创建一个提供聚簇信息的 tf.distribute.cluster_resolver.TPUClusterResolver 和一个 tf.compat.v1.estimator.tpu.RunConfig 对象。

连同您已定义的模型函数,现在您可以创建一个 TPUEstimator。在这里,您将通过跳过检查点保存来简化流程。随后,您将为 TPUEstimator 的训练和评估指定批次大小。

cluster_resolver = tf1.distribute.cluster_resolver.TPUClusterResolver(tpu='')
print("All devices: ", tf1.config.list_logical_devices('TPU'))
tpu_config = tf1.estimator.tpu.TPUConfig(
    iterations_per_loop=10,
    per_host_input_for_training=tf1.estimator.tpu.InputPipelineConfig
          .PER_HOST_V2)
config = tf1.estimator.tpu.RunConfig(
    cluster=cluster_resolver,
    save_checkpoints_steps=None,
    tpu_config=tpu_config)
estimator = tf1.estimator.tpu.TPUEstimator(
    model_fn=_model_fn, config=config, train_batch_size=8, eval_batch_size=8,
    embedding_config_spec=embedding_config_spec)

调用 TPUEstimator.train 以开始训练模型:

estimator.train(_input_fn, steps=1)

然后,调用 TPUEstimator.evaluate 以使用评估数据评估模型:

estimator.evaluate(_eval_input_fn, steps=1)

TensorFlow 2:使用 TPUStrategy 在 TPU 上训练嵌入向量#

在 TensorFlow 2 中,要在 TPU 工作进程上进行训练,请将 tf.distribute.TPUStrategy 与 Keras API 一起用于模型定义和训练/评估。(有关使用 Keras Model.fit 和自定义训练循环(使用 tf.functiontf.GradientTape)进行训练的更多示例,请参阅使用 TPU 指南。)

您需要执行一些初始化工作以连接到远程聚簇并初始化 TPU 工作进程,因此首先创建一个 TPUClusterResolver 以提供聚簇信息并连接到聚簇。(在使用 TPU 指南的 TPU 初始化部分中了解详情。)

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))

接下来,准备您的数据。这类似于您在 TensorFlow 1 示例中创建数据集的方式,不同之处在于数据集函数现在传递一个 tf.distribute.InputContext 对象而不是 params 字典。可以使用此对象来确定本地批次大小(以及此流水线用于哪个主机,以便可以正确对数据分区)。

  • 使用 tfrs.layers.embedding.TPUEmbedding API 时,务必在使用 Dataset.batch 对数据集进行批处理时包含 drop_remainder=True 选项,因为 TPUEmbedding 需要固定的批批次大小。

  • 此外,如果在同一组设备上进行评估和训练,则必须使用相同的批次大小。

  • 最后,您应当将 tf.keras.utils.experimental.DatasetCreatortf.distribute.InputOptions 中的特殊输入选项 experimental_fetch_to_device=False(保存特定于策略的配置)一起使用。下面的代码对此进行了演示:

global_batch_size = 8

def _input_dataset(context: tf.distribute.InputContext):
  dataset = tf.data.Dataset.from_tensor_slices((
      {"dense_feature": features,
       "sparse_feature": tf.SparseTensor(
           embedding_features_indices,
           embedding_features_values, [1, 2])},
           labels))
  dataset = dataset.shuffle(10).repeat()
  dataset = dataset.batch(
      context.get_per_replica_batch_size(global_batch_size),
      drop_remainder=True)
  return dataset.prefetch(2)

def _eval_dataset(context: tf.distribute.InputContext):
  dataset = tf.data.Dataset.from_tensor_slices((
      {"dense_feature": eval_features,
       "sparse_feature": tf.SparseTensor(
           eval_embedding_features_indices,
           eval_embedding_features_values, [1, 2])},
           eval_labels))
  dataset = dataset.repeat()
  dataset = dataset.batch(
      context.get_per_replica_batch_size(global_batch_size),
      drop_remainder=True)
  return dataset.prefetch(2)

input_options = tf.distribute.InputOptions(
    experimental_fetch_to_device=False)

input_dataset = tf.keras.utils.experimental.DatasetCreator(
    _input_dataset, input_options=input_options)

eval_dataset = tf.keras.utils.experimental.DatasetCreator(
    _eval_dataset, input_options=input_options)

接下来,准备好数据后,将创建一个 TPUStrategy,然后在此策略的范围 (Strategy.scope) 下定义模型、指标和优化器。

您应当为 Model.compile 中的 steps_per_execution 选择一个数字,因为它指定了每次 tf.function 调用期间要运行的批次数,并且对性能至关重要。此参数类似于 TPUEstimator 中使用的 iterations_per_loop

TensorFlow 1 中通过 tf.tpu.experimental.embedding_column(和 tf.tpu.experimental.shared_embedding_column)指定的特征和表配置可以通过一对配置对象在 TensorFlow 2 中直接指定:

  • tf.tpu.experimental.embedding.FeatureConfig

  • tf.tpu.experimental.embedding.TableConfig

(请参阅相关的 API 文档,了解更多详细信息。)

strategy = tf.distribute.TPUStrategy(cluster_resolver)
with strategy.scope():
  if hasattr(tf.keras.optimizers, "legacy"):
    optimizer = tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05)
  else:
    optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  dense_input = tf.keras.Input(shape=(2,), dtype=tf.float32, batch_size=global_batch_size)
  sparse_input = tf.keras.Input(shape=(), dtype=tf.int32, batch_size=global_batch_size)
  embedded_input = tfrs.layers.embedding.TPUEmbedding(
      feature_config=tf.tpu.experimental.embedding.FeatureConfig(
          table=tf.tpu.experimental.embedding.TableConfig(
              vocabulary_size=10,
              dim=5,
              initializer=tf.initializers.TruncatedNormal(mean=0.0, stddev=1)),
          name="sparse_input"),
      optimizer=optimizer)(sparse_input)
  input = tf.keras.layers.Concatenate(axis=1)([dense_input, embedded_input])
  result = tf.keras.layers.Dense(1)(input)
  model = tf.keras.Model(inputs={"dense_feature": dense_input, "sparse_feature": sparse_input}, outputs=result)
  model.compile(optimizer, "mse", steps_per_execution=10)

这样,您就可以使用训练数据集训练模型了:

model.fit(input_dataset, epochs=5, steps_per_epoch=10)

最后,使用评估数据集评估模型:

model.evaluate(eval_dataset, steps=1, return_dict=True)

后续步骤#

在 API 文档中详细了解如何设置特定于 TPU 的嵌入向量:

  • tfrs.layers.embedding.TPUEmbedding:特别介绍了特征和表配置、设置优化器、创建模型(使用 Keras 函数式 API 或通过子类化 tf.keras.Model)、训练/评估以及使用 tf.saved_model 应用模型

  • tf.tpu.experimental.embedding.TableConfig

  • tf.tpu.experimental.embedding.FeatureConfig

要详细了解 TensorFlow 2 中的 TPUStrategy,请查看以下资源:

要详细了解如何自定义训练,请参阅:

TPUs(Google 用于机器学习的专用 ASIC)可通过 Google ColabTPU Research CloudCloud TPU 获得。