走る作曲家のAIカフェ

目次

Overview

深層強化学習とは、強化学習と深層学習を組み合わせた手法です。ニューラルネットワークを使用して、エージェントが環境から得られる観測値をもとに価値関数や方策を近似します。
このページでは、深層強化学習についてPyTorchによる実装の方法をメインに学んでいきます。

Source

以下の書籍を参考にしました。

Review

PyTorchによるディープラーニングの実装

  1. データの前処理
  2. DataLoaderの作成
  3. ネットワークの構築
  4. 誤差関数と最適化手法の設定
  5. 学習と推論の実行

MNISTを使って復習してみる。


from sklearn.datasets import fetch_openml
import numpy as np
import matplotlib.pyplot as plt

# MNISTデータを取得
mnist = fetch_openml('mnist_784', version=1, data_home=".")
            
1. データの前処理

# データをNumPy配列に変換
X = np.array(mnist.data) / 255  # 正規化
y = np.array(mnist.target).astype(np.int32)  # ラベルをint32型に変換

# 1つ目の画像を表示
plt.imshow(X[0].reshape(28, 28), cmap='gray')
plt.title(f"The label of this image is {y[0]}.")
plt.show()
            
2. データローダーの作成

DataLoaderへの変換は以下の4つの手続きからなる。

  1. 訓練データとテストデータに分割
  2. NumPyデータをTensorに変換
  3. Datasetの作成
  4. DatasetをDataLoaderに変換

# 2. DataLoderの作成

import torch
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

# 2.1 データを訓練とテストに分割(6:1)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=1/7, random_state=0)

# 2.2 データをPyTorchのTensorに変換
X_train = torch.Tensor(X_train)
X_test = torch.Tensor(X_test)
y_train = torch.LongTensor(y_train)
y_test = torch.LongTensor(y_test)

# 2.3 データとラベルをセットにしたDatasetを作成
ds_train = TensorDataset(X_train, y_train)
ds_test = TensorDataset(X_test, y_test)

# 2.4 データセットのミニバッチサイズを指定した、Dataloaderを作成
# Chainerのiterators.SerialIteratorと似ている
loader_train = DataLoader(ds_train, batch_size=64, shuffle=True)
loader_test = DataLoader(ds_test, batch_size=64, shuffle=False)
            

ラベルのような整数データの場合はtorch.LongTensorを使う。

3. ネットワークの構築

# 3. ネットワークの構築
# Keras風の書き方 

from torch import nn

model = nn.Sequential()
model.add_module('fc1', nn.Linear(28*28*1, 100))
model.add_module('relu1', nn.ReLU())
model.add_module('fc2', nn.Linear(100, 100))
model.add_module('relu2', nn.ReLU())
model.add_module('fc3', nn.Linear(100, 10))

print(model)
            
4. 誤差関数と最適化手法の設定

誤差関数にはクロスエントロピー誤差関数を、最適化手法にはAdamを設定する


# 4. 誤差関数と最適化手法の設定

from torch import optim

# 誤差関数の設定
loss_fn = nn.CrossEntropyLoss()  # 変数名にはcriterionが使われることも多い

# 重みを学習する際の最適化手法の選択
optimizer = optim.Adam(model.parameters(), lr=0.01)
            
5. 学習と推論の設定

# 5. 学習と推論の設定
# 5-1. 学習1回でやることを定義します
# Chainerのtraining.Trainer()に対応するものはない


def train(epoch):
    model.train()  # ネットワークを学習モードに切り替える

    # データローダーから1ミニバッチずつ取り出して計算する
    for data, targets in loader_train:
      
        optimizer.zero_grad()  # 一度計算された勾配結果を0にリセット
        outputs = model(data)  # 入力dataをinputし、出力を求める
        loss = loss_fn(outputs, targets)  # 出力と訓練データの正解との誤差を求める
        loss.backward()  # 誤差のバックプロパゲーションを求める
        optimizer.step()  # バックプロパゲーションの値で重みを更新する

    print("epoch{}:終了\n".format(epoch))
            

# 5. 学習と推論の設定
# 5-2. 推論1回でやることを定義します
# Chainerのtrainer.extend(extensions.Evaluator())に対応するものはない


def test():
    model.eval()  # ネットワークを推論モードに切り替える
    correct = 0

    # データローダーから1ミニバッチずつ取り出して計算する
    with torch.no_grad():  # 微分は推論では必要ない
        for data, targets in loader_test:

            outputs = model(data)  # 入力dataをinputし、出力を求める

            # 推論する
            _, predicted = torch.max(outputs.data, 1)  # 確率が最大のラベルを求める
            correct += predicted.eq(targets.data.view_as(predicted)).sum()  # 正解と一緒だったらカウントアップ

    # 正解率を出力
    data_num = len(loader_test.dataset)  # データの総数
    print('\nテストデータの正解率: {}/{} ({:.0f}%)\n'.format(correct,
                                                   data_num, 100. * correct / data_num))
            
6. 学習と推論の実行

# 6. 学習と推論の実行
for epoch in range(3):
    train(epoch)

test()
            

DQN

ニューラルネットワークへの入力は、各状態変数の値となる。よって、ニューラルネットワークの入力層の祖指数は、状態変数の値と同じになる。

出力層の素子数は、行動の種類数となる。出力する値は、行動価値関数\(Q(s_t, a_t)\)の値である。

誤差関数は以下の通り。

\begin{align} E(s_t, a_t) = (R_{t+1} + \gamma \max_a Q(s_{t+1}, a) - Q(s_t, a_t))^2 \end{align}

状態\(s_{t+1}\)は、実際に\(s_t\)から行動\(a_t\)を実施して求める。

\(\max_a Q(s_{t+1}, a)\)の値はニューラルネットワークに状態\(s_{t+1}\)を入力して求める。

Experience Replay

DQNでは、表形式表現のQ学習のように1ステップごとにそのステップの内容(experience)を学習するのではなく、メモリに各ステップの内容を保存しておき、メモリから内容をランダムに取り出して(replay)、ニューラルネットワークに学習させる。 1ステップごとの内容をtransitionという。

各ステップごとにそのステップの内容を学習すると、時間的に相関が高い内容(時刻\(t\)と時刻\(t+1\)の学習内容はよく似ている)を連続してニューラルネットワークが学習するため、結合パラメータの学習が安定しづらいという問題が発生する。 Experience Replayはこの問題を解決する工夫となる。また、Experience Replayであればメモリから複数ステップの経験を使用することができるので、ミニバッチ学習でニューラルネットワークを学習させることができる。

Fixed Target Q-Network

ニューラルネットワークとして、行動を決定するmain-networkと誤差関数の計算時に行動価値を求めるtarget-networkの2種類を用意する。

DQNでは価値関数\(Q(s_t, a)\)を更新していきたいが、Q学習のアルゴリズムで行動価値関数を更新するには、次の時刻の状態\(s_{t+1}\)での価値関数\(Q(s_{t+1}, a)\)が必要である。 つまり、Q関数の更新のために、同じQ関数を使用する必要がある。これら2つを同じQ関数にしていると、Q関数の更新学習が不安定になりやすくなるという問題が発生する。 そこで、更新に必要な\(\max_a Q(s_{t+1}, a)\)を求めるときには、少し前の時間の別のQ関数(Fixed Target Q-Network)を使って計算する。

少し前の時間の別のQ関数とは、ニューラルネットワークの結合パラメータの学習が最新版より古いという意味である。ここでの少し前という表現は、強化学習の課題対象が持つ時間ではなく、ニューラルネットワークの結合パラメータ更新のタイミングを意味している。 そのため、target-networkは定期的にmain-networkで上書きすることになる。

報酬のclipping

各ステップで得られる報酬を-1, 0, 1のいずれかに固定しておく。こうすることで課題内容(学習対象)によらず、同じハイパーパラメータでDQNを実行できるというメリットがある。

Huber関数

誤差を二乗誤差関数ではなく、Huber関数を使用して計算する。誤差が大きい場合に二乗誤差を使用すると、誤差関数の出力が大きくなりすぎて学習が安定しづらいという問題が発生する。

実装

CartPoleを例に、実装してみる。


# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym
            

from matplotlib import animation
from IPython.display import display, Video
import matplotlib.pyplot as plt

def display_frames_as_gif(frames):
    """
    Displays a list of frames as a video (mp4), with controls.
    """
    # Figureの設定
    fig = plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0),
                     dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    # 各フレームを更新する関数
    def animate(i):
        patch.set_data(frames[i])

    # アニメーションの設定
    anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=50)

    # mp4ファイルとして保存
    anim.save('movie_cartpole_DQN.mp4', writer='ffmpeg')

    # IPython上で動画を表示
    display(Video("movie_cartpole_DQN.mp4", embed=True))
            

ここで、namedtupleの使用例を示す。


# 本コードでは、namedtupleを使用します。
# namedtupleを使うことで、値をフィールド名とペアで格納できます。
# すると値に対して、フィールド名でアクセスできて便利です。
# https://docs.python.jp/3/library/collections.html#collections.namedtuple
# 以下は使用例です

from collections import namedtuple

Tr = namedtuple('tr', ('name_a', 'value_b'))
Tr_object = Tr('名前Aです', 100)

print(Tr_object)  # 出力:tr(name_a='名前Aです', value_b=100)
print(Tr_object.value_b)  # 出力:100
            

DQNの実装時にも状態や行動の値にアクセスしやすくするために、各ステップでのtransition(経験)をnamedtupleを使用して変換する。実際に使用するnamedtupleは以下の通り。


# namedtupleを生成
from collections import namedtuple

Transition = namedtuple(
    'Transition', ('state', 'action', 'next_state', 'reward'))
            

今回使用する定数を宣言する。


# 定数の設定
ENV = 'CartPole-v0'  # 使用する課題名
GAMMA = 0.99  # 時間割引率
MAX_STEPS = 200  # 1試行のstep数
NUM_EPISODES = 500  # 最大試行回数
            

ミニバッチ学習を実現するために、経験データを保存しておくメモリクラスReplayMemoryを定義する。 ReplayMemoryはそのstepでのtransition(経験)を保存する関数pushと、ランダムにtransitionを取り出す関数sampleを用意する。 また、関数lenに対して、現在格納しているtransitionの数を返すように定義する。 メモリクラスは保存しているtransitionの数が定数CAPACITY以上になった場合には、インデックスを前に戻して古い内容がから上書きしていく。


# 経験を保存するメモリクラスを定義します


class ReplayMemory:

    def __init__(self, CAPACITY):
        self.capacity = CAPACITY  # メモリの最大長さ
        self.memory = []  # 経験を保存する変数
        self.index = 0  # 保存するindexを示す変数

    def push(self, state, action, state_next, reward):
        '''transition = (state, action, state_next, reward)をメモリに保存する'''

        if len(self.memory) < self.capacity:
            self.memory.append(None)  # メモリが満タンでないときは足す

        # namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
        self.memory[self.index] = Transition(state, action, state_next, reward)

        self.index = (self.index + 1) % self.capacity  # 保存するindexを1つずらす

    def sample(self, batch_size):
        '''batch_size分だけ、ランダムに保存内容を取り出す'''
        return random.sample(self.memory, batch_size)

    def __len__(self):
        '''関数lenに対して、現在の変数memoryの長さを返す'''
        return len(self.memory)
            

次に、Brainクラスを実装する。ここがDQNの中心部である。メソッドは関数replayと関数decide_actionである。 関数replayはメモリクラスからミニバッチを取り出して、ニューラルネットワークの結合パラメータを学習し、Q関数を更新する。 関数decide_actionはε-greedy法に従い、ランダムな行動もしくは現在の状態に対してQ値が最大となる行動のindexを返す。


# エージェントが持つ脳となるクラスです、DQNを実行します
# Q関数をディープラーニングのネットワークをクラスとして定義

import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000


class Brain:
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions  # CartPoleの行動(右に左に押す)の2を取得

        # 経験を記憶するメモリオブジェクトを生成
        self.memory = ReplayMemory(CAPACITY)

        # ニューラルネットワークを構築
        self.model = nn.Sequential()
        self.model.add_module('fc1', nn.Linear(num_states, 32))
        self.model.add_module('relu1', nn.ReLU())
        self.model.add_module('fc2', nn.Linear(32, 32))
        self.model.add_module('relu2', nn.ReLU())
        self.model.add_module('fc3', nn.Linear(32, num_actions))

        print(self.model)  # ネットワークの形を出力

        # 最適化手法の設定
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.0001)

    def replay(self):
        '''Experience Replayでネットワークの結合パラメータを学習'''

        # -----------------------------------------
        # 1. メモリサイズの確認
        # -----------------------------------------
        # 1.1 メモリサイズがミニバッチより小さい間は何もしない
        if len(self.memory) < BATCH_SIZE:
            return

        # -----------------------------------------
        # 2. ミニバッチの作成
        # -----------------------------------------
        # 2.1 メモリからミニバッチ分のデータを取り出す
        transitions = self.memory.sample(BATCH_SIZE)

        # 2.2 各変数をミニバッチに対応する形に変形
        # transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
        # つまり、(state, action, state_next, reward)×BATCH_SIZE
        # これをミニバッチにしたい。つまり
        # (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
        batch = Transition(*zip(*transitions))

        # 2.3 各変数の要素をミニバッチに対応する形に変形し、ネットワークで扱えるようVariableにする
        # 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるのですが、
        # それを torch.FloatTensor of size BATCH_SIZEx4 に変換します
        # 状態、行動、報酬、non_finalの状態のミニバッチのVariableを作成
        # catはConcatenates(結合)のことです。
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state
                                           if s is not None])

        # -----------------------------------------
        # 3. 教師信号となるQ(s_t, a_t)値を求める
        # -----------------------------------------
        # 3.1 ネットワークを推論モードに切り替える
        self.model.eval()

        # 3.2 ネットワークが出力したQ(s_t, a_t)を求める
        # self.model(state_batch)は、右左の両方のQ値を出力しており
        # [torch.FloatTensor of size BATCH_SIZEx2]になっている。
        # ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
        # それに対応するQ値をgatherでひっぱり出す。
        state_action_values = self.model(state_batch).gather(1, action_batch)

        # 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意。

        # cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
                                                    batch.next_state)))
        # まずは全部0にしておく
        next_state_values = torch.zeros(BATCH_SIZE)

        # 次の状態があるindexの最大Q値を求める
        # 出力にアクセスし、max(1)で列方向の最大値の[値、index]を求めます
        # そしてそのQ値(index=0)を出力します
        # detachでその値を取り出します
        next_state_values[non_final_mask] = self.model(
            non_final_next_states).max(1)[0].detach()

        # 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
        expected_state_action_values = reward_batch + GAMMA * next_state_values

        # -----------------------------------------
        # 4. 結合パラメータの更新
        # -----------------------------------------
        # 4.1 ネットワークを訓練モードに切り替える
        self.model.train()

        # 4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
        # expected_state_action_valuesは
        # sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ
        loss = F.smooth_l1_loss(state_action_values,
                                expected_state_action_values.unsqueeze(1))

        # 4.3 結合パラメータを更新する
        self.optimizer.zero_grad()  # 勾配をリセット
        loss.backward()  # バックプロパゲーションを計算
        self.optimizer.step()  # 結合パラメータを更新

    def decide_action(self, state, episode):
        '''現在の状態に応じて、行動を決定する'''
        # ε-greedy法で徐々に最適行動のみを採用する
        epsilon = 0.5 * (1 / (episode + 1))

        if epsilon <= np.random.uniform(0, 1):
            self.model.eval()  # ネットワークを推論モードに切り替える
            with torch.no_grad():
                action = self.model(state).max(1)[1].view(1, 1)
            # ネットワークの出力の最大値のindexを取り出します = max(1)[1]
            # .view(1,1)は[torch.LongTensor of size 1] を size 1x1 に変換します

        else:
            # 0,1の行動をランダムに返す
            action = torch.LongTensor(
                [[random.randrange(self.num_actions)]])  # 0,1の行動をランダムに返す
            # actionは[torch.LongTensor of size 1x1]の形になります

        return action
            

関数replayでは4つのことを行っている。

  1. メモリサイズの確認
    • メモリサイズがミニバッチより小さい間は何もしない
  2. ミニバッチの作成
    • メモリからミニバッチ分のデータを取り出す
    • 各変数をミニバッチに対応する形に変形
    • 各変数の要素をミニバッチに対応する形に変形
      • ミニバッチの作成はステップから成る。まずランダムにミニバッチ数分だけtransitionを取り出した変数transitionsを作成する。 transitionが1stepごとのデータを固めて格納しているため、transitionsのままではPyTorchのミニバッチとして扱えない。 そこで2段階の変形を行う。はじめに各変数(状態や行動など)をミニバッチに対応する形に変形する。 具体的には、(state, action, state_next, reward)×BATCH_SIZEとなっている形を、(state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする。 次に各変数(状態や行動など)の要素(位置や速度など)をミニバッチで扱える形に変形する。 例えばstateの場合、torch.FloatTensor of size 1x4がBATCH_SIZE分並んでいるが、それをtorch.FloatTensor of size BATCH_SIZEx4になるように変換する。 なお次の状態があるのか終了状態なのかでQ学習の更新式が変わる。 そこで変数non_final_next_statesという、次の状態が存在する状態だけを集めたミニバッチも別途用意する。
  3. 教師信号となる\(Q(s_t, a_t)\)値を求める
    • ネットワークを推論モードに切り替える
    • ネットワークが出力した\(Q(s_t, a_t)\)を求める
    • \(\max_a Q(s_{t+1}, a)\)の値を求める(次の状態がある場合のみ)
    • 教師となる\(Q(s_t, a_t)\)の値をQ学習の式から求める
      • 状態のミニバッチ変数state_batchをネットワークに入力し、その出力から実際に行った行動のミニバッチ変数action_batchに対応するものを関数gatherを使って取り出す。 ここでは、Fixed Target Q-Networkをミニバッチ学習で代替している。 教師信号の計算で使用しているdetach()とはネットワークの出力の値を取り出すという意味である。PyTorchではdetach()することでその変数が保持しているそれまでの計算履歴を失い、バックプロパゲーションする際に微分を計算しなくなる。 結合パラメータの学習において、教師信号は固定されたものにしておく必要がある。そこでdetach()を行い、教師信号に微分操作が行われないようにする。 一方で実際にネットワークのが予測として出力した\(Q(s_t, a_t)\)は微分できるようにdetach()は行わない。 そしてこの\(Q(s_t, a_t)\)が教師信号に近づくように微分を求め、ネットワークの結合パラメータを更新する。
  4. 結合パラメータの更新
    • ネットワークを訓練モードに切り替える
    • 損失関数の値を計算する
    • 結合パラメータを更新する

Agentクラスを定義する。関数memorizeでは、メモリオブジェクトに経験したデータ(transition)を格納する。


# CartPoleで動くエージェントクラスです、棒付き台車そのものになります


class Agent:
    def __init__(self, num_states, num_actions):
        '''課題の状態と行動の数を設定する'''
        self.brain = Brain(num_states, num_actions)  # エージェントが行動を決定するための頭脳を生成

    def update_q_function(self):
        '''Q関数を更新する'''
        self.brain.replay()

    def get_action(self, state, episode):
        '''行動を決定する'''
        action = self.brain.decide_action(state, episode)
        return action

    def memorize(self, state, action, state_next, reward):
        '''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
        self.brain.memory.push(state, action, state_next, reward)
            

続いて、環境クラスを定義する。表形式表現のように離散化は行わない。また、直近10episodeの立ち続けたstep数を格納するリストを用意し、その平均値を見ることで学習の進捗を分かりやすくする。


# CartPoleを実行する環境のクラスです


class Environment:

    def __init__(self):
        self.env = gym.make(ENV)  # 実行する課題を設定
        num_states = self.env.observation_space.shape[0]  # 課題の状態数4を取得
        num_actions = self.env.action_space.n  # CartPoleの行動(右に左に押す)の2を取得
        self.agent = Agent(num_states, num_actions)  # 環境内で行動するAgentを生成

        
    def run(self):
        '''実行'''
        episode_10_list = np.zeros(10)  # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
        complete_episodes = 0  # 195step以上連続で立ち続けた試行数
        episode_final = False  # 最後の試行フラグ
        frames = []  # 最後の試行を動画にするために画像を格納する変数

        for episode in range(NUM_EPISODES):  # 最大試行数分繰り返す
            observation = self.env.reset()  # 環境の初期化

            state = observation  # 観測をそのまま状態sとして使用
            state = torch.from_numpy(state).type(
                torch.FloatTensor)  # NumPy変数をPyTorchのテンソルに変換
            state = torch.unsqueeze(state, 0)  # size 4をsize 1x4に変換

            for step in range(MAX_STEPS):  # 1エピソードのループ

                if episode_final is True:  # 最終試行ではframesに各時刻の画像を追加していく
                    frames.append(self.env.render(mode='rgb_array'))

                action = self.agent.get_action(state, episode)  # 行動を求める

                # 行動a_tの実行により、s_{t+1}とdoneフラグを求める
                # actionから.item()を指定して、中身を取り出す
                observation_next, _, done, _ = self.env.step(
                    action.item())  # rewardとinfoは使わないので_にする

                # 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
                if done:  # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
                    state_next = None  # 次の状態はないので、Noneを格納

                    # 直近10episodeの立てたstep数リストに追加
                    episode_10_list = np.hstack(
                        (episode_10_list[1:], step + 1))

                    if step < 195:
                        reward = torch.FloatTensor(
                            [-1.0])  # 途中でこけたら罰則として報酬-1を与える
                        complete_episodes = 0  # 連続成功記録をリセット
                    else:
                        reward = torch.FloatTensor([1.0])  # 立ったまま終了時は報酬1を与える
                        complete_episodes = complete_episodes + 1  # 連続記録を更新
                else:
                    reward = torch.FloatTensor([0.0])  # 普段は報酬0
                    state_next = observation_next  # 観測をそのまま状態とする
                    state_next = torch.from_numpy(state_next).type(
                        torch.FloatTensor)  # numpy変数をPyTorchのテンソルに変換
                    state_next = torch.unsqueeze(state_next, 0)  # size 4をsize 1x4に変換

                # メモリに経験を追加
                self.agent.memorize(state, action, state_next, reward)

                # Experience ReplayでQ関数を更新する
                self.agent.update_q_function()

                # 観測の更新
                state = state_next

                # 終了時の処理
                if done:
                    print('%d Episode: Finished after %d steps:10試行の平均step数 = %.1lf' % (
                        episode, step + 1, episode_10_list.mean()))
                    break

            if episode_final is True:
                # 動画を保存と描画
                display_frames_as_gif(frames)
                break

            # 10連続で200step経ち続けたら成功
            if complete_episodes >= 10:
                print('10回連続成功')
                episode_final = True  # 次の試行を描画を行う最終試行とする
            

最後に実行する。


# main クラス
cartpole_env = Environment()
cartpole_env.run()
            

DDQN

DQNには、2013年バージョンと2015年バージョンの2通りがある。 前セクションでの、ミニバッチ学習による実装は2013年バージョンに相当する。 2015年バージョンでは、Target Q-Networkを用意してMain Q-Networkを学習させる。 そして、数ステップに一度、Target Q-Networkを更新してMain Q-Networkと一致させる。 2015年バージョンのDQNの更新式は以下の通り。

\begin{align} Q_m (s_t, a_t) = Q_m (s_t, a_t) + \eta * (R_{t+1} + \gamma \max_a Q_t (s_{t+1}, a) - Q_m (s_t, a_t)) \end{align}

ここで、\(Q_m\)はMain Q-Network、\(Q_t\)はTarget Q-Networkを示す。 この式では右辺の\(\max_a Q_t (s_{t+1}, a)\)をTarget Q-Networkから求める。 つまり、次の状態\(s_{t+1}\)でのQ値が最大となる行動\(a\)、およびそのときのQ値の2つをTarget Q-Networkから求めている。

DDQN(Double DQN)では、この更新式をより安定させる方法である。具体的には、次のような更新式を使用する。

\begin{align} a_m &= \arg \max_a Q_m (s_{t+1}, a) \\ Q_m (s_t, a_t) &= Q_m (s_t, a_t) + \eta * (R_{t+1} + \gamma Q_t (s_{t+1}, a_m) - Q_m (s_t, a_t)) \end{align}

つまり、次の状態\(s_{t+1}\)でのQ値が最大となる行動\(a_m\)はMain Q-Networkから求め、その行動\(a_m\)でのQ値はTarget Q-Networkから求める。

DDQNの実装


# パッケージのimport
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import gym
            

from matplotlib import animation
from IPython.display import display, Video
import matplotlib.pyplot as plt

def display_frames_as_gif(frames):
    """
    Displays a list of frames as a video (mp4), with controls.
    """
    # Figureの設定
    fig = plt.figure(figsize=(frames[0].shape[1]/72.0, frames[0].shape[0]/72.0),
                     dpi=72)
    patch = plt.imshow(frames[0])
    plt.axis('off')

    # 各フレームを更新する関数
    def animate(i):
        patch.set_data(frames[i])

    # アニメーションの設定
    anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=50)

    # mp4ファイルとして保存
    anim.save('movie_cartpole_DDQN.mp4', writer='ffmpeg')

    # IPython上で動画を表示
    display(Video("movie_cartpole_DDQN.mp4", embed=True))
            

# namedtupleを生成
from collections import namedtuple

Transition = namedtuple(
    'Transition', ('state', 'action', 'next_state', 'reward'))
            

# 定数の設定
ENV = 'CartPole-v0'  # 使用する課題名
GAMMA = 0.99  # 時間割引率
MAX_STEPS = 200  # 1試行のstep数
NUM_EPISODES = 500  # 最大試行回数
            

# 経験を保存するメモリクラスを定義します


class ReplayMemory:

    def __init__(self, CAPACITY):
        self.capacity = CAPACITY  # メモリの最大長さ
        self.memory = []  # 経験を保存する変数
        self.index = 0  # 保存するindexを示す変数

    def push(self, state, action, state_next, reward):
        '''transition = (state, action, state_next, reward)をメモリに保存する'''

        if len(self.memory) < self.capacity:
            self.memory.append(None)  # メモリが満タンでないときは足す

        # namedtupleのTransitionを使用し、値とフィールド名をペアにして保存します
        self.memory[self.index] = Transition(state, action, state_next, reward)

        self.index = (self.index + 1) % self.capacity  # 保存するindexを1つずらす

    def sample(self, batch_size):
        '''batch_size分だけ、ランダムに保存内容を取り出す'''
        return random.sample(self.memory, batch_size)

    def __len__(self):
        '''関数lenに対して、現在の変数memoryの長さを返す'''
        return len(self.memory)
            


# ディープ・ニューラルネットワークの構築
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):

    def __init__(self, n_in, n_mid, n_out):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(n_in, n_mid)
        self.fc2 = nn.Linear(n_mid, n_mid)
        self.fc3 = nn.Linear(n_mid, n_out)

    def forward(self, x):
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))
        output = self.fc3(h2)
        return output
            

重要な点は2つのネットワークを用意すること。前セクションにおける、Brainクラスの関数replayを短くする(リファクタリング)。 ミニバッチの作成を関数make_minibatch、教師信号となる\(Q(s_t, a_t)\)の値を求める部分を関数get_expected_state_action_values、結合パラメータの更新を関数update_main_q_networkとする。


# エージェントが持つ脳となるクラスです、DDQNを実行します

import random
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

BATCH_SIZE = 32
CAPACITY = 10000


class Brain:
    def __init__(self, num_states, num_actions):
        self.num_actions = num_actions  # CartPoleの行動(右に左に押す)の2を取得

        # 経験を記憶するメモリオブジェクトを生成
        self.memory = ReplayMemory(CAPACITY)

        # ニューラルネットワークを構築
        n_in, n_mid, n_out = num_states, 32, num_actions
        self.main_q_network = Net(n_in, n_mid, n_out)  # Netクラスを使用
        self.target_q_network = Net(n_in, n_mid, n_out)  # Netクラスを使用
        print(self.main_q_network)  # ネットワークの形を出力

        # 最適化手法の設定
        self.optimizer = optim.Adam(
            self.main_q_network.parameters(), lr=0.0001)

    def replay(self):
        '''Experience Replayでネットワークの結合パラメータを学習'''

        # 1. メモリサイズの確認
        if len(self.memory) < BATCH_SIZE:
            return

        # 2. ミニバッチの作成
        self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()

        # 3. 教師信号となるQ(s_t, a_t)値を求める
        self.expected_state_action_values = self.get_expected_state_action_values()

        # 4. 結合パラメータの更新
        self.update_main_q_network()

    def decide_action(self, state, episode):
        '''現在の状態に応じて、行動を決定する'''
        # ε-greedy法で徐々に最適行動のみを採用する
        epsilon = 0.5 * (1 / (episode + 1))

        if epsilon <= np.random.uniform(0, 1):
            self.main_q_network.eval()  # ネットワークを推論モードに切り替える
            with torch.no_grad():
                action = self.main_q_network(state).max(1)[1].view(1, 1)
            # ネットワークの出力の最大値のindexを取り出します = max(1)[1]
            # .view(1,1)は[torch.LongTensor of size 1] を size 1x1 に変換します

        else:
            # 0,1の行動をランダムに返す
            action = torch.LongTensor(
                [[random.randrange(self.num_actions)]])  # 0,1の行動をランダムに返す
            # actionは[torch.LongTensor of size 1x1]の形になります

        return action

    def make_minibatch(self):
        '''2. ミニバッチの作成'''

        # 2.1 メモリからミニバッチ分のデータを取り出す
        transitions = self.memory.sample(BATCH_SIZE)

        # 2.2 各変数をミニバッチに対応する形に変形
        # transitionsは1stepごとの(state, action, state_next, reward)が、BATCH_SIZE分格納されている
        # つまり、(state, action, state_next, reward)×BATCH_SIZE
        # これをミニバッチにしたい。つまり
        # (state×BATCH_SIZE, action×BATCH_SIZE, state_next×BATCH_SIZE, reward×BATCH_SIZE)にする
        batch = Transition(*zip(*transitions))

        # 2.3 各変数の要素をミニバッチに対応する形に変形し、ネットワークで扱えるようVariableにする
        # 例えばstateの場合、[torch.FloatTensor of size 1x4]がBATCH_SIZE分並んでいるのですが、
        # それを torch.FloatTensor of size BATCH_SIZEx4 に変換します
        # 状態、行動、報酬、non_finalの状態のミニバッチのVariableを作成
        # catはConcatenates(結合)のことです。
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        non_final_next_states = torch.cat([s for s in batch.next_state
                                           if s is not None])

        return batch, state_batch, action_batch, reward_batch, non_final_next_states

    def get_expected_state_action_values(self):
        '''3. 教師信号となるQ(s_t, a_t)値を求める'''

        # 3.1 ネットワークを推論モードに切り替える
        self.main_q_network.eval()
        self.target_q_network.eval()

        # 3.2 ネットワークが出力したQ(s_t, a_t)を求める
        # self.model(state_batch)は、右左の両方のQ値を出力しており
        # [torch.FloatTensor of size BATCH_SIZEx2]になっている。
        # ここから実行したアクションa_tに対応するQ値を求めるため、action_batchで行った行動a_tが右か左かのindexを求め
        # それに対応するQ値をgatherでひっぱり出す。
        self.state_action_values = self.main_q_network(
            self.state_batch).gather(1, self.action_batch)

        # 3.3 max{Q(s_t+1, a)}値を求める。ただし次の状態があるかに注意。

        # cartpoleがdoneになっておらず、next_stateがあるかをチェックするインデックスマスクを作成
        non_final_mask = torch.ByteTensor(tuple(map(lambda s: s is not None,
                                                    self.batch.next_state)))
        # まずは全部0にしておく
        next_state_values = torch.zeros(BATCH_SIZE)

        a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)

        # 次の状態での最大Q値の行動a_mをMain Q-Networkから求める
        # 最後の[1]で行動に対応したindexが返る
        a_m[non_final_mask] = self.main_q_network(
            self.non_final_next_states).detach().max(1)[1]

        # 次の状態があるものだけにフィルターし、size 32を32×1へ
        a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)

        # 次の状態があるindexの、行動a_mのQ値をtarget Q-Networkから求める
        # detach()で取り出す
        # squeeze()でsize[minibatch×1]を[minibatch]に。
        next_state_values[non_final_mask] = self.target_q_network(
            self.non_final_next_states).gather(1, a_m_non_final_next_states).detach().squeeze()

        # 3.4 教師となるQ(s_t, a_t)値を、Q学習の式から求める
        expected_state_action_values = self.reward_batch + GAMMA * next_state_values

        return expected_state_action_values

    def update_main_q_network(self):
        '''4. 結合パラメータの更新'''

        # 4.1 ネットワークを訓練モードに切り替える
        self.main_q_network.train()

        # 4.2 損失関数を計算する(smooth_l1_lossはHuberloss)
        # expected_state_action_valuesは
        # sizeが[minbatch]になっているので、unsqueezeで[minibatch x 1]へ
        loss = F.smooth_l1_loss(self.state_action_values,
                                self.expected_state_action_values.unsqueeze(1))

        # 4.3 結合パラメータを更新する
        self.optimizer.zero_grad()  # 勾配をリセット
        loss.backward()  # バックプロパゲーションを計算
        self.optimizer.step()  # 結合パラメータを更新

    def update_target_q_network(self):  # DDQNで追加
        '''Target Q-NetworkをMainと同じにする'''
        self.target_q_network.load_state_dict(self.main_q_network.state_dict())
            

Brainクラスの関数initにおいて最適化手法の設定では引数をself.main_q_network.parameters()とし、学習させるMain Q-Networkを設定する。 その他Brainクラスでは、前セクションでは変数modelとしていた部分を変数main_q_networkに変更する。

Brainクラスの変更に合わせて、Agentクラスを少し変更する。新たに関数update_target_q_functionを実装し、Brainクラスの関数update_target_q_networkを実行させる。 最後にEnvironmentクラスで試行(エピソード)の終了時に、Agentクラスの関数update_target_q_functionを実行させる。 今回は2試行に1度実行し、Main Q-Networkの値をTarget Q-Networkにコピーさせる。

AgentクラスとEnvironmentクラスは以下の通り。


# CartPoleで動くエージェントクラスです、棒付き台車そのものになります


class Agent:
    def __init__(self, num_states, num_actions):
        '''課題の状態と行動の数を設定する'''
        self.brain = Brain(num_states, num_actions)  # エージェントが行動を決定するための頭脳を生成

    def update_q_function(self):
        '''Q関数を更新する'''
        self.brain.replay()

    def get_action(self, state, episode):
        '''行動を決定する'''
        action = self.brain.decide_action(state, episode)
        return action

    def memorize(self, state, action, state_next, reward):
        '''memoryオブジェクトに、state, action, state_next, rewardの内容を保存する'''
        self.brain.memory.push(state, action, state_next, reward)

    def update_target_q_function(self):
        '''Target Q-NetworkをMain Q-Networkと同じに更新'''
        self.brain.update_target_q_network()
            

# CartPoleを実行する環境のクラスです


class Environment:

    def __init__(self):
        self.env = gym.make(ENV)  # 実行する課題を設定
        num_states = self.env.observation_space.shape[0]  # 課題の状態と行動の数を設定
        num_actions = self.env.action_space.n  # CartPoleの行動(右に左に押す)の2を取得
        # 環境内で行動するAgentを生成
        self.agent = Agent(num_states, num_actions)

    def run(self):
        '''実行'''
        episode_10_list = np.zeros(10)  # 10試行分の立ち続けたstep数を格納し、平均ステップ数を出力に利用
        complete_episodes = 0  # 195step以上連続で立ち続けた試行数
        episode_final = False  # 最後の試行フラグ
        frames = []  # 最後の試行を動画にするために画像を格納する変数

        for episode in range(NUM_EPISODES):  # 試行数分繰り返す
            observation = self.env.reset()  # 環境の初期化

            state = observation  # 観測をそのまま状態sとして使用
            state = torch.from_numpy(state).type(
                torch.FloatTensor)  # numpy変数をPyTorchのテンソルに変換
            state = torch.unsqueeze(state, 0)  # size 4をsize 1x4に変換

            for step in range(MAX_STEPS):  # 1エピソードのループ
                
                if episode_final is True:  # 最終試行ではframesに各時刻の画像を追加していく
                    frames.append(self.env.render(mode='rgb_array'))
                    
                action = self.agent.get_action(state, episode)  # 行動を求める

                # 行動a_tの実行により、s_{t+1}とdoneフラグを求める
                # actionから.item()を指定して、中身を取り出す
                observation_next, _, done, _ = self.env.step(
                    action.item())  # rewardとinfoは使わないので_にする

                # 報酬を与える。さらにepisodeの終了評価と、state_nextを設定する
                if done:  # ステップ数が200経過するか、一定角度以上傾くとdoneはtrueになる
                    state_next = None  # 次の状態はないので、Noneを格納

                    # 直近10episodeの立てたstep数リストに追加
                    episode_10_list = np.hstack(
                        (episode_10_list[1:], step + 1))

                    if step < 195:
                        reward = torch.FloatTensor(
                            [-1.0])  # 途中でこけたら罰則として報酬-1を与える
                        complete_episodes = 0  # 連続成功記録をリセット
                    else:
                        reward = torch.FloatTensor([1.0])  # 立ったまま終了時は報酬1を与える
                        complete_episodes = complete_episodes + 1  # 連続記録を更新
                else:
                    reward = torch.FloatTensor([0.0])  # 普段は報酬0
                    state_next = observation_next  # 観測をそのまま状態とする
                    state_next = torch.from_numpy(state_next).type(
                        torch.FloatTensor)  # numpy変数をPyTorchのテンソルに変換
                    state_next = torch.unsqueeze(state_next, 0)  # size 4をsize 1x4に変換

                # メモリに経験を追加
                self.agent.memorize(state, action, state_next, reward)

                # Experience ReplayでQ関数を更新する
                self.agent.update_q_function()

                # 観測の更新
                state = state_next

                # 終了時の処理
                if done:
                    print('%d Episode: Finished after %d steps:10試行の平均step数 = %.1lf' % (
                        episode, step + 1, episode_10_list.mean()))
                    
                    # DDQNで追加、2試行に1度、Target Q-NetworkをMainと同じにコピーする
                    if(episode % 2 == 0):
                        self.agent.update_target_q_function()
                    break
                    
                    
            if episode_final is True:
                # 動画を保存と描画
                display_frames_as_gif(frames)
                break

            # 10連続で200step経ち続けたら成功
            if complete_episodes >= 10:
                print('10回連続成功')
                episode_final = True  # 次の試行を描画を行う最終試行とする
            

最後に、実行する。


# main クラス
cartpole_env = Environment()
cartpole_env.run()