DQN C51/Rainbow#

在 TensorFlow.org 上查看 在 Google Colab 运行 在 Github 上查看源代码 下载笔记本

简介#

本示例说明了如何使用 TF-Agents 库在 Cartpole 环境中训练分类 DQN (C51) 代理。

Cartpole environment

确保您已事先阅读 DQN 教程。本教程假定您熟悉 DQN 教程,并主要关注 DQN 与 C51 之间的差异。

设置#

如果尚未安装 TF-Agents,请运行以下命令:

!sudo apt-get update
!sudo apt-get install -y xvfb ffmpeg freeglut3-dev
!pip install 'imageio==2.4.0'
!pip install pyvirtualdisplay
!pip install tf-agents
!pip install pyglet
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import base64
import imageio
import IPython
import matplotlib
import matplotlib.pyplot as plt
import PIL.Image
import pyvirtualdisplay

import tensorflow as tf

from tf_agents.agents.categorical_dqn import categorical_dqn_agent
from tf_agents.drivers import dynamic_step_driver
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.eval import metric_utils
from tf_agents.metrics import tf_metrics
from tf_agents.networks import categorical_q_network
from tf_agents.policies import random_tf_policy
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.trajectories import trajectory
from tf_agents.utils import common

# Set up a virtual display for rendering OpenAI gym environments.
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()

超参数#

env_name = "CartPole-v1" # @param {type:"string"}
num_iterations = 15000 # @param {type:"integer"}

initial_collect_steps = 1000  # @param {type:"integer"} 
collect_steps_per_iteration = 1  # @param {type:"integer"}
replay_buffer_capacity = 100000  # @param {type:"integer"}

fc_layer_params = (100,)

batch_size = 64  # @param {type:"integer"}
learning_rate = 1e-3  # @param {type:"number"}
gamma = 0.99
log_interval = 200  # @param {type:"integer"}

num_atoms = 51  # @param {type:"integer"}
min_q_value = -20  # @param {type:"integer"}
max_q_value = 20  # @param {type:"integer"}
n_step_update = 2  # @param {type:"integer"}

num_eval_episodes = 10  # @param {type:"integer"}
eval_interval = 1000  # @param {type:"integer"}

环境#

像以前一样加载环境,其中一个用于训练,另一个用于评估。在这里,我们使用 CartPole-v1(DQN 教程中则为 CartPole-v0),它的最大奖励是 500,而不是 200。

train_py_env = suite_gym.load(env_name)
eval_py_env = suite_gym.load(env_name)

train_env = tf_py_environment.TFPyEnvironment(train_py_env)
eval_env = tf_py_environment.TFPyEnvironment(eval_py_env)

代理#

C51 是一种基于 DQN 的 Q-learning 算法。与 DQN 一样,它可以在具有离散操作空间的任何环境中使用。

C51 与 DQN 之间的主要区别在于,C51 不仅可以简单地预测每个状态-操作对的 Q 值,还能预测表示 Q 值概率分布的直方图模型:

Example C51 Distribution

通过学习分布而不是简单的期望值,此算法能够在训练过程中保持更稳定的状态,从而提高最终性能。这种算法尤其适用于具有双峰甚至多峰值分布的情况,此时单个平均值无法提供准确的概览。

为了基于概率分布而不是值来训练,C51 必须执行一些复杂的分布计算才能计算其损失函数。但不用担心,我们已在 TF-Agents 中为您处理好一切!

要创建 C51 代理,我们首先需要创建一个 CategoricalQNetwork。除了有一个附加参数 num_atoms 外,CategoricalQNetwork 的 API 与 QNetwork 的 API 相同。这表示我们的概率分布估算中的支撑点数。(上面的图像包括 10 个支撑点,每个支撑点都由垂直的蓝色条表示。)您可以从名称中看出,默认原子数为 51。

categorical_q_net = categorical_q_network.CategoricalQNetwork(
    train_env.observation_spec(),
    train_env.action_spec(),
    num_atoms=num_atoms,
    fc_layer_params=fc_layer_params)

我们还需要一个 optimizer 来训练刚刚创建的网络,以及一个 train_step_counter 变量来跟踪网络更新的次数。

请注意,与普通 DqnAgent 的另一个重要区别在于,我们现在需要指定 min_q_valuemax_q_value 作为参数。这两个参数指定了支撑点的最极端值(换句话说,任何一侧有全部 51 个原子)。确保为您的特定环境适当地选择这些值。在这里,我们使用 -20 和 20。

optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)

train_step_counter = tf.Variable(0)

agent = categorical_dqn_agent.CategoricalDqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    categorical_q_network=categorical_q_net,
    optimizer=optimizer,
    min_q_value=min_q_value,
    max_q_value=max_q_value,
    n_step_update=n_step_update,
    td_errors_loss_fn=common.element_wise_squared_loss,
    gamma=gamma,
    train_step_counter=train_step_counter)
agent.initialize()

最后要注意的一点是,我们还添加了一个参数来使用 \(n\) = 2 的 n 步更新。在单步 Q-learning (\(n\) = 1) 中,我们仅使用单步回报(基于贝尔曼最优性方程)计算当前时间步骤和下一时间步骤的 Q 值之间的误差。单步回报定义为:

\(G_t = R_{t + 1} + \gamma V(s_{t + 1})\)

其中,我们定义 \(V(s) = \max_a{Q(s, a)}\)

N 步更新涉及将标准单步回报函数扩展 \(n\) 倍:

\(G_t^n = R_{t + 1} + \gamma R_{t + 2} + \gamma^2 R_{t + 3} + \dots + \gamma^n V(s_{t + n})\)

N 步更新使代理可以在将来进一步自助抽样,而在 \(n\) 值正确的情况下,这通常可以加快学习速度。

尽管 C51 和 n 步更新通常与优先回放相结合构成 Rainbow 代理的核心,但我们发现,实现优先回放并未带来可衡量的改进。此外,我们还发现,仅将 C51 代理与 n 步更新结合使用时,在我们测试过的 Atari 环境样本中,我们的代理在性能上与其他 Rainbow 代理一样出色。

指标和评估#

用于评估策略的最常用指标是平均回报。回报是针对某个片段在环境中运行策略时获得的奖励总和,我们通常会评估多个片段的平均值。计算平均回报指标的代码如下。

#@test {"skip": true}
def compute_avg_return(environment, policy, num_episodes=10):

  total_return = 0.0
  for _ in range(num_episodes):

    time_step = environment.reset()
    episode_return = 0.0

    while not time_step.is_last():
      action_step = policy.action(time_step)
      time_step = environment.step(action_step.action)
      episode_return += time_step.reward
    total_return += episode_return

  avg_return = total_return / num_episodes
  return avg_return.numpy()[0]


random_policy = random_tf_policy.RandomTFPolicy(train_env.time_step_spec(),
                                                train_env.action_spec())

compute_avg_return(eval_env, random_policy, num_eval_episodes)

# Please also see the metrics module for standard implementations of different
# metrics.

数据收集#

与 DQN 教程中一样,使用随机策略设置回放缓冲区和初始数据收集。

#@test {"skip": true}
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=train_env.batch_size,
    max_length=replay_buffer_capacity)

def collect_step(environment, policy):
  time_step = environment.current_time_step()
  action_step = policy.action(time_step)
  next_time_step = environment.step(action_step.action)
  traj = trajectory.from_transition(time_step, action_step, next_time_step)

  # Add trajectory to the replay buffer
  replay_buffer.add_batch(traj)

for _ in range(initial_collect_steps):
  collect_step(train_env, random_policy)

# This loop is so common in RL, that we provide standard implementations of
# these. For more details see the drivers module.

# Dataset generates trajectories with shape [BxTx...] where
# T = n_step_update + 1.
dataset = replay_buffer.as_dataset(
    num_parallel_calls=3, sample_batch_size=batch_size,
    num_steps=n_step_update + 1).prefetch(3)

iterator = iter(dataset)

训练代理#

训练循环包括从环境收集数据和优化代理的网络。在训练过程中,我们偶尔会评估代理的策略来了解效果。

运行以下代码需要约 7 分钟。

#@test {"skip": true}
try:
  %%time
except:
  pass

# (Optional) Optimize by wrapping some of the code in a graph using TF function.
agent.train = common.function(agent.train)

# Reset the train step
agent.train_step_counter.assign(0)

# Evaluate the agent's policy once before training.
avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
returns = [avg_return]

for _ in range(num_iterations):

  # Collect a few steps using collect_policy and save to the replay buffer.
  for _ in range(collect_steps_per_iteration):
    collect_step(train_env, agent.collect_policy)

  # Sample a batch of data from the buffer and update the agent's network.
  experience, unused_info = next(iterator)
  train_loss = agent.train(experience)

  step = agent.train_step_counter.numpy()

  if step % log_interval == 0:
    print('step = {0}: loss = {1}'.format(step, train_loss.loss))

  if step % eval_interval == 0:
    avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
    print('step = {0}: Average Return = {1:.2f}'.format(step, avg_return))
    returns.append(avg_return)

可视化#

绘图#

我们可以通过绘制回报与全局步骤之间关系的图形来了解代理的性能。在 Cartpole-v1 中,长杆每直立一个时间步骤,环境就会提供 +1 的奖励,由于最大步骤数为 500,因此可以获得的最大回报也是 500。

#@test {"skip": true}

steps = range(0, num_iterations + 1, eval_interval)
plt.plot(steps, returns)
plt.ylabel('Average Return')
plt.xlabel('Step')
plt.ylim(top=550)

视频#

在每个步骤都渲染环境有助于可视化代理的性能。在此之前,我们先创建一个函数,以便在此 Colab 中嵌入视频。

def embed_mp4(filename):
  """Embeds an mp4 file in the notebook."""
  video = open(filename,'rb').read()
  b64 = base64.b64encode(video)
  tag = '''
  <video width="640" height="480" controls>
    <source src="data:video/mp4;base64,{0}" type="video/mp4">
  Your browser does not support the video tag.
  </video>'''.format(b64.decode())

  return IPython.display.HTML(tag)

以下代码可将代理策略可视化多个片段:

num_episodes = 3
video_filename = 'imageio.mp4'
with imageio.get_writer(video_filename, fps=60) as video:
  for _ in range(num_episodes):
    time_step = eval_env.reset()
    video.append_data(eval_py_env.render())
    while not time_step.is_last():
      action_step = agent.policy.action(time_step)
      time_step = eval_env.step(action_step.action)
      video.append_data(eval_py_env.render())

embed_mp4(video_filename)

C51 在性能上往往略微优于基于 CartPole-v1 的 DQN,但是,在越来越复杂的环境中,两种代理之间的差异变得越来越明显。例如,在完整的 Atari 2600 基准测试中,针对随机代理进行归一化之后,C51 的平均得分相比 DQN 提高 126%。通过包含 n 步更新,可以进一步提高性能。

要深入了解 C51 算法,请参阅 A Distributional Perspective on Reinforcement Learning (2017)