TF-Agents 中的排名教程#
开始#
在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
安装#
!pip install tf-agents[reverb]
#@title Imports
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tf_agents.bandits.agents import ranking_agent
from tf_agents.bandits.agents.examples.v2 import trainer
from tf_agents.bandits.environments import ranking_environment
from tf_agents.bandits.networks import global_and_arm_feature_network
from tf_agents.environments import tf_py_environment
from tf_agents.bandits.policies import ranking_policy
from tf_agents.bandits.replay_buffers import bandit_replay_buffer
from tf_agents.drivers import dynamic_step_driver
from tf_agents.metrics import tf_metrics
from tf_agents.specs import bandit_spec_utils
from tf_agents.specs import tensor_spec
from tf_agents.trajectories import trajectory
简介#
在本教程中,我们将指导您了解作为 TF-Agents Bandits 库的一部分实现的排名算法。在排名问题中,对于每次迭代,代理都会获得一组物品,并负责将其中的部分或全部排序到一个列表中。然后,此排名决策会收到某种形式的反馈(例如,用户可能点击或不点击一个或多个所选物品)。代理的目标是优化一些指标/奖励,目标是随着时间的推移做出更好的决策。
前提条件#
TF-Agents 中的排名算法属于一种特殊类型的老虎机代理,它针对“单臂”老虎机问题进行运算。因此,为了能够从本教程中获益最多,读者应当熟悉老虎机和单臂老虎机教程。
排名问题及其变体#
在本教程中,我们将使用向用户展示待售物品的示例。在每次迭代中,我们都会收到一组物品,可能还有一个描述我们应该显示多少物品的数字。我们假设手头的物品数量总是大于或等于放置它们的插槽的数量。我们需要填充显示中的各个插槽以最大化用户与一个或多个显示的物品交互的概率。用户和物品均由特征描述。
如果我们设法展示用户喜欢的物品,用户/物品交互的可能性就会提高。因此,了解用户-物品对如何匹配是一个好想法。但是我们怎么知道一个物品是否被用户喜欢呢?为此,我们引入了反馈类型。
#反馈类型
与反馈信号(奖励)与单个选定物品直接相关的老虎机问题相反,在排名中,我们需要考虑反馈如何转化为所显示物品的“优点”。换句话说,我们需要为显示的所有或部分物品分配分数。在我们的库中,我们提供两种不同的反馈类型:向量反馈和级联反馈。
向量反馈#
在向量反馈类型中,我们假设代理收到输出排名中每个物品的标量分数。这些标量以与输出排名相同的顺序置于一个向量中。因此,反馈是一个与排名中的元素数量相同大小的向量。
这种反馈类型非常简单,因为我们不需要担心将反馈信号转换为分数的问题。另一方面,对物品进行评分的责任落在了设计者(也就是您)身上:由系统设计者根据物品、它的位置以及它是否与用户交互来决定给出什么分数。
##级联反馈
在级联反馈类型(由 Craswell 等人于 2008 年创造的术语)中,我们假设用户从顶部插槽开始按顺序查看显示的物品。一旦用户发现一个值得点击的物品,他们就会点击并且永远不会返回到当前排名列表。他们甚至不查看所点击物品下面的物品。不点击任何物品也是可能的,当显示的物品都不值得点击时会发生这种情况。在这种情况下,用户确实查看了所有物品。
反馈信号由两个元素组成:所选元素的索引和点击的值。随后将这些信息转换为分数是代理的任务。在我们 bandit 库的实现中,我们实现了已查看但未点击的物品会获得一些低分(通常为 0 或 -1)的约定,被点击的物品会获得点击值,而未被点击的物品会被代理忽略。
与探索#
为了最大程度提高用户点击某个物品的几率,仅仅选择得分最高的物品并将它们放在排名靠前的地方是不够的。对于一个拥有很多不同兴趣的用户来说,他们可能对运动最感兴趣,但他们也喜欢艺术和旅行。给所有运动物品最高估计分数并在最高插槽显示所有运动物品可能不是最佳做法。用户可能对艺术或旅行感兴趣。因此,显示高分兴趣的组合是一个好想法。重要的是,不仅要最大程度提高所显示物品的分数,还要确保它们形成一个多元化的集合。
与其他信息有限的学习问题(如老虎机)一样,同样重要的一点是,要牢记我们的决策不仅会影响即时奖励,还会影响训练数据和未来奖励。如果我们总是只根据它们当前的估计分数来显示物品,我们可能会错过我们尚未充分探索的高分物品,因此我们不知道它们有多出色。也就是说,我们需要将探索纳入我们的决策过程。
所有上述概念和考虑因素都会在我们的库中得到解决。在本教程中,我们将向您介绍详细信息。
模拟用户:我们的测试环境#
我们来深入了解我们的代码库!
首先,我们定义环境,负责随机生成用户和物品特征的类,并在决策后给出反馈。
feedback_model = ranking_environment.FeedbackModel.CASCADING #@param["ranking_environment.FeedbackModel.SCORE_VECTOR", "ranking_environment.FeedbackModel.CASCADING"] {type:"raw"}
我们还需要一个环境模型来决定何时不点击。我们的库中有基于距离和幽灵动作两种方法。
在基于距离的模型中,如果用户特征与任何物品特征的接近程度都不足,则用户不会点击。
在幽灵动作模型中,我们以单位向量物品特征的形式设置了额外的想象动作。如果用户选择了其中一个幽灵动作,则会导致无点击。
click_type = "ghost_actions" #@param["distance_based", "ghost_actions"]
click_model = (ranking_environment.ClickModel.DISTANCE_BASED
if click_type == "distance_based" else
ranking_environment.ClickModel.GHOST_ACTIONS)
我们几乎已经准备好定义排名环境,只需一些准备工作:我们为全局(用户)和物品特征定义采样函数。这些特征将被环境用来模拟用户行为:计算全局和物品特征的加权内积,用户点击的概率与内积值成正比。内积的权重由下面的 scores_weight_matrix
定义。
global_dim = 9 #@param{ type: "integer"}
item_dim = 11 #@param{ type: "integer"}
num_items = 50 #@param{ type: "integer"}
num_slots = 3 #@param{ type: "integer"}
distance_threshold = 5.0 #@param{ type: "number" }
batch_size = 128 #@param{ type: "integer"}
def global_sampling_fn():
return np.random.randint(-1, 1, [global_dim]).astype(np.float32)
def item_sampling_fn():
return np.random.randint(-2, 3, [item_dim]).astype(np.float32)
# Inner product with excess dimensions ignored.
scores_weight_matrix = np.eye(11, 9, dtype=np.float32)
env = ranking_environment.RankingPyEnvironment(
global_sampling_fn,
item_sampling_fn,
num_items=num_items,
num_slots=num_slots,
scores_weight_matrix=scores_weight_matrix,
feedback_model=feedback_model,
click_model=click_model,
distance_threshold=distance_threshold,
batch_size=batch_size)
# Convert the python environment to tf environment.
environment = tf_py_environment.TFPyEnvironment(env)
现在,让我们定义几个不同的代理来处理上述环境!所有代理都训练一个估计物品/用户对分数的网络。不同之处在于策略,即训练好的网络如何用于做出排名决策。实现的策略涵盖了从仅基于分数的堆栈排名到考虑多元化和探索并能够调整这些方面的混合。
#@title Defining the Network and Training Params
scoring_network = (
global_and_arm_feature_network.create_feed_forward_common_tower_network(
environment.observation_spec(), (20, 10), (20, 10), (20, 10)))
learning_rate = 0.005 #@param{ type: "number"}
feedback_dict = {ranking_environment.FeedbackModel.CASCADING: ranking_agent.FeedbackModel.CASCADING,
ranking_environment.FeedbackModel.SCORE_VECTOR: ranking_agent.FeedbackModel.SCORE_VECTOR}
agent_feedback_model = feedback_dict[feedback_model]
#@title Stack Ranking Deterministically by Scores
policy_type = ranking_agent.RankingPolicyType.DESCENDING_SCORES
descending_scores_agent = ranking_agent.RankingAgent(
time_step_spec=environment.time_step_spec(),
action_spec=environment.action_spec(),
scoring_network=scoring_network,
optimizer=tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate),
feedback_model=agent_feedback_model,
policy_type=policy_type,
summarize_grads_and_vars=True)
#@title Sampling Sequentially Based on Scores
policy_type = ranking_agent.RankingPolicyType.NO_PENALTY
logits_temperature = 1.0 #@param{ type: "number" }
no_penalty_agent = ranking_agent.RankingAgent(
time_step_spec=environment.time_step_spec(),
action_spec=environment.action_spec(),
scoring_network=scoring_network,
optimizer=tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate),
feedback_model=agent_feedback_model,
policy_type=policy_type,
logits_temperature=logits_temperature,
summarize_grads_and_vars=True)
#@title Sampling Sequentally and Taking Diversity into Account
#@markdown The balance between ranking based on scores and taking diversity into account is governed by the following "penalty mixture" parameter. A low positive value results in rankings that hardly mix in diversity, a higher value will enforce more diversity.
policy_type = ranking_agent.RankingPolicyType.COSINE_DISTANCE
penalty_mixture = 1.0 #@param{ type: "number"}
cosine_distance_agent = ranking_agent.RankingAgent(
time_step_spec=environment.time_step_spec(),
action_spec=environment.action_spec(),
scoring_network=scoring_network,
optimizer=tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate),
feedback_model=agent_feedback_model,
policy_type=policy_type,
logits_temperature=logits_temperature,
penalty_mixture_coefficient=penalty_mixture,
summarize_grads_and_vars=True)
#@title Choosing the desired agent.
agent_type = "cosine_distance_agent" #@param["cosine_distance_agent", "no_penalty_agent", "descending_scores_agent"]
if agent_type == "descending_scores_agent":
agent = descending_scores_agent
elif agent_type == "no_penalty_agent":
agent = no_penalty_agent
else:
agent = cosine_distance_agent
在我们开始训练循环之前,我们还需要考虑训练数据的问题。
在决策时呈现给策略的臂特征包含策略可以选择的所有物品。但是,在训练时,我们需要被选择物品的特征,而且为了方便起见,这些特征按照决策输出的顺序排列。为此,使用了以下函数(为清楚起见,从此处复制)。
def order_items_from_action_fn(orig_trajectory):
"""Puts the features of the selected items in the recommendation order.
This function is used to make sure that at training the item observation is
filled with features of items selected by the policy, in the order of the
selection. Features of unselected items are discarded.
Args:
orig_trajectory: The trajectory as output by the policy
Returns:
The modified trajectory that contains slotted item features.
"""
item_obs = orig_trajectory.observation[
bandit_spec_utils.PER_ARM_FEATURE_KEY]
action = orig_trajectory.action
if isinstance(
orig_trajectory.observation[bandit_spec_utils.PER_ARM_FEATURE_KEY],
tensor_spec.TensorSpec):
dtype = orig_trajectory.observation[
bandit_spec_utils.PER_ARM_FEATURE_KEY].dtype
shape = [
num_slots, orig_trajectory.observation[
bandit_spec_utils.PER_ARM_FEATURE_KEY].shape[-1]
]
new_observation = {
bandit_spec_utils.GLOBAL_FEATURE_KEY:
orig_trajectory.observation[bandit_spec_utils.GLOBAL_FEATURE_KEY],
bandit_spec_utils.PER_ARM_FEATURE_KEY:
tensor_spec.TensorSpec(dtype=dtype, shape=shape)
}
else:
slotted_items = tf.gather(item_obs, action, batch_dims=1)
new_observation = {
bandit_spec_utils.GLOBAL_FEATURE_KEY:
orig_trajectory.observation[bandit_spec_utils.GLOBAL_FEATURE_KEY],
bandit_spec_utils.PER_ARM_FEATURE_KEY:
slotted_items
}
return trajectory.Trajectory(
step_type=orig_trajectory.step_type,
observation=new_observation,
action=(),
policy_info=(),
next_step_type=orig_trajectory.next_step_type,
reward=orig_trajectory.reward,
discount=orig_trajectory.discount)
#@title Defininfing Parameters to Run the Agent on the Defined Environment
num_iterations = 400 #@param{ type: "number" }
steps_per_loop = 2 #@param{ type: "integer" }
就像在老虎机教程中一样,我们定义了回放缓冲区,它将为代理提供要训练的样本。然后,我们使用驱动程序将所有内容放在一起:环境提供特征,策略选择排名,以及收集样本进行训练。
replay_buffer = bandit_replay_buffer.BanditReplayBuffer(
data_spec=order_items_from_action_fn(agent.policy.trajectory_spec),
batch_size=batch_size,
max_length=steps_per_loop)
if feedback_model == ranking_environment.FeedbackModel.SCORE_VECTOR:
reward_metric = tf_metrics.AverageReturnMetric(
batch_size=environment.batch_size,
buffer_size=200)
else:
reward_metric = tf_metrics.AverageReturnMultiMetric(
reward_spec=environment.reward_spec(),
batch_size=environment.batch_size,
buffer_size=200)
add_batch_fn = lambda data: replay_buffer.add_batch(
order_items_from_action_fn(data))
observers = [add_batch_fn, reward_metric]
driver = dynamic_step_driver.DynamicStepDriver(
env=environment,
policy=agent.collect_policy,
num_steps=steps_per_loop * batch_size,
observers=observers)
reward_values = []
for _ in range(num_iterations):
driver.run()
loss_info = agent.train(replay_buffer.gather_all())
replay_buffer.clear()
if feedback_model == ranking_environment.FeedbackModel.SCORE_VECTOR:
reward_values.append(reward_metric.result())
else:
reward_values.append(reward_metric.result())
我们把奖励绘制出来!
if feedback_model == ranking_environment.FeedbackModel.SCORE_VECTOR:
reward = reward_values
else:
reward = [r["chosen_value"] for r in reward_values]
plt.plot(reward)
plt.ylabel('Average Return')
plt.xlabel('Number of Iterations')
后续步骤#
本教程有许多可调参数,包括要使用的策略/代理、环境的一些属性,甚至反馈模型。请随意用这些参数进行实验!
此外,tf_agents/bandits/agents/examples/v2/train_eval_ranking.py
中还有一个随时可运行的排名示例