{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "Ma19Ks2CTDbZ" }, "source": [ "##### Copyright 2023 The TF-Agents Authors." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "nQnmcm0oI1Q-" }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "XljsiF6lYkuV" }, "source": [ "# 环境\n", "\n", "\n", " \n", " \n", " \n", " \n", "
在 tensorflow.google.cn 上查看 在 Google Colab 运行 在 Github 上查看源代码 下载笔记本
" ] }, { "cell_type": "markdown", "metadata": { "id": "9h3B-YBHopJI" }, "source": [ "## 简介" ] }, { "cell_type": "markdown", "metadata": { "id": "n9c6vCPGovOM" }, "source": [ "强化学习 (RL) 的目标是设计可通过与环境交互进行学习的代理。在标准 RL 设置中,代理在每个时间步骤都会收到一个观测值并选择一个操作。该操作将应用于环境,而环境会返回奖励和新的观测值。代理会训练策略以选择合适的操作,旨在使奖励总和(即回报)最大化。\n", "\n", "在 TF-Agents 中,可以使用 Python 或 TensorFlow 实现环境。Python 环境通常更易于实现、理解和调试,但 TensorFlow 环境则更为高效并且支持自然并行化。最常见的工作流是在 Python 中实现环境,然后使用我们的包装器之一将其自动转换为 TensorFlow。\n", "\n", "让我们首先看一下 Python 环境。TensorFlow 环境采用非常相似的 API。" ] }, { "cell_type": "markdown", "metadata": { "id": "4_16bQF0anmE" }, "source": [ "## 设置\n" ] }, { "cell_type": "markdown", "metadata": { "id": "Qax00bg2a4Jj" }, "source": [ "如果尚未安装 TF-Agents 或 Gym,请运行以下命令:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "KKU2iY_7at8Y" }, "outputs": [], "source": [ "!pip install tf-agents[reverb]\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1ZAoFNwnRbKK" }, "outputs": [], "source": [ "from __future__ import absolute_import\n", "from __future__ import division\n", "from __future__ import print_function\n", "\n", "import abc\n", "import tensorflow as tf\n", "import numpy as np\n", "\n", "from tf_agents.environments import py_environment\n", "from tf_agents.environments import tf_environment\n", "from tf_agents.environments import tf_py_environment\n", "from tf_agents.environments import utils\n", "from tf_agents.specs import array_spec\n", "from tf_agents.environments import wrappers\n", "from tf_agents.environments import suite_gym\n", "from tf_agents.trajectories import time_step as ts" ] }, { "cell_type": "markdown", "metadata": { "id": "x-y4p9i9UURn" }, "source": [ "## Python 环境" ] }, { "cell_type": "markdown", "metadata": { "id": "JPSwHONKMNv9" }, "source": [ "Python 环境的 `step(action) -> next_time_step` 方法可将操作应用于环境,并返回有关下一步的以下信息:\n", "\n", "1. `observation`:此为环境状态的一部分,可供代理观测以选择下一步的操作。\n", "2. `reward`:代理会进行学习,目标是实现多个步骤奖励总和的最大化。\n", "3. `step_type`:与环境的交互通常是序列/片段的一部分。例如,下象棋时多次移动棋子。step_type 可以是 `FIRST`、`MID` 或 `LAST` 之一,分别指示该时间步骤是序列中的第一步、中间步或最后一步。\n", "4. `discount`:此为一个浮点数,表示下一个时间步骤的奖励相对于当前时间步骤的奖励的权重。\n", "\n", "它们被分组到一个命名元组 `TimeStep(step_type, reward, discount, observation)`。\n", "\n", "`environments/py_environment.PyEnvironment` 内包含了所有 python 环境必须实现的接口。主要方法为:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "GlD2Dd2vUTtg" }, "outputs": [], "source": [ "class PyEnvironment(object):\n", "\n", " def reset(self):\n", " \"\"\"Return initial_time_step.\"\"\"\n", " self._current_time_step = self._reset()\n", " return self._current_time_step\n", "\n", " def step(self, action):\n", " \"\"\"Apply action and return new time_step.\"\"\"\n", " if self._current_time_step is None:\n", " return self.reset()\n", " self._current_time_step = self._step(action)\n", " return self._current_time_step\n", "\n", " def current_time_step(self):\n", " return self._current_time_step\n", "\n", " def time_step_spec(self):\n", " \"\"\"Return time_step_spec.\"\"\"\n", "\n", " @abc.abstractmethod\n", " def observation_spec(self):\n", " \"\"\"Return observation_spec.\"\"\"\n", "\n", " @abc.abstractmethod\n", " def action_spec(self):\n", " \"\"\"Return action_spec.\"\"\"\n", "\n", " @abc.abstractmethod\n", " def _reset(self):\n", " \"\"\"Return initial_time_step.\"\"\"\n", "\n", " @abc.abstractmethod\n", " def _step(self, action):\n", " \"\"\"Apply action and return new time_step.\"\"\"" ] }, { "cell_type": "markdown", "metadata": { "id": "zfF8koryiGPR" }, "source": [ "除了 `step()` 方法外,环境还提供了一个 `reset()` 方法,该方法可以启动新的序列并提供初始 `TimeStep`。不必显式调用 `reset` 方法。我们假定在片段结束或首次调用 step() 时环境均会自动重置。\n", "\n", "请注意,子类不会直接实现 `step()` 或 `reset()`。相反,它们会重写 `_step()` 和 `_reset()` 方法。这些方法返回的时间步骤将通过 `current_time_step()` 缓存和公开。\n", "\n", "`observation_spec` 和 `action_spec` 方法会返回一组 `(Bounded)ArraySpecs` 嵌套,分别描述观测值和操作的名称、形状、数据类型和范围。\n", "\n", "我们在 TF-Agents 中反复提及嵌套,其定义为由列表、元组、命名元组或字典组成的任何树状结构。这些内容可以任意组合以保持观测值和操作的结构。我们发现,对于包含许多观测值和操作的更复杂环境而言,这种结构非常实用。" ] }, { "cell_type": "markdown", "metadata": { "id": "r63R-RbjcIRw" }, "source": [ "### 使用标准环境\n", "\n", "TF Agents 针对许多标准环境(如 OpenAI Gym、DeepMind-control 和 Atari)内置了包装器,因此它们支持我们的 `py_environment.PyEnvironment` 接口。这些包装的环境可以使用我们的环境套件轻松加载。让我们通过 OpenAI Gym 加载 CartPole 环境,并查看操作和 time_step_spec。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "1kBPE5T-nb2-" }, "outputs": [], "source": [ "environment = suite_gym.load('CartPole-v0')\n", "print('action_spec:', environment.action_spec())\n", "print('time_step_spec.observation:', environment.time_step_spec().observation)\n", "print('time_step_spec.step_type:', environment.time_step_spec().step_type)\n", "print('time_step_spec.discount:', environment.time_step_spec().discount)\n", "print('time_step_spec.reward:', environment.time_step_spec().reward)\n" ] }, { "cell_type": "markdown", "metadata": { "id": "vWXOC863Apo_" }, "source": [ "可以看到, 环境所预期的操作类型为 [0, 1] 区间内的 `int64`,当观测值为长度等于 4 的 `float32` 向量且折扣因子为 [0.0, 1.0] 区间内的 `float32` 时会返回 `TimeSteps`。现在,让我们尝试对整个片段采取固定操作 `(1,)`。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "AzIbOJ0-0y12" }, "outputs": [], "source": [ "action = np.array(1, dtype=np.int32)\n", "time_step = environment.reset()\n", "print(time_step)\n", "while not time_step.is_last():\n", " time_step = environment.step(action)\n", " print(time_step)" ] }, { "cell_type": "markdown", "metadata": { "id": "4xAbBl4_PMtA" }, "source": [ "### 创建自己的 Python 环境\n", "\n", "对于许多客户而言,一个常见用例是采用 TF-Agents 中的一个标准代理(请参见 agents/)解决他们的问题。为此,客户需要将问题视为环境。那么,让我们看一下如何在 Python 中实现环境。\n", "\n", "假设我们要训练一个代理来玩以下纸牌游戏(受 21 点玩法启发):\n", "\n", "1. 使用无限张数字为 1 到 10 的纸牌进行游戏。\n", "2. 代理每个回合可以做两件事:随机抽取一张新的纸牌,或者停止当前回合。\n", "3. 目标是在回合结束时使您的纸牌上数字的总和尽可能接近 21,但不大于 21。\n", "\n", "代表游戏的环境可能如下所示:\n", "\n", "1. 操作:有 2 个操作。操作 0 为抽取一张新的纸牌;操作 1 为终止当前回合。\n", "2. 观测值:当前回合的纸牌上数字的总和。\n", "3. 奖励:目标是尽可能接近 21 但不超过 21,因此我们可以在回合结束时使用以下奖励实现这一目标:sum_of_cards - 21 if sum_of_cards <= 21, else -21\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "9HD0cDykPL6I" }, "outputs": [], "source": [ "class CardGameEnv(py_environment.PyEnvironment):\n", "\n", " def __init__(self):\n", " self._action_spec = array_spec.BoundedArraySpec(\n", " shape=(), dtype=np.int32, minimum=0, maximum=1, name='action')\n", " self._observation_spec = array_spec.BoundedArraySpec(\n", " shape=(1,), dtype=np.int32, minimum=0, name='observation')\n", " self._state = 0\n", " self._episode_ended = False\n", "\n", " def action_spec(self):\n", " return self._action_spec\n", "\n", " def observation_spec(self):\n", " return self._observation_spec\n", "\n", " def _reset(self):\n", " self._state = 0\n", " self._episode_ended = False\n", " return ts.restart(np.array([self._state], dtype=np.int32))\n", "\n", " def _step(self, action):\n", "\n", " if self._episode_ended:\n", " # The last action ended the episode. Ignore the current action and start\n", " # a new episode.\n", " return self.reset()\n", "\n", " # Make sure episodes don't go on forever.\n", " if action == 1:\n", " self._episode_ended = True\n", " elif action == 0:\n", " new_card = np.random.randint(1, 11)\n", " self._state += new_card\n", " else:\n", " raise ValueError('`action` should be 0 or 1.')\n", "\n", " if self._episode_ended or self._state >= 21:\n", " reward = self._state - 21 if self._state <= 21 else -21\n", " return ts.termination(np.array([self._state], dtype=np.int32), reward)\n", " else:\n", " return ts.transition(\n", " np.array([self._state], dtype=np.int32), reward=0.0, discount=1.0)" ] }, { "cell_type": "markdown", "metadata": { "id": "LYEwyX7QsqeX" }, "source": [ "让我们确保已正确地定义了上述环境。创建自己的环境时,您必须确保生成的观测值和 time_step 符合规范中定义的正确形状和类型。这些内容用于生成 TensorFlow 计算图,因此如有差错,可能会造成难以调试的问题。\n", "\n", "为了验证我们的环境,我们将使用随机策略来生成操作,并将迭代 5 个片段以确保按预期进行。如果我们收到的 time_step 不符合环境规范,则会提示错误。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "6Hhm-5R7spVx" }, "outputs": [], "source": [ "environment = CardGameEnv()\n", "utils.validate_py_environment(environment, episodes=5)" ] }, { "cell_type": "markdown", "metadata": { "id": "_36eM7MvkNOg" }, "source": [ "现在我们可以确定环境正在按预期工作,让我们使用固定策略运行此环境:抽取 3 张纸牌,然后结束该回合。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "FILylafAkMEx" }, "outputs": [], "source": [ "get_new_card_action = np.array(0, dtype=np.int32)\n", "end_round_action = np.array(1, dtype=np.int32)\n", "\n", "environment = CardGameEnv()\n", "time_step = environment.reset()\n", "print(time_step)\n", "cumulative_reward = time_step.reward\n", "\n", "for _ in range(3):\n", " time_step = environment.step(get_new_card_action)\n", " print(time_step)\n", " cumulative_reward += time_step.reward\n", "\n", "time_step = environment.step(end_round_action)\n", "print(time_step)\n", "cumulative_reward += time_step.reward\n", "print('Final Reward = ', cumulative_reward)" ] }, { "cell_type": "markdown", "metadata": { "id": "_vBLPN3ioyGx" }, "source": [ "### 环境包装器\n", "\n", "环境包装器使用 python 环境,并返回该环境的修改版本。原始环境和修改后的环境均为 `py_environment.PyEnvironment` 的实例,并且可以将多个包装器链接在一起。\n", "\n", "可以在 `environments/wrappers.py` 中找到一些常用的包装器。例如:\n", "\n", "1. `ActionDiscretizeWrapper`:将连续操作空间转换成离散操作空间。\n", "2. `RunStats`:捕获环境的运行统计信息,例如采用的步数、完成的片段数等。\n", "3. `TimeLimit`:在固定步数后终止片段。\n" ] }, { "cell_type": "markdown", "metadata": { "id": "_8aIybRdnFfb" }, "source": [ "#### 示例 1:操作离散化包装器" ] }, { "cell_type": "markdown", "metadata": { "id": "YIaxJRUpvfyc" }, "source": [ "InvertedPendulum 是一个接受 `[-2, 2]` 区间内连续操作的 PyBullet 环境。如果要在此环境中训练离散操作代理(例如 DQN),则必须离散化(量化)操作空间。这正是 `ActionDiscretizeWrapper` 的工作。请对比包装前后的 `action_spec`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "AJxEoZ4HoyjR" }, "outputs": [], "source": [ "env = suite_gym.load('Pendulum-v1')\n", "print('Action Spec:', env.action_spec())\n", "\n", "discrete_action_env = wrappers.ActionDiscretizeWrapper(env, num_actions=5)\n", "print('Discretized Action Spec:', discrete_action_env.action_spec())" ] }, { "cell_type": "markdown", "metadata": { "id": "njFjW8bmwTWJ" }, "source": [ "包装后的 `discrete_action_env` 为 `py_environment.PyEnvironment` 的实例,可视为常规 python 环境。\n" ] }, { "cell_type": "markdown", "metadata": { "id": "8l5dwAhsP_F_" }, "source": [ "## TensorFlow 环境" ] }, { "cell_type": "markdown", "metadata": { "id": "iZG39AjBkTjr" }, "source": [ "TF 环境的接口在 `environments/tf_environment.TFEnvironment` 中定义,其与 Python 环境非常相似。TF 环境与 python 环境在以下两个方面有所不同:\n", "\n", "- TF 环境生成张量对象而非数组\n", "- 与规范相比,TF 环境会为生成的张量添加批次维度。\n", "\n", "将 python 环境转换为 TF 环境可以使 tensorflow 支持并行化运算。例如,用户可以定义 `collect_experience_op` 从环境中收集数据并添加到 `replay_buffer`,并定义 `train_op` 从 `replay_buffer` 中读取数据并训练代理,然后在 TensorFlow 中自然地并行运行二者。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "WKBDDZqKTxsL" }, "outputs": [], "source": [ "class TFEnvironment(object):\n", "\n", " def time_step_spec(self):\n", " \"\"\"Describes the `TimeStep` tensors returned by `step()`.\"\"\"\n", "\n", " def observation_spec(self):\n", " \"\"\"Defines the `TensorSpec` of observations provided by the environment.\"\"\"\n", "\n", " def action_spec(self):\n", " \"\"\"Describes the TensorSpecs of the action expected by `step(action)`.\"\"\"\n", "\n", " def reset(self):\n", " \"\"\"Returns the current `TimeStep` after resetting the Environment.\"\"\"\n", " return self._reset()\n", "\n", " def current_time_step(self):\n", " \"\"\"Returns the current `TimeStep`.\"\"\"\n", " return self._current_time_step()\n", "\n", " def step(self, action):\n", " \"\"\"Applies the action and returns the new `TimeStep`.\"\"\"\n", " return self._step(action)\n", "\n", " @abc.abstractmethod\n", " def _reset(self):\n", " \"\"\"Returns the current `TimeStep` after resetting the Environment.\"\"\"\n", "\n", " @abc.abstractmethod\n", " def _current_time_step(self):\n", " \"\"\"Returns the current `TimeStep`.\"\"\"\n", "\n", " @abc.abstractmethod\n", " def _step(self, action):\n", " \"\"\"Applies the action and returns the new `TimeStep`.\"\"\"" ] }, { "cell_type": "markdown", "metadata": { "id": "tFkBIA92ThWf" }, "source": [ "`current_time_step()` 方法会返回当前 time_step 并在需要时初始化环境。\n", "\n", "`reset()` 方法会在环境中强制执行重置并返回 current_step。\n", "\n", "如果 `action` 不依赖于上一个 `time_step`,则在 `Graph` 模式下将需要 `tf.control_dependency`。\n", "\n", "现在,让我们看看如何创建 `TFEnvironments`。" ] }, { "cell_type": "markdown", "metadata": { "id": "S6wS3AaLdVLT" }, "source": [ "### 创建自己的 TensorFlow 环境\n", "\n", "此操作比在 Python 中创建环境复杂得多,因此,我们将不会在本 Colab 中进行介绍。[此处](https://github.com/tensorflow/agents/blob/master/tf_agents/environments/tf_environment_test.py)提供了一个示例。更常见的用例是在 Python 中实现您的环境,并使用我们的 `TFPyEnvironment` 包装器将其包装为 TensorFlow 环境(请参见下文)。" ] }, { "cell_type": "markdown", "metadata": { "id": "V_Ny2lb-dU5R" }, "source": [ "### 将 Python 环境包装为 TensorFlow 环境" ] }, { "cell_type": "markdown", "metadata": { "id": "Lv4-UcurZ8nb" }, "source": [ "我们可以使用 `TFPyEnvironment` 包装器将任何 Python 环境轻松包装为 TensorFlow 环境。" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "UYerqyNfnVRL" }, "outputs": [], "source": [ "env = suite_gym.load('CartPole-v0')\n", "tf_env = tf_py_environment.TFPyEnvironment(env)\n", "\n", "print(isinstance(tf_env, tf_environment.TFEnvironment))\n", "print(\"TimeStep Specs:\", tf_env.time_step_spec())\n", "print(\"Action Specs:\", tf_env.action_spec())" ] }, { "cell_type": "markdown", "metadata": { "id": "z3WFrnX9CNpC" }, "source": [ "请注意,规范的类型现在为:`(Bounded)TensorSpec`。" ] }, { "cell_type": "markdown", "metadata": { "id": "vQPvC1ARYALj" }, "source": [ "### 用法示例" ] }, { "cell_type": "markdown", "metadata": { "id": "ov7EIrk8dKUU" }, "source": [ "#### 简单示例" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "gdvFqUqbdB7u" }, "outputs": [], "source": [ "env = suite_gym.load('CartPole-v0')\n", "\n", "tf_env = tf_py_environment.TFPyEnvironment(env)\n", "# reset() creates the initial time_step after resetting the environment.\n", "time_step = tf_env.reset()\n", "num_steps = 3\n", "transitions = []\n", "reward = 0\n", "for i in range(num_steps):\n", " action = tf.constant([i % 2])\n", " # applies the action and returns the new TimeStep.\n", " next_time_step = tf_env.step(action)\n", " transitions.append([time_step, action, next_time_step])\n", " reward += next_time_step.reward\n", " time_step = next_time_step\n", "\n", "np_transitions = tf.nest.map_structure(lambda x: x.numpy(), transitions)\n", "print('\\n'.join(map(str, np_transitions)))\n", "print('Total reward:', reward.numpy())" ] }, { "cell_type": "markdown", "metadata": { "id": "kWs48LNsdLnc" }, "source": [ "#### 整个片段" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "t561kUXMk-KM" }, "outputs": [], "source": [ "env = suite_gym.load('CartPole-v0')\n", "tf_env = tf_py_environment.TFPyEnvironment(env)\n", "\n", "time_step = tf_env.reset()\n", "rewards = []\n", "steps = []\n", "num_episodes = 5\n", "\n", "for _ in range(num_episodes):\n", " episode_reward = 0\n", " episode_steps = 0\n", " while not time_step.is_last():\n", " action = tf.random.uniform([1], 0, 2, dtype=tf.int32)\n", " time_step = tf_env.step(action)\n", " episode_steps += 1\n", " episode_reward += time_step.reward.numpy()\n", " rewards.append(episode_reward)\n", " steps.append(episode_steps)\n", " time_step = tf_env.reset()\n", "\n", "num_steps = np.sum(steps)\n", "avg_length = np.mean(steps)\n", "avg_reward = np.mean(rewards)\n", "\n", "print('num_episodes:', num_episodes, 'num_steps:', num_steps)\n", "print('avg_length', avg_length, 'avg_reward:', avg_reward)" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "2_environments_tutorial.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }