{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "beObUOFyuRjT" }, "source": [ "##### Copyright 2023 The TF-Agents Authors." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "nQnmcm0oI1Q-" }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "eutDVTs9aJEL" }, "source": [ "# 回放缓冲区\n", "\n", "\n", " \n", " \n", " \n", " \n", "
" ] }, { "cell_type": "markdown", "metadata": { "id": "8aPHF9kXFggA" }, "source": [ "## 简介\n", "\n", "强化学习算法使用回放缓冲区来存储在环境中执行策略时的经历轨迹。在训练过程中,将查询回放缓冲区中的轨迹子集(顺序子集或样本)以“回放”代理的经历。\n", "\n", "在本 Colab 中,我们将介绍两种回放缓冲区:Python 支持型和 Tensorflow 支持型,这两种类型采用共同的 API。在以下各部分中,我们将介绍 API、每种缓冲区实现以及如何在数据收集训练期间使用回放缓冲区。\n" ] }, { "cell_type": "markdown", "metadata": { "id": "1uSlqYgvaG9b" }, "source": [ "## 设置" ] }, { "cell_type": "markdown", "metadata": { "id": "GztmUpWKZ7kq" }, "source": [ "如果尚未安装 TF-Agents,请先安装。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "TnE2CgilrngG" }, "outputs": [], "source": [ "!pip install tf-agents\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "whYNP894FSkA" }, "outputs": [], "source": [ "from __future__ import absolute_import\n", "from __future__ import division\n", "from __future__ import print_function\n", "\n", "import tensorflow as tf\n", "import numpy as np\n", "\n", "from tf_agents import specs\n", "from tf_agents.agents.dqn import dqn_agent\n", "from tf_agents.drivers import dynamic_step_driver\n", "from tf_agents.environments import suite_gym\n", "from tf_agents.environments import tf_py_environment\n", "from tf_agents.networks import q_network\n", "from tf_agents.replay_buffers import py_uniform_replay_buffer\n", "from tf_agents.replay_buffers import tf_uniform_replay_buffer\n", "from tf_agents.specs import tensor_spec\n", "from tf_agents.trajectories import time_step" ] }, { "cell_type": "markdown", "metadata": { "id": "xcQWclL9FpZl" }, "source": [ "## 回放缓冲区 API\n", "\n", "回放缓冲区类的定义和方法如下:\n", "\n", "```python\n", "class ReplayBuffer(tf.Module):\n", " \"\"\"Abstract base class for TF-Agents replay buffer.\"\"\"\n", "\n", " def __init__(self, data_spec, capacity):\n", " \"\"\"Initializes the replay buffer.\n", "\n", " Args:\n", " data_spec: A spec or a list/tuple/nest of specs describing\n", " a single item that can be stored in this buffer\n", " capacity: number of elements that the replay buffer can hold.\n", " \"\"\"\n", "\n", " @property\n", " def data_spec(self):\n", " \"\"\"Returns the spec for items in the replay buffer.\"\"\"\n", "\n", " @property\n", " def capacity(self):\n", " \"\"\"Returns the capacity of the replay buffer.\"\"\"\n", "\n", " def add_batch(self, items):\n", " \"\"\"Adds a batch of items to the replay buffer.\"\"\"\n", "\n", " def get_next(self,\n", " sample_batch_size=None,\n", " num_steps=None,\n", " time_stacked=True):\n", " \"\"\"Returns an item or batch of items from the buffer.\"\"\"\n", "\n", " def as_dataset(self,\n", " sample_batch_size=None,\n", " num_steps=None,\n", " num_parallel_calls=None):\n", " \"\"\"Creates and returns a dataset that returns entries from the buffer.\"\"\"\n", "\n", "\n", " def gather_all(self):\n", " \"\"\"Returns all the items in buffer.\"\"\"\n", " return self._gather_all()\n", "\n", " def clear(self):\n", " \"\"\"Resets the contents of replay buffer\"\"\"\n", "\n", "```\n", "\n", "请注意,重播缓冲区对象初始化后,需提供待存储元素的 `data_spec`。此规范与待添加到缓冲区的轨迹元素的 `TensorSpec` 相对应。通常可以通过查看代理的 `agent.collect_data_spec` 来获得此规范,其定义了代理在训练时所预期的形状、类型和结构(稍后将详细介绍)" ] }, { "cell_type": "markdown", "metadata": { "id": "X3Yrxg36Ik1x" }, "source": [ "## TFUniformReplayBuffer\n", "\n", "`TFUniformReplayBuffer` 是 TF-Agents 中最常用的回放缓冲区,因此我们将在本教程中予以使用。在 `TFUniformReplayBuffer` 中,备份缓冲区存储由 Tensorflow 变量实现,因此是计算图的一部分。\n", "\n", "缓冲区会成批次地存储元素,每个批次段最大容量为 `max_length` 个元素。因此,总缓冲区容量为 `batch_size` x `max_length` 个元素。缓冲区中存储的元素必须全部具有匹配的数据规范。将回放缓冲区用于数据收集时,该规范为代理的收集数据规范。\n" ] }, { "cell_type": "markdown", 
"metadata": { "id": "lYk-bn2taXlw" }, "source": [ "### 创建缓冲区:\n", "\n", "要创建 `TFUniformReplayBuffer`,我们传入以下内容:\n", "\n", "1. 缓冲区将存储的数据元素的规范\n", "2. 与缓冲区批次大小对应的 `batch size`\n", "3. 每个批次段的元素个数 `max_length`\n", "\n", "在以下创建 `TFUniformReplayBuffer` 的示例中,采用了示例数据规范,`batch_size` 为 32,`max_length` 为 1000。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "Dj4_-77_5ExP" }, "outputs": [], "source": [ "data_spec = (\n", " tf.TensorSpec([3], tf.float32, 'action'),\n", " (\n", " tf.TensorSpec([5], tf.float32, 'lidar'),\n", " tf.TensorSpec([3, 2], tf.float32, 'camera')\n", " )\n", ")\n", "\n", "batch_size = 32\n", "max_length = 1000\n", "\n", "replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(\n", " data_spec,\n", " batch_size=batch_size,\n", " max_length=max_length)" ] }, { "cell_type": "markdown", "metadata": { "id": "XB8rOw5ATDD2" }, "source": [ "### 写入缓冲区:\n", "\n", "要将元素添加到回放缓冲区,我们使用 `add_batch(items)` 方法,其中 `items` 是代表要添加到缓冲区的项目批次的张量的列表/元组/嵌套。`items` 的每个元素的外部维度必须等于 `batch_size`,其余维度必须符合项目的数据规范(与传递至回放缓冲区构造函数的数据规范相同)。\n", "\n", "以下为添加一批项目的示例\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "nOvkp4vJhBOT" }, "outputs": [], "source": [ "action = tf.constant(1 * np.ones(\n", " data_spec[0].shape.as_list(), dtype=np.float32))\n", "lidar = tf.constant(\n", " 2 * np.ones(data_spec[1][0].shape.as_list(), dtype=np.float32))\n", "camera = tf.constant(\n", " 3 * np.ones(data_spec[1][1].shape.as_list(), dtype=np.float32))\n", " \n", "values = (action, (lidar, camera))\n", "values_batched = tf.nest.map_structure(lambda t: tf.stack([t] * batch_size),\n", " values)\n", " \n", "replay_buffer.add_batch(values_batched)" ] }, { "cell_type": "markdown", "metadata": { "id": "smnVAxHghKly" }, "source": [ "### 从缓冲区读取\n", "\n", "有三种方法可以从 `TFUniformReplayBuffer` 中读取数据:\n", "\n", "1. `get_next()` - 从缓冲区返回一个样本。通过此方法的参数可以指定返回的样本批次大小和时间步骤数。\n", "2. `as_dataset()` - 将回放缓冲区以 `tf.data.Dataset` 形式返回。然后,用户可以创建数据集迭代器,并在缓冲区中迭代项目样本。\n", "3. 
{ "cell_type": "markdown", "metadata": { "id": "smnVAxHghKly" }, "source": [ "### Reading from the buffer\n", "\n", "There are three ways to read data from the `TFUniformReplayBuffer`:\n", "\n", "1. `get_next()` - returns one sample from the buffer. The sample batch size and the number of timesteps returned can be specified via arguments to this method.\n", "2. `as_dataset()` - returns the replay buffer as a `tf.data.Dataset`. One can then create a dataset iterator and iterate through samples of the items in the buffer.\n", "3. `gather_all()` - returns all the items in the buffer as a tensor with shape `[batch, time, data_spec]`.\n", "\n", "Below are examples of how to read from the replay buffer using each of these methods:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "IlQ1eGhohM3M" }, "outputs": [], "source": [ "# Add more items to the buffer before reading:\n", "for _ in range(5):\n", "  replay_buffer.add_batch(values_batched)\n", "\n", "# Get one sample from the replay buffer with batch size 10 and 1 timestep:\n", "sample = replay_buffer.get_next(sample_batch_size=10, num_steps=1)\n", "\n", "# Convert the replay buffer to a tf.data.Dataset and iterate through it:\n", "dataset = replay_buffer.as_dataset(\n", "    sample_batch_size=4,\n", "    num_steps=2)\n", "\n", "iterator = iter(dataset)\n", "print(\"Iterator trajectories:\")\n", "trajectories = []\n", "for _ in range(3):\n", "  t, _ = next(iterator)\n", "  trajectories.append(t)\n", "\n", "print(tf.nest.map_structure(lambda t: t.shape, trajectories))\n", "\n", "# Read all elements in the replay buffer:\n", "trajectories = replay_buffer.gather_all()\n", "\n", "print(\"Trajectories from gather all:\")\n", "print(tf.nest.map_structure(lambda t: t.shape, trajectories))\n" ] },
{ "cell_type": "markdown", "metadata": { "id": "BcS49HrNF34W" }, "source": [ "## PyUniformReplayBuffer\n", "\n", "`PyUniformReplayBuffer` has the same functionality as `TFUniformReplayBuffer`, but instead of TF variables, its data is stored in numpy arrays. This buffer can be used for out-of-graph data collection. Having the backing storage in numpy can make data manipulation (such as indexing for updating priorities) easier for some applications, since it does not involve TensorFlow variables. However, this implementation does not benefit from TensorFlow's graph optimizations.\n", "\n", "Below is an example of instantiating a `PyUniformReplayBuffer` from the agent's policy trajectory specs:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "F4neLPpL25wI" }, "outputs": [], "source": [ "replay_buffer_capacity = 1000 * 32  # same capacity as the TFUniformReplayBuffer\n", "\n", "py_replay_buffer = py_uniform_replay_buffer.PyUniformReplayBuffer(\n", "    capacity=replay_buffer_capacity,\n", "    data_spec=tensor_spec.to_nest_array_spec(data_spec))" ] },
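{ "cell_type": "markdown", "metadata": {}, "source": [ "Since `PyUniformReplayBuffer` shares the replay buffer API, writing and reading work much as before. Below is a minimal sketch, assuming items are added one at a time with an outer batch dimension of 1 (this Python implementation stores individual items rather than batch segments); the shapes follow the sample `data_spec` defined earlier:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# A minimal sketch: write a few items to the Python-backed buffer and\n", "# sample one back. We assume add_batch is called with an outer batch\n", "# dimension of 1, since this buffer stores individual items.\n", "item = (\n", "    1 * np.ones(data_spec[0].shape.as_list(), dtype=np.float32),\n", "    (\n", "        2 * np.ones(data_spec[1][0].shape.as_list(), dtype=np.float32),\n", "        3 * np.ones(data_spec[1][1].shape.as_list(), dtype=np.float32)\n", "    )\n", ")\n", "batched_item = tf.nest.map_structure(lambda t: np.expand_dims(t, 0), item)\n", "\n", "for _ in range(5):\n", "  py_replay_buffer.add_batch(batched_item)\n", "\n", "# get_next() with default arguments returns a single item:\n", "py_sample = py_replay_buffer.get_next()\n", "print(tf.nest.map_structure(lambda t: t.shape, py_sample))" ] },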
{ "cell_type": "markdown", "metadata": { "id": "9V7DEcB8IeiQ" }, "source": [ "## Using replay buffers during training\n", "\n", "Now that we know how to create a replay buffer, write items to it, and read from it, we can use it to store trajectories while training our agents.\n", "\n", "### Data collection\n", "\n", "First, let's look at how to use the replay buffer during data collection.\n", "\n", "In TF-Agents, we use a `Driver` (see the Drivers tutorial for more details) to collect experience in an environment. To use a `Driver`, we specify an `Observer`, which is a function the `Driver` executes when it receives a trajectory.\n", "\n", "Thus, to add trajectory elements to the replay buffer, we add an observer that calls `add_batch(items)` to add each batch of items to the replay buffer.\n", "\n", "Below is an example of this with `TFUniformReplayBuffer`. We first create an environment, a network, and an agent. Then we create a `TFUniformReplayBuffer`. Note that the specs of the trajectory elements in the replay buffer are equal to the agent's collect data spec. We then set its `add_batch` method as an observer for the driver that will do the data collection during our training:\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "pCbTDO3Z5UCS" }, "outputs": [], "source": [ "env = suite_gym.load('CartPole-v0')\n", "tf_env = tf_py_environment.TFPyEnvironment(env)\n", "\n", "q_net = q_network.QNetwork(\n", "    tf_env.time_step_spec().observation,\n", "    tf_env.action_spec(),\n", "    fc_layer_params=(100,))\n", "\n", "agent = dqn_agent.DqnAgent(\n", "    tf_env.time_step_spec(),\n", "    tf_env.action_spec(),\n", "    q_network=q_net,\n", "    optimizer=tf.compat.v1.train.AdamOptimizer(0.001))\n", "\n", "replay_buffer_capacity = 1000\n", "\n", "replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(\n", "    agent.collect_data_spec,\n", "    batch_size=tf_env.batch_size,\n", "    max_length=replay_buffer_capacity)\n", "\n", "# Add an observer that adds to the replay buffer:\n", "replay_observer = [replay_buffer.add_batch]\n", "\n", "collect_steps_per_iteration = 10\n", "collect_op = dynamic_step_driver.DynamicStepDriver(\n", "    tf_env,\n", "    agent.collect_policy,\n", "    observers=replay_observer,\n", "    num_steps=collect_steps_per_iteration).run()" ] },
{ "cell_type": "markdown", "metadata": { "id": "huGCDbO4GAF1" }, "source": [ "### Reading data for a train step\n", "\n", "After adding trajectory elements to the replay buffer, we can read batches of trajectories from it to use as the input data for a train step. Note that we read with `num_steps=2` because the DQN agent needs both the current and the next time step to compute its loss.\n", "\n", "Here is an example of how to train on trajectories from the replay buffer in a training loop:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "gg8SUyXXnSMr" }, "outputs": [], "source": [ "# Read the replay buffer as a Dataset,\n", "# read batches of 4 elements, each with 2 timesteps:\n", "dataset = replay_buffer.as_dataset(\n", "    sample_batch_size=4,\n", "    num_steps=2)\n", "\n", "iterator = iter(dataset)\n", "\n", "num_train_steps = 10\n", "\n", "for _ in range(num_train_steps):\n", "  trajectories, _ = next(iterator)\n", "  loss = agent.train(experience=trajectories)\n" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "5_replay_buffers_tutorial.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }