回放缓冲区#

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 Github 上查看源代码 下载笔记本

简介#

强化学习算法使用回放缓冲区来存储在环境中执行策略时的经历轨迹。在训练过程中,将查询回放缓冲区中的轨迹子集(顺序子集或样本)以“回放”代理的经历。

在本 Colab 中,我们将介绍两种回放缓冲区:Python 支持型和 Tensorflow 支持型,这两种类型采用共同的 API。在以下各部分中,我们将介绍 API、每种缓冲区实现以及如何在数据收集训练期间使用回放缓冲区。

设置#

如果尚未安装 TF-Agents,请先安装。

!pip install tf-agents
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
import numpy as np

from tf_agents import specs
from tf_agents.agents.dqn import dqn_agent
from tf_agents.drivers import dynamic_step_driver
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.networks import q_network
from tf_agents.replay_buffers import py_uniform_replay_buffer
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.specs import tensor_spec
from tf_agents.trajectories import time_step

回放缓冲区 API#

回放缓冲区类的定义和方法如下:

class ReplayBuffer(tf.Module):
  """Abstract base class for TF-Agents replay buffer."""

  def __init__(self, data_spec, capacity):
    """Initializes the replay buffer.

    Args:
      data_spec: A spec or a list/tuple/nest of specs describing
        a single item that can be stored in this buffer
      capacity: number of elements that the replay buffer can hold.
    """

  @property
  def data_spec(self):
    """Returns the spec for items in the replay buffer."""

  @property
  def capacity(self):
    """Returns the capacity of the replay buffer."""

  def add_batch(self, items):
    """Adds a batch of items to the replay buffer."""

  def get_next(self,
               sample_batch_size=None,
               num_steps=None,
               time_stacked=True):
    """Returns an item or batch of items from the buffer."""

  def as_dataset(self,
                 sample_batch_size=None,
                 num_steps=None,
                 num_parallel_calls=None):
    """Creates and returns a dataset that returns entries from the buffer."""


  def gather_all(self):
    """Returns all the items in buffer."""
    return self._gather_all()

  def clear(self):
    """Resets the contents of replay buffer"""

请注意,重播缓冲区对象初始化后,需提供待存储元素的 data_spec。此规范与待添加到缓冲区的轨迹元素的 TensorSpec 相对应。通常可以通过查看代理的 agent.collect_data_spec 来获得此规范,其定义了代理在训练时所预期的形状、类型和结构(稍后将详细介绍)

TFUniformReplayBuffer#

TFUniformReplayBuffer 是 TF-Agents 中最常用的回放缓冲区,因此我们将在本教程中予以使用。在 TFUniformReplayBuffer 中,备份缓冲区存储由 Tensorflow 变量实现,因此是计算图的一部分。

缓冲区会成批次地存储元素,每个批次段最大容量为 max_length 个元素。因此,总缓冲区容量为 batch_size x max_length 个元素。缓冲区中存储的元素必须全部具有匹配的数据规范。将回放缓冲区用于数据收集时,该规范为代理的收集数据规范。

创建缓冲区:#

要创建 TFUniformReplayBuffer,我们传入以下内容:

  1. 缓冲区将存储的数据元素的规范

  2. 与缓冲区批次大小对应的 batch size

  3. 每个批次段的元素个数 max_length

在以下创建 TFUniformReplayBuffer 的示例中,采用了示例数据规范,batch_size 为 32,max_length 为 1000。

data_spec =  (
        tf.TensorSpec([3], tf.float32, 'action'),
        (
            tf.TensorSpec([5], tf.float32, 'lidar'),
            tf.TensorSpec([3, 2], tf.float32, 'camera')
        )
)

batch_size = 32
max_length = 1000

replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec,
    batch_size=batch_size,
    max_length=max_length)

写入缓冲区:#

要将元素添加到回放缓冲区,我们使用 add_batch(items) 方法,其中 items 是代表要添加到缓冲区的项目批次的张量的列表/元组/嵌套。items 的每个元素的外部维度必须等于 batch_size,其余维度必须符合项目的数据规范(与传递至回放缓冲区构造函数的数据规范相同)。

以下为添加一批项目的示例

action = tf.constant(1 * np.ones(
    data_spec[0].shape.as_list(), dtype=np.float32))
lidar = tf.constant(
    2 * np.ones(data_spec[1][0].shape.as_list(), dtype=np.float32))
camera = tf.constant(
    3 * np.ones(data_spec[1][1].shape.as_list(), dtype=np.float32))
  
values = (action, (lidar, camera))
values_batched = tf.nest.map_structure(lambda t: tf.stack([t] * batch_size),
                                       values)
  
replay_buffer.add_batch(values_batched)

从缓冲区读取#

有三种方法可以从 TFUniformReplayBuffer 中读取数据:

  1. get_next() - 从缓冲区返回一个样本。通过此方法的参数可以指定返回的样本批次大小和时间步骤数。

  2. as_dataset() - 将回放缓冲区以 tf.data.Dataset 形式返回。然后,用户可以创建数据集迭代器,并在缓冲区中迭代项目样本。

  3. gather_all() - 将缓冲区内的所有项目以形状为 [batch, time, data_spec] 的张量形式返回。

以下示例展示了如何使用这些方法从回放缓冲区读取:

# add more items to the buffer before reading
for _ in range(5):
  replay_buffer.add_batch(values_batched)

# Get one sample from the replay buffer with batch size 10 and 1 timestep:

sample = replay_buffer.get_next(sample_batch_size=10, num_steps=1)

# Convert the replay buffer to a tf.data.Dataset and iterate through it
dataset = replay_buffer.as_dataset(
    sample_batch_size=4,
    num_steps=2)

iterator = iter(dataset)
print("Iterator trajectories:")
trajectories = []
for _ in range(3):
  t, _ = next(iterator)
  trajectories.append(t)
  
print(tf.nest.map_structure(lambda t: t.shape, trajectories))

# Read all elements in the replay buffer:
trajectories = replay_buffer.gather_all()

print("Trajectories from gather all:")
print(tf.nest.map_structure(lambda t: t.shape, trajectories))

PyUniformReplayBuffer#

PyUniformReplayBufferTFUniformReplayBuffer 功能相同,但前者数据存储在 numpy 数组中,而非 TF 变量。该缓冲区可用于非图形式数据收集。对于某些应用而言,numpy 型备份存储无需使用 Tensorflow 变量,数据操作会更为方便(例如,针对更新优先级建立索引)。但是,这种实现方式不具备 Tensorflow 所能提供的图形优化优势。

以下是基于代理的策略轨迹规范实例化 PyUniformReplayBuffer 的示例:

replay_buffer_capacity = 1000*32 # same capacity as the TFUniformReplayBuffer

py_replay_buffer = py_uniform_replay_buffer.PyUniformReplayBuffer(
    capacity=replay_buffer_capacity,
    data_spec=tensor_spec.to_nest_array_spec(data_spec))

在训练过程中使用回放缓冲区#

我们已了解如何创建回放缓冲区,向其写入项目以及从中读取项目,现在可以在代理训练过程中使用回放缓冲区存储轨迹了。

数据收集#

首先,让我们看一下如何在数据收集过程中使用回放缓冲区。

在 TF-Agents 中,我们使用 Driver(有关更多详细信息,请参阅“驱动程序”教程)收集环境中的经历数据。要使用 Driver,我们需要指定 ObserverDriver 在收到轨迹时将执行该函数。

因此,要将轨迹元素添加到重播缓冲区,我们需要添加一个观测函数,其调用 add_batch(items) 以将项目成批次地添加到重播缓冲区内。

以下是使用 TFUniformReplayBuffer 的示例。我们首先创建环境、网络和代理。然后,我们创建 TFUniformReplayBuffer。请注意,回放缓冲区中轨迹元素的规范与代理的收集数据规范相同。然后,我们将其 add_batch 方法设置为将在训练过程中收集数据的驱动程序的观察者:

env = suite_gym.load('CartPole-v0')
tf_env = tf_py_environment.TFPyEnvironment(env)

q_net = q_network.QNetwork(
    tf_env.time_step_spec().observation,
    tf_env.action_spec(),
    fc_layer_params=(100,))

agent = dqn_agent.DqnAgent(
    tf_env.time_step_spec(),
    tf_env.action_spec(),
    q_network=q_net,
    optimizer=tf.compat.v1.train.AdamOptimizer(0.001))

replay_buffer_capacity = 1000

replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    agent.collect_data_spec,
    batch_size=tf_env.batch_size,
    max_length=replay_buffer_capacity)

# Add an observer that adds to the replay buffer:
replay_observer = [replay_buffer.add_batch]

collect_steps_per_iteration = 10
collect_op = dynamic_step_driver.DynamicStepDriver(
  tf_env,
  agent.collect_policy,
  observers=replay_observer,
  num_steps=collect_steps_per_iteration).run()

读取用于训练步骤的数据#

将轨迹元素添加到回放缓冲区后,我们可以从回放缓冲区中批量读取轨迹,用作训练步骤的输入数据。

以下示例展示了在训练循环中如何训练从回放缓冲区中读取的轨迹:

# Read the replay buffer as a Dataset,
# read batches of 4 elements, each with 2 timesteps:
dataset = replay_buffer.as_dataset(
    sample_batch_size=4,
    num_steps=2)

iterator = iter(dataset)

num_train_steps = 10

for _ in range(num_train_steps):
  trajectories, _ = next(iterator)
  loss = agent.train(experience=trajectories)