見出し画像

Stable Baselines チュートリアル(5) / カスタムGym環境の作成

以下のColabが面白かったので、ざっくり訳してみました。

Stable Baselines Tutorial - Creating a custom Gym environment

1. はじめに

このノートブックでは、OpenAI Gymインターフェースに従って「カスタムGym環境」を作成する方法を学習します。これを作成することで、「Stable Baselines」のRLアルゴリズムを簡単に適用できるようになります。

2. pipを使用して依存関係と安定したベースラインをインストール

!pip install stable-baselines[mpi]==2.8.0

3. Gymインターフェースの最初のステップ

Gymインターフェースに従う「カスタムGym環境」は簡単に作成できます。
次の3つのメソッドが存在します。

◎ reset()
エピソードの開始時に呼び出され、初期状態を返します。

◎ step(action)
行動を実行するたびに呼び出され、次の観測、報酬、エピソード完了、情報を返します。

◎ render(method = 'human')【オプション】
動作中のエージェントを視覚化します。
ColabではGUIを使えないため、直接利用できません。
「method = 'rbg_array'」を使ってシーンの画像を取得する必要があります。

次の2つのプロパティも含まれています。

◎observation_space
状態空間の型を指定します。

◎action_space
行動空間の型を指定します。

状態空間と行動空間の型について学ぶ最良の方法は、ソースコードを読むことですが、その前に主要な型を覚えましょう。

◎gym.spaces.Box
連続空間の型です。

[a, b]、(-∞, b]、[a, ∞)、または(-∞, ∞)のいずれか

◎gym.spaces.Discrete
離散空間の型です。

{0,1, …, 𝑛−1}

カスタム環境に関するドキュメントは以下にあります。
https://stable-baselines.readthedocs.io/en/master/guide/custom_env.html

import gym

env = gym.make("CartPole-v1")

# Box(4,) は、4つのコンポーネントを持つベクトルを意味する
print("Observation space:", env.observation_space)
print("Shape:", env.observation_space.shape)

# Discrete(2) は、2つの個別の行動があることを意味する
print("Action space:", env.action_space)

# reset()はエピソードの開始時に呼び出す
obs = env.reset()

# ランダム行動をサンプリング
action = env.action_space.sample()
print("Sampled action:", action)
obs, reward, done, info = env.step(action)

# obsはnumpy配列であることに注意
# infoは現時点では空だが、デバッグ情報を含めることができる
# rewardはスカラー
print(obs.shape, reward, done, info)
Observation space: Box(4,)
Shape: (4,)
Action space: Discrete(2)
Sampled action: 0
(4,) 1.0 False {}

4. Gym環境スケルトン

エージェントが常に左に行くことを学ぶ環境を実装します。

import numpy as np
import gym
from gym import spaces


class GoLeftEnv(gym.Env):
  """
  Gymのインターフェースに従うカスタム環境
  エージェントが常に左に行くことを学ぶ環境
  """
  # ColabのためGUIを実装できない
  metadata = {'render.modes': ['console']}

  # 定数を定義
  LEFT = 0
  RIGHT = 1

  def __init__(self, grid_size=10):
    super(GoLeftEnv, self).__init__()

    # 1Dグリッドのサイズ
    self.grid_size = grid_size

    # グリッドの右側でエージェントを初期化
    self.agent_pos = grid_size - 1

    # 行動空間と状態空間を定義
    # gym.spacesオブジェクトでなければならない
    # 離散行動を使用する場合の例には、左と右の2つがある
    n_actions = 2
    self.action_space = spaces.Discrete(n_actions)

    # 状態はエージェントの座標になる
    # Discrete空間とBox空間の両方で表現できる
    self.observation_space = spaces.Box(low=0, high=self.grid_size,
                                       shape=(1,), dtype=np.float32)

  def reset(self):
    """
    【重要】観測はnumpy配列でなければならない
    :return: (np.array)
    """
    # グリッドの右側でエージェントを初期化
    self.agent_pos = self.grid_size - 1

    # float32に変換してより一般的なものにします(連続行動を使用する場合)
    return np.array(self.agent_pos).astype(np.float32)

  def step(self, action):
    if action == self.LEFT:
      self.agent_pos -= 1
    elif action == self.RIGHT:
      self.agent_pos += 1
    else:
      raise ValueError("Received invalid action={} which is not part of the action space".format(action))

    # グリッドの境界を表現
    self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size)

    # グリッドの左側にいるか
    done = self.agent_pos == 0

    # ゴールを除くすべての場所で0の報酬
    reward = 1 if self.agent_pos == 0 else 0

    # 必要に応じて情報を渡すことができるが、現在は未使用
    info = {}

    return np.array(self.agent_pos).astype(np.float32), reward, done, info

  def render(self, mode='console', close=False):
    if mode != 'console':
      raise NotImplementedError()

    # エージェントは「x」、残りは「.」として表現
    print("." * self.agent_pos, end="")
    print("x", end="")
    print("." * (self.grid_size - self.agent_pos))

環境をテストします。

env = GoLeftEnv(grid_size=10)

obs = env.reset()
env.render()

print(env.observation_space)
print(env.action_space)
print(env.action_space.sample())

GO_LEFT = 0

# ハードコードされた最高のエージェント:常に左に行く
n_steps = 20
for step in range(n_steps):
  print("Step {}".format(step + 1))
  obs, reward, done, info = env.step(GO_LEFT)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render()
  if done:
    print("Goal reached!", "reward=", reward)
    break

「Stable Baselines」で試します。

from stable_baselines import DQN, PPO2, A2C, ACKTR
from stable_baselines.bench import Monitor
from stable_baselines.common.vec_env import DummyVecEnv

# 環境の生成
env = GoLeftEnv(grid_size=10)

# 環境のラップ
env = Monitor(env, filename=None, allow_early_resets=True)
env = DummyVecEnv([lambda: env])
# エージェントの訓練
model = ACKTR('MlpPolicy', env, verbose=1).learn(5000)
# 訓練済みエージェントのテスト
obs = env.reset()
n_steps = 20
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  print("Step {}".format(step + 1))
  print("Action: ", action)
  obs, reward, done, info = env.step(action)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render(mode='console')
  if done:
    # VecEnvは、エピソード完了に遭遇すると自動的にリセットされることに注意
    print("Goal reached!", "reward=", reward)
    break

5. 参照

・Github repo: https://github.com/araffin/rl-tutorial-jnrr19
・Stable-Baselines: https://github.com/hill-a/stable-baselines
・Documentation: https://stable-baselines.readthedocs.io/en/master/
・RL Baselines zoo: https://github.com/araffin/rl-baselines-zoo


この記事が気に入ったらサポートをしてみませんか?