走る作曲家のAIカフェ

目次

Overview

深層学習とは、人工ニューラルネットワークを多層に積み重ねて学習させる手法で、複雑なデータのパターンや特徴を自動的に抽出することができる機械学習の一分野です。
このページでは、深層学習のライブラリであるPyTorchを用いて、深層学習の基礎を学んでいきます。

Source

以下の書籍を参考にしました。

Python For Deep Learning

コンテナデータ型

コンテナデータ型:「リスト」やNumPy配列のように、名前から実際のデータにアクセスするのにインデックスを経由する必要があるデータの型。


x = np.array([5, 7, 9])
y = x
x[1] = -1
print(y)
                
この出力は、[5, -1, 9]となる。xの要素の変更がyに影響しないようにするには、y = x.copy()を使う。

PyTorchで扱うデータは「テンソル」(Tensor)というクラスのインスタンスに保存される。

テンソルの場合も同様にcopyしておけば他からの影響を気にしないで済む。

カスタムクラス定義

オブジェクト指向の基礎概念

クラス:「型」。

インスタンス:「型」から生成された個別の実体。

クラスは、「属性」と呼ばれるクラス内の変数を持っている。

「関数」あるいは「メソッド」と呼ばれる処理機能も持っている。

属性と呼ばれるクラス内の変数の値は、インスタンスごとに異なる。

最初のクラス定義

例)Pointというクラスを定義してみる。

Pointクラスの属性のxとyは、点のx座標とy座標である。

また、関数drawは、自分を点としてグラフに表示する関数である。

以下のように実装される。


import matplotlib.pyplot as plt

# 円描画に必要なライブラリ
import matplotlib.patches as patches

# クラスPointの定義
Class Point:
    # インスタンス生成時にxとyの2つの引数を持つ
    def __init__(self, x, y):
        # インスタンスの属性xに第1引数をセットする
        self.x = x
        # インスタンスの属性yに第2引数をセットする
        self.y = y
    # 描画関数drawの定義(引数なし)
        def draw(self):
            # (x, y)に点を描画する
            plt.plot(self.x, self.y, marker='o', markersize=10, c='k')
            

__init__は初期化処理。

selfはクラスからインスタンスが生成された際、インスタンス自身を指す。

最初のインスタンス生成

# クラスPointからインスタンス変数p1とp2を生成する
p1 = Point(2, 3)
p2 = Point(-1, -2)
            

引数リストの引数は、__init__関数の定義の引数からselfを取り除いたもの。

インスタンス属性へのアクセス

# p1とp2の属性x、yの参照
print(p1.x, p1.y)
print(p2.x, p2.y)
            
draw関数の呼び出し

# p1とp2のdraw関数を呼び出し、2つの点を描画する
p1.draw()
p2.draw()
plt.xlim(-4, 4)
plt.ylim(-4, 4)
plt.show()
            
Circleクラスの定義

中心点のx座標とy座標の他に半径を意味する属性rを持つ。

xとyについてはPointクラスの定義を再利用できる(「クラスの継承」)。


# Pointの子クラスCircleの定義
class Circle(Point)
    # Circleはインスタンス生成時に引数x、y、rを持つ
    def __init__(self, x, y, r):
        # xとyは、親クラスの属性として設定
        super().__init__(x, y)
        # rは、Circleの属性として設定
        self.r = r

    # Circleのdraw関数は、親の関数呼び出しのあとで、円の描画も独自に行う
    def draw(self):
        # 親クラスのdraw関数呼び出し
        super().draw()
                
        # 円の描画
        c = patches.Circle(xy=(self.x, self.y), radius=self.r, fc='b', ec='k')
        ax.add_patch(c)
            

CircleクラスはPointクラスの子クラスとして定義している。

Circleインスタンスの生成とdraw関数の呼び出し

# クラスCircleからインスタンス変数c1を生成する
c1 = Circle(1, 0, 2)

# p1, p2, c1のそれぞれのdraw関数を呼び出す
ax = plt.subplot()
p1.draw()
p2.draw()
c1.draw()
plt.xlim(-4, 4)
plt.ylim(-4, 4)
plt.show()
            

親クラスと同じ名前の関数を子クラスでも定義して振る舞いを変更させることを「オーバーライド」と呼ぶ。

インスタンスを関数として扱う

クラスから生成したインスタンスを呼び出し可能な関数にする。


# 関数クラスHの定義
class H:
    def __call__(self, x):
        return 2*x**2 + 2
            

# hが関数として動作することを確認する

# NumPy配列としてxを定義
x = np.arange(-2, 2.1, 0.25)
print(x)

# Hクラスのインスタンスとしてhを生成
h = H()

# 関数hの呼び出し
y = h(x)
print(y)
            

Basics of PyTorch

重要な概念

PyTorchでは、テンソルという独自のクラスでデータを表現する。

PyTorchの最大の特徴は、自動微分機能。

テンソル

ライブラリインポート

import torch
            
色々な階層のテンソル

# 0階テンソル(スカラー)
r0 = torch.tensor(1.0).float()
            

テンソル変数を作るのに一番簡単なのは、torch.tensor関数を使う方法。

テンソル変数の生成時には、必ず後ろにfloat関数の呼び出しをつけて、dtype(テンソル変数の要素のデータ型)を強制的にfloat32に変換する。


# 1階テンソル(ベクトル)

# 1階のNumPy変数作成
r1_np = np.array([1, 2, 3, 4, 5])

# NumPyからテンソルに変換
r1 = torch.tensor(r1_np).float()

# dtypeを調べる
print(r1.dtype)

# shapeを調べる
print(r1.shape)

# データを調べる
print(r1.data)
            

# 2階テンソル(行列)

# 2階のNumPy変数作成
r2_np = np.array([1, 5, 6], [4, 3, 2])

# NumPyからテンソルに変換
r2 = torch.tensor(r2_np).float()
            

# 3階テンソル

# 乱数seedの初期化
torch.manual_seed(123)

# shape=[3,2,2]の正規分布変数テンソルを作る
r3 = torch.randn((3, 2, 2))
            
整数値のテンソル

PyTorchによる計算では、そのほとんどの場合、数値型としてdtype=float32を利用する。

しかし、「多値分類」用の損失関数である、nn.CrossEntropyLossとnn.NLLLossは、損失関数呼び出し時に、第2引数に整数型を指定する必要がある。

その際は、以下のように整数型へ変換する。


r5 = r1.long()
            

このようにlong関数をかけると、dtype=torch.int64になる。

view関数

view関数は、NumPyのreshape関数に相当し、変数の階数を変換することができる。


# 2階化(r3は[3,2,2]の3階テンソル)
# 要素数に-1を指定すると、この数を自動調整する
r6 = r3.view(3, -1)
            

結果として、[3, 4]の2階テンソルが得られる。


# 1階化
r7 - r3.view(-1)
            
それ以外の属性

テンソルはデータとして扱うことも、クラスとして扱うこともできる。


# requires_grad属性
print('requires_grad: ', r1.requires_grad)

# device属性
print('device: ', r1.device)
            
item関数

スカラー(0階テンソル)に対しては、テンソルからPython本来のクラスの数値(floatまたはint)を取り出すのにitem関数が使える。

計算結果テンソルとしてのlossから、データ記録用に値だけを抽出する場合によく用いる。


item = r0.item()
            

この関数は、1階以上のテンソルを対象にできない。

max関数

# max関数を引数無しで呼び出すと、全体の最大値が取得できる
print(r2.max())
            

# torch.max関数
# 2つ目の引数はどの軸で集約するかを意味する(軸=1: 行方向、軸=0: 列方向)
print(torch.max(r2, 1))
            

この呼び出し方をした場合、最大値の値そのものだけでなく、どのindexで最大値を取ったかも返ってくる。

後ろに[1]をつけると、後者のみを抽出できる。


# 何番目の要素が最大値を取るかは、indicesを調べると良い
# 以下の計算は、多値分類で予測ラベルを求めるときによく利用される
print(torch.max(r2, 1)[1])
            

「複数の予測器の出力のうち、最も大きな値を出した予測器のindexが予測結果ラベルになる」ことを実装している。

NumPy変数への変換

# NumPy化
r2_np = r2.data.numpy()
            

ただし、この方法だと、テンソル変数とNumPy配列は同じデータを指すため、テンソル側で値を変えるとNumPyも連動して値が変わる。

連動しないようにするには、r2.data.numpy.copy()として、データのコピーを作る。

自動微分機能

PyTorchで自動微分を行う場合の処理の流れ:

  1. 勾配計算用変数の定義:requires_grad=Trueとする
  2. テンソル変数間で計算:裏で計算グラフが自動生成される
  3. 計算グラフの可視化:make_dot関数
  4. 勾配計算:backward関数
  5. 勾配値の取得:grad属性
  6. 勾配値の初期化:zero_関数
1. 勾配計算用変数の定義

requires_grad属性をTrueに設定する。

2. テンソル変数間で計算

他のテンソル変数との間で演算する。

このとき、計算式による値の計算が行われるのと同時に、裏で「計算グラフ」が生成される。

この機能は「Define by Run」と呼ばれる。

「計算グラフ」とは、データとそれに対する演算の順番を定義するもの。

3. 計算グラフの可視化

PyTorch内部の動きを確認するために行う。

make_dotという関数を使うと、2.で自動生成された計算グラフを可視化できる。

4. 勾配計算

勾配計算は、計算結果を保存したテンソル変数(スカラー)に対して、backward関数を呼び出すことにより行われる。

5. 勾配値の取得

勾配計算の結果は、PyTorchでは勾配値と呼ばれる。

勾配値はテンソル変数のgrad属性により取得できる。

6. 勾配値の初期化

grad属性に保存されている勾配値は、利用が終わったら値を初期化する必要がある。

そのための関数がzero_関数である。

2次関数の勾配計算

例)\(y = 2x^2+2\)に対して自動微分計算を行う。

1. 勾配計算用変数の定義

# xをNumPy配列で定義
x_np = np.arange(-2, 2.1, 0.25)
            

# 1. 勾配計算用変数の定義

# requires=Trueとする
x = torch.tensor(x_np, requires_grad=True, dtype=torch.float32)
            
2. テンソル変数間で計算

# 2次関数の計算
# 裏で計算グラフが自動生成される

y = 2 * x**2 + 2
            

このとき、変数yは自動的にテンソル変数になる。

requires_grad属性がTrueのテンソル変数は、そのままではMatplotlibで使えないが、data属性を渡すとグラフ表示が可能。


# グラフ描画

plt.plot(x.data, y.data)
plt.show()
            

勾配計算の対象はスカラーである必要があるため、yの値をsum関数ですべて足して、足した結果を新しいテンソル変数zに代入する。

(「なぜ和をとることで微分計算ができるのか」についての説明はこちら。)


# 勾配計算のため、sum 関数で 1階テンソルの関数値をスカラー化する
# (sum 関数を各要素で偏微分した結果は1なので、元の関数の微分結果を取得可能 )
z = y.sum()
            
3. 計算グラフの可視化

この変数zを使って、計算グラフの可視化を行う。


# 3. 計算グラフの可視化

# 必要なライブラリのインポート
from torchviz import make_dot

# 可視化関数の呼び出し
g = make_dot(z, params={'x': x})
display(g)
            

make_dot関数の呼び出し時の第1引数として、可視化したい計算グラフの対象となる変数(今回の場合z)、第2引数のparamsとして微分計算対象の変数(今回の場合x)を辞書形式のパラメータリストにして渡す。

出発点のxをテンソル変数とし、かつrequires_gradフラグをセットしておくと、計算の過程が自動的に記録される。

値を計算しながら、計算過程を自動的に記録する機能は「Define by Run」と呼ばれる。

計算グラフにおいて、青色のノードはmake_dot関数呼び出し時にparamsで指定した変数に該当し、リーフノードと呼ばれる。

勾配値の計算が可能な変数を意味している。

緑色のノードは出力ノード、灰色のノードは中間処理という意味になっている。

一番上と一番下のノードに記載される()は、それぞれの変数のshapeを示している。

(17)であれば、1階17次元のテンソル、()であれば、0階のスカラーということになる。

"AccumulateGrad"は、末端のリーフノードの直下に配置され、勾配値を蓄積する場所を示している。

計算グラフがあれば、PyTorch側で\(f(x) = 2x^2 + 2\)という2次関数を、指数関数、乗算関数、加算関数という基本的な関数の合成関数として認識できる。

4. 勾配計算

backward関数を呼び出すだけ。


# 4. 勾配計算
z.backward()
            
5. 勾配値の取得

print(x.grad)
            
6. 勾配値の初期化

x.gradには最新の勾配計算の結果が入るのではなく、それまでの勾配計算結果を加算した値が入る。

条件を変えて新しく勾配値を取得したい場合、勾配値の初期化が必要になる。


# 6. 勾配の初期化は関数zero_()を使う
x.grad.zero_()
            

シグモイド関数の勾配計算

シグモイド関数の定義

# シグモイド関数の定義
sigmoid = torch.nn.Sigmoid()
            
2. テンソル変数でyの値を計算

# 2. yの値の計算
y = sigmoid(x)
            
グラフ描画

plt.plot(x.data, y.data)
plt.show()
            
最終結果をスカラー化

z = y.sum()
            
3. 計算グラフの可視化

3. 計算グラフの可視化
g = make_dot(z, params={'x': x})
            
4. 勾配計算、5. 勾配値の取得

# 4. 勾配計算
z.backward()

# 5. 勾配値の確認
print(x.grad)
            
グラフに表示

# 元の関数と勾配のグラフ化

plt.plot(x.data, y.data, c='b', label='y')
plt.plot(x.data, x.grad.data, c='k', label='x.grad')
plt.legend()
plt.show()
            

Introduction to Machine Learning

問題の定義

与えられた身長から体重を予測する機械学習モデルを作る(線形回帰)。

勾配降下法

勾配降下法は、「予測計算」「損失関数」「勾配計算」「パラメータ修正」の4つのステップを繰り返すことで、予測関数の中のパラメータを最適な値に近づけることである。

①予測計算

入力テンソルXを入力とし、予測結果は出力テンソルYpに出力されるものとする。

今回は、予測関数の実体は次のような1次関数である。

\begin{align} Yp = W * X + B \end{align}

予測関数により予測値Ypを求めることを「①予測計算」と呼ぶ。

②損失計算

「教師あり学習」における学習データは通常、入力と正解値の両方を含んでいるので、正解値の列を分離して正解テンソルYとする。

予測計算の結果であるYpと正解であるYの、2つのテンソルを入力とする「損失」lossを定義する。

損失が最小になるようなWとBを見つけることが目標である。

この損失を計算する過程が「②損失計算」である。

損失関数は、予測関数の性質に応じて適したものを選ぶ。

今回は値を予測する回帰モデルであるため、YとYpの差の2乗を利用することにする。

正確には、すべてのデータ系列の差を2乗して、その平均をとった関数である「平均2乗誤差」を選択する。

③勾配計算

予測関数を構成するパラメータWとBの値を少しだけ変え、その時の損失の変化の度合(勾配)を調べる。これが「③勾配計算」である。

損失関数の最低値を目指すために、WとBをずらすベストな方向が勾配にあたる。

④パラメータ修正

勾配に小さな定数(学習率)lrをかけ、その値だけWとBを同時に減らす。この操作が「④パラメータ修正」である。

データ前処理


# サンプルデータの宣言
sampleData1 = np.array([
    [166, 58.7],
    [176.0, 75.7],
    [171.0, 62.1],
    [173.0, 70.4],
    [169.0,60.1]
])
print(sampleData1)
            

学習データを入力データxと正解データyに分割する。


# 機械学習モデルで扱うため、身長だけを抜き出した変数xと
# 体重だけを抜き出した変数yをセットする

x = sampleData1[:,0]
y = sampleData1[:,1]
            

データの散布図を表示する。


# 散布図表示で状況の確認

plt.scatter(x,  y,  c='k',  s=50)
plt.xlabel('$x$: 身長(cm) ')
plt.ylabel('$y$: 体重(kg)')
plt.title('身長と体重の関係')
plt.show()
            
データの変換

勾配降下法では、対象となる数値は絶対値が1以内に収まるような比較的小さな値の方が望ましい。

今回の学習データは身長も体重も大きな数値なので、それぞれ平均値を引くことで勾配降下法がやりやすい条件に変換する。

変換後のxとyをXとYで表す。


X = x - x.mean()
Y = y - y.mean()
            

予測計算

まず、変換後のXとYをテンソル変数に変換する。


# XとYをテンソル変数化する

X = torch.tensor(X).float()
Y = torch.tensor(Y).float()
            

次に、1次関数の係数にあたる変数Wと定数項にあたる変数Bもテンソル変数として定義する。


# 重み変数の定義
# WとBは勾配計算をするので、requires_grad=Trueとする

W = torch.tensor(1.0, requires_grad=True).float()
B = torch.tensor(1.0, requires_grad=True).float()
            

2つの変数には初期値として1.0の値を設定する。

また、この2つの変数は勾配降下法の対象となるので、requires_grad属性をTrueに設定して自動微分ができるようにする。

予測関数を定義し、予測値を計算する。


# 予測関数は一次関数

def pred(X):
    return W * X + B

# 予測値の計算

Yp =  pred(X)
            

計算グラフを表示する。


# 予測値の計算グラフ可視化

params = {'W': W, 'B': B}
g = make_dot(Yp, params=params)
display(g)
            

損失計算

損失関数はMSE(平均2乗誤差)と呼ばれる方法で計算する。


# 損失関数は誤差二乗平均

def mse(Yp, Y):
    loss = ((Yp - Y) ** 2).mean()
    return loss

# 損失計算

loss = mse(Yp, Y)
            

ここで得られた損失(loss)は、1次関数の係数Wと定数項Bの関数になっている。計算グラフで確認する。


# 損失の計算グラフ可視化

params = {'W': W, 'B': B}
g = make_dot(loss, params=params)
display(g)
            

勾配計算

backward関数を呼び出すだけ。


# 勾配計算

loss.backward()

# 勾配値確認

print(W.grad)
print(B.grad)
            

パラメータ修正

勾配計算ができたら、その値に一定の学習率lr(0.01や0.001)を掛けた結果を、もとのパラメータ値から引くのが勾配降下法の基本的な考え方である。


# 学習率の定義

lr = 0.001

#  勾配を元にパラメータ修正

W -= lr * W.grad
B -= lr * B.grad
            

ただし、このコードではエラーが発生する。

勾配計算をしている最中の変数は他に影響が及んでしまうため、勝手に値を修正できない。

この場合、with torch.no_grad()というコンテキストを設定すると、そのコンテキストの内部では一時的に計算グラフ生成機能が止まり、変数の修正が可能になる。


# 勾配を元にパラメータ修正
# with torch.no_grad() を付ける必要がある

with torch.no_grad():
    W -= lr * W.grad
    B -= lr * B.grad

    # 計算済みの勾配値をリセットする
    W.grad.zero_()
    B.grad.zero_()
            

勾配値を使ってパラメータ値を更新したあと、次の勾配計算の準備のため、zero_関数で勾配値の初期化もしている。

繰り返し計算

実際に繰り返し計算を行う。初期化のセルは以下の通り。


# 初期化

# WとBを変数として扱う
W = torch.tensor(1.0, requires_grad=True).float()
B = torch.tensor(1.0, requires_grad=True).float()

# 繰り返し回数
num_epochs = 500

# 学習率
lr = 0.001

# 記録用配列初期化
history = np.zeros((0, 2))
            

ループ処理のセルは以下の通り。


# ループ処理

for epoch in range(num_epochs):

    # 予測計算
    Yp = pred(X)

    # 損失計算
    loss = mse(Yp, Y)

    # 勾配計算
    loss.backward()

    with torch.no_grad():
        # パラメータ修正
        W -= lr * W.grad
        B -= lr * B.grad

        # 勾配値の初期化
        W.grad.zero_()
        B.grad.zero_()

    # 損失の記録
    if (epoch %10 == 0):
        item = np.array([epoch, loss.item()])
        history = np.vstack((history, item))
        print(f'epoch = {epoch}  loss = {loss:.4f}')
            

結果評価

WとBの最終的な値と、損失の開始時、終了時の値を表示する。


# パラメータの最終値
print('W = ', W.data.numpy())
print('B = ', B.data.numpy())

#損失の確認
print(f'初期状態: 損失:{history[0,1]:.4f}')
print(f'最終状態: 損失:{history[-1,1]:.4f}')
            

損失の減り方をグラフで可視化する(学習曲線)。


# 学習曲線の表示 (損失)

plt.plot(history[:,0], history[:,1], 'b')
plt.xlabel('繰り返し回数')
plt.ylabel('損失')
plt.title('学習曲線(損失)')
plt.show()
            

求めたWとBの値から直線の式を算出し、散布図に重ね描きする。


# xの範囲を求める(Xrange)
X_max = X.max()
X_min = X.min()
X_range = np.array((X_min, X_max))
X_range = torch.from_numpy(X_range).float()
print(X_range)

# 対応するyの予測値を求める
Y_range = pred(X_range)
print(Y_range.data)

# グラフ描画

plt.scatter(X,  Y,  c='k',  s=50)
plt.xlabel('$X$')
plt.ylabel('$Y$')
plt.plot(X_range.data, Y_range.data, lw=2, c='b')
plt.title('身長と体重の相関直線(加工後)')
plt.show()
            

最後に、平均値を引き算した(X, Y)から元の(x, y)に戻して同じ散布図表示をする。


# y座標値とx座標値の計算

x_range = X_range + x.mean()
yp_range = Y_range + y.mean()

# グラフ描画

plt.scatter(x,  y,  c='k',  s=50)
plt.xlabel('$x$')
plt.ylabel('$y$')
plt.plot(x_range, yp_range.data, lw=2, c='b')
plt.title('身長と体重の相関直線(加工前)')
plt.show()
            

最適化関数の利用

WとBのパラメータの変更は、「最適化関数」と呼ばれる関数を経由して変更するのが主流。


# 初期化

# WとBを変数として扱う
W = torch.tensor(1.0, requires_grad=True).float()
B = torch.tensor(1.0, requires_grad=True).float()

# 繰り返し回数
num_epochs = 500

# 学習率
lr = 0.001

# optimizerとしてSGD(確率的勾配降下法)を指定する
import torch.optim as optim
optimizer = optim.SGD([W, B], lr=lr)

# 記録用配列初期化
history = np.zeros((0, 2))
            

SGDというクラスのインスタンスを生成し、optimizerという変数に保存している。


# ループ処理

for epoch in range(num_epochs):

    # 予測計算
    Yp = pred(X)

    # 損失計算
    loss = mse(Yp, Y)

    # 勾配計算
    loss.backward()

    # パラメータ修正
    optimizer.step()

    #勾配値初期化
    optimizer.zero_grad()

    # 損失値の記録
    if (epoch %10 == 0):
        item = np.array([epoch, loss.item()])
        history = np.vstack((history, item))
        print(f'epoch = {epoch}  loss = {loss:.4f}')
            

最適化関数を利用してパラメータ値を間接的に変更している。

最適化関数のチューニング

最適化関数を導入することで、学習に関して色々なチューニングを簡単にできるようになる。


# 初期化

# WとBを変数として扱う
W = torch.tensor(1.0, requires_grad=True).float()
B = torch.tensor(1.0, requires_grad=True).float()

# 繰り返し回数
num_epochs = 500

# 学習率
lr = 0.001

# optimizerとしてSGD(確率的勾配降下法)を指定する
import torch.optim as optim
optimizer = optim.SGD([W, B], lr=lr, momentum=0.9)

# 記録用配列初期化
history2 = np.zeros((0, 2))
            

momentum=0.9というオプションを、最適化関数のインスタンスoptimizerの生成時に設定している。

これにより、学習の速度を速くすることができる。

Definition of the Prediction Function

PyTorchでは、予測関数を細かい機能に分け、機能の一つ一つに対応する部品を用意し、その部品を組み合わせることで、複雑な関数を作る(ビルディングブロック)。

この部品のことを、「レイヤー関数」と呼ぶ。

レイヤー関数:

テンソルを入力とし、テンソルを出力とする関数群。機械学習モデルは、レイヤー関数を部品として、これを組み合わせて作る。ReLU関数のような活性化関数もレイヤー関数の一種とする。

パラメータ:

レイヤー関数の内部でも持っている、入力テンソル以外のデータ。学習とはレイヤー関数のパラメータ値を調整することを意味する。すべてのレイヤー関数がパラメータを持つわけではない。

入力テンソル:

機械学習モデル全体を1つの関数(合成関数)とみなした場合、関数の入力となるテンソル。ニューラルネットワークの概念では、「入力層」に該当する。

出力テンソル:

機械学習もでる全体を1つの関数(合成関数)とみなした場合、関数の出力となるテンソル。ニューラルネットワークの概念では、「出力層」に該当する。

機械学習モデル:

複数のレイヤー関数を組み合わせて(1つの場合もある)、入力テンソルに対して望ましい(正解データにできるだけ近い)出力テンソルを出力する合成関数。

学習:

レイヤー関数内部のパラメータ値を、望ましい出力テンソルが得られるように調整すること。具体的な手段として、勾配降下法などの最適化関数が用いられる。

予測関数の内部構造

まず、部品となるレイヤー関数をインスタンスとして定義する。


# レイヤー関数定義

# 最初の線形関数
# 784 入力数
# 128 出力数
l1 = nn.Linear(784, 128)

# 2番目の線形関数
# 128 入力数
# 10 出力数
l2 = nn.Linear(128, 10)

# 活性化関数
relu = nn.ReLU(inplace=True)
            

2つの線形関数と、1つの活性化関数(ReLU関数)を定義している。

最初の線形関数の2つ目のパラメータ値(128)が、2番目の線形関数の最初のパラメータ値と等しい。

この128という値が、「隠れ層」のノード数に該当することになる。

この3つの関数を組み合わせて入力から出力を得る。


# 入力テンソルから出力テンソルを計算

# ダミー入力データを作成
inputs = torch.randn(100, 784)

# 中間テンソル1の計算
m1 = l1(inputs)

# 中間テンソル2の計算
m2 = relu(m1)

# 出力テンソルの計算
outputs = l2(m2)

# 入力テンソルと出力テンソルのshape確認
print('入力テンソル', inputs.shape)
print('出力テンソル', outputs.shape)
            

最初に100行、784列の2階テンソル(行列)を作っている。

機械学習では複数件のデータを同時に扱うことが原則である。

入力テンソルの最初のインデックスは常に「複数あるデータのうち何番目のデータか」を意味している。

[100, 784]というshapeは「784個の要素を持つ1階テンソル(ベクトル)のデータが100件ある」と読む。

直線状につながる合成関数は、nn.Sequentialという部品を利用してより簡単に実装できる。


# nn.Sequentialを使って、全体を合成関数として定義

net2 = nn.Sequential(
    l1,
    relu,
    l2
)

outputs2 = net2(inputs)

# 入力テンソルと出力テンソルのshape確認
print('入力テンソル', inputs.shape)
print('出力テンソル', outputs2.shape)
            

PyTorchによる機械学習プログラムは「予測関数」「損失関数」「最適化関数」の3つのパートに分けることが可能である。

繰り返し処理の順番という観点でいうと、「予測計算」「損失計算」「勾配計算」「パラメータ修正」を繰り返すという形になる。

活性化関数の目的

単に線形関数を合成しただけの関数は、結局1階層の線形関数と同じである。

「非線形関数」と呼ばれる活性化関数を線形関数の間に入れることにより初めて深い階層のディープラーニングモデルが意味を持つ。

活性化関数にはもう一つ、線形関数の出力を整形するという役割がある。

具体的には、2値分類モデルではシグモイド関数を、多値分類モデルではsoftmax関数をこの目的で利用し、モデルの出力値を0から1の値を持つ「確率値」にする。

Linear Regression

問題の定義

「ボストン・データセット」を利用する。

ボストン近郊を506の地域に分割し、それぞれの地域で様々な観点の統計情報を取得している。

その中の1項目に「住宅平均価格」があり、「他の項目から住宅平均価格を予測する」という回帰モデルの題材としてよく利用される。

まずは、入力項目のうち「平均部屋数」を意味するRMという項目を使って、目的関数にあたる不動産価格を予測する、単回帰と呼ばれるモデルを作る。

その後、もう一つ「低所得者率」を意味するLSTATという項目も追加し、2入力1出力のモデルを作る。

このモデルは、「単回帰」に対して「重回帰」と呼ばれる。両者を合わせて「線形回帰」と呼ぶ。

1次関数はレイヤー関数でいうと、線形関数(nn.Linear)に該当する。

線形回帰モデルは、他のレイヤー関数との組み合わせなしに、nn.Linearというう単独のレイヤー関数でモデルを実装できる。

線形関数

インスタンスを生成するコードは以下の通り。


# 入力:2、出力:3の線形関数の定義
l3 = nn.Linear(2, 3)
            

このレイヤー関数はインスタンス生成時、2つの引数をとる。

最初の引数は入力テンソルの次元数を、次の引数は出力テンソルの次元数を意味する。

つまり、この例だと、2次元テンソルを入力として3次元テンソルを出力とする関数になる。

1入力1出力

# 乱数の種固定
torch.manual_seed(123)

# 入力:1 出力:1 の線形関数の定義
l1 = nn.Linear(1, 1)

# 線形関数の表示
print(l1)
            

レイヤー関数には、nn.Linear以外の関数うを含め、統一的にnamed_parametersという関数が組み込まれている。

この関数を呼び出すと、(パラメータ名、パラメータ参照)のリストを返す。

変数l1内にどのようなパラメータがあり、どのような値とshapeを持っているか調べる。


# パラメータ名、パラメータ値、shapeの表示

for param in l1.named_parameters():
    print('name: ', param[0])
    print('tensor: ', param[1])
    print('shape: ', param[1].shape)
            

結果から、[1, 1]というshapeを持つ変数weightと、[1]というshapeを持つ変数biasがあることが分かる。

weightは「重み」(1次関数の係数)を、biasはバイアス(1次関数の定数項)を意味する。

今回は、入力も出力も1次元なので、weightもbiasも本来ならスカラー(0階テンソル)で問題ないはずである。

わざわざ[1, 1]という行列、[1]というベクトルになっているのは、入力・出力テンソルの次元数が2以上に増えた時も簡単に拡張できるようにするためである。

2つのパラメータでは、requires=Trueになっている。通常、レイヤー関数内のパラメータは学習対象である。

weightとbiasはともにランダムな値に設定されている。

通常乱数値がセットされるパラメータに明示的な値を設定したい場合、次のコードのように、nn.init.constant_を呼び出す(ここでは、\(y = 2x + 1\)としてみる)。


# 初期値設定
nn.init.constant_(l1.weight, 2.0)
nn.init.constant_(l1.bias, 1.0)

# 結果確認
print(l1.weight)
print(l1.bias)
            

ダミーデータをこの関数にかけて、1次関数として動作していることを確認する。


# テスト用データ生成

# x_npをnumpy配列で定義(x_np = [-2. -1. 0. 1. 2.])
x_np = np.arange(-2, 2.1, 1)

# Tensor化
x = torch.tensor(x_np).float()

# サイズを(N,1)に変更
x = x.view(-1,1)

# 結果確認
print(x.shape)
print(x)
            

weightが[1, 1]の行列であるため、入力変数xは[5, 1]の2次元テンソルに変換している。

2入力1出力

線形関数\(y = x_1 + x_2 + 2\)を作る。


# 2次元numpy配列
x2_np = np.array([[0, 0], [0, 1], [1, 0], [1,1]])

# Tensor化
x2 =  torch.tensor(x2_np).float()

# 結果確認
print(x2.shape)
print(x2)
            

以下のコードで4行2列のテストデータを用意する。


# 2次元numpy配列
x2_np = np.array([[0, 0], [0, 1], [1, 0], [1,1]])

# Tensor化
x2 =  torch.tensor(x2_np).float()

# 結果確認
print(x2.shape)
print(x2)
            

以下のように関数呼び出しを行う。


# 関数値計算
y2 = l2(x2)

# shape確認
print(y2.shape)

# 値確認
print(y2.data)
            
2入力3出力

# 入力:2 出力:3 の線形関数の定義

l3 = nn.Linear(2, 3)

# 初期値設定
nn.init.constant_(l3.weight[0,:], 1.0)
nn.init.constant_(l3.weight[1,:], 2.0)
nn.init.constant_(l3.weight[2,:], 3.0)
nn.init.constant_(l3.bias, 2.0)

# 結果確認
print(l3.weight)
print(l3.bias)
            

# 関数値計算
y3 = l3(x2)

# shape確認
print(y3.shape)

# 値確認
print(y3.data)
            

カスタムクラスを利用したモデル定義


# モデルのクラス定義

class Net(nn.Module):
    def __init__(self, n_input, n_output):
        #  親クラスnn.Modulesの初期化呼び出し
        super().__init__()

        # 出力層の定義
        self.l1 = nn.Linear(n_input, n_output)

    # 予測関数の定義 
    def forward(self, x):
        x1 = self.l1(x) # 線形回帰
        return x1
            

このコードは、機械学習モデル定義のコードの一番本質的な部分である。

Netというクラスの親クラスは、nn.Moduleである。

クラスの内部にはforward関数が定義されていて、この関数で予測処理を実装する。

予測は次のように行う。


# ダミー入力
inputs = torch.ones(100,1)

# インスタンスの生成 (1入力1出力の線形モデル)
n_input = 1
n_output = 1
net = Net(n_input, n_output)

# 予測
outputs = net(inputs)
            

カスタムクラスのインスタンス変数netは自分自身が関数として動く。

MSELossクラスを利用した損失関数

「損失loss」とは、「予測関数」と「損失関数」を組み合わせてできあがった合成関数で、「損失計算」とは、この合成関数を計算することを意味する。

この合成関数(損失)はパラメータ(weightとbias)を引数とする。

合成関数(損失)をパラメータで偏微分することが「勾配計算」であり、勾配計算の結果が勾配降下法の「パラメータ修正」で用いられる。


# 損失関数:平均2乗誤差
criterion = nn.MSELoss()
            

このコードでは、初期化処理の中で損失関数criterionを、クラスnn.MSELossのインスタンスとして定義している。


# 誤差計算
loss = criterion(outputs, labels1) / 2.0
            

このコードでは、繰り返し処理の中で、損失関数を出力テンソルoutputsと正解テンソルlabelsの2つをパラメータとして取る関数として呼び出し、さらに2.0で割った結果を損失lossに代入している。これが「損失計算」である。

また、損失lossに対してbackward関数を呼び出し、勾配計算をしている。

データ準備


# 学習用データ準備
                
data_url = "http://lib.stat.cmu.edu/datasets/boston"
raw_df = pd.read_csv(data_url, sep="\s+",
    skiprows=22, header=None)
x_org = np.hstack([raw_df.values[::2, :],
    raw_df.values[1::2, :2]])
yt = raw_df.values[1::2, 2]
feature_names = np.array(['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX',
    'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO','B', 'LSTAT'])

# 結果確認
print('元データ', x_org.shape, yt.shape)
print('項目名: ', feature_names)
            

# データ絞り込み (項目 RMのみ)
x = x_org[:,feature_names == 'RM']
print('絞り込み後', x.shape)
print(x[:5,:])

# 正解データ yの表示
print('正解データ')
print(yt[:5])
            

# 散布図の表示

plt.scatter(x, yt, s=10, c='b')
plt.xlabel('部屋数')
plt.ylabel('価格')
plt.title('部屋数と価格の散布図')
plt.show()
            

モデル定義

変数定義

# 変数定義

# 入力次元数
n_input= x.shape[1]

# 出力次元数
n_output = 1

print(f'入力次元数: {n_input}  出力次元数: {n_output}')
            

機械学習・ディープラーニングモデルとは、「入力ベクトルに対して出力ベクトルを返す関数」である。

今回のモデルは、1入力1出力のとてもシンプルなモデルである。

機械学習モデル(予測モデル)のクラス定義

# 機械学習モデル(予測モデル)クラス定義

class Net(nn.Module):
    def __init__(self, n_input, n_output):
        #  親クラスnn.Modulesの初期化呼び出し
        super().__init__()

        # 出力層の定義
        self.l1 = nn.Linear(n_input, n_output)

        # 初期値を全部1にする
        nn.init.constant_(self.l1.weight, 1.0)
        nn.init.constant_(self.l1.bias, 1.0)

    # 予測関数の定義
    def forward(self, x):
        x1 = self.l1(x) # 線形回帰
        return x1
            

PyTorchでは、モデル用のクラスの内部で必ずforward関数を定義し、入力テンソルinputsを入力として出力テンソルoutputsを出力するための処理を記述するルールになっている。

インスタンス生成

# インスタンスの生成
# 1入力1出力の線形モデル

net = Net(n_input, n_output)
            

Netクラスのインスタンスであるnet変数を定義すると、次の呼び出し方で入力テンソルから、予測値である出力テンソルを取得できる。


outputs = net(inputs)
            
モデル内の変数値表示

このように生成したモデルを表す変数netでは、親クラスのnn.Module内で定義されている便利な機能(関数)を利用可能である。

その一つとして、モデル内の変数名とその値を取得する関数named_parametersがある。


# モデル内のパラメータの確認
# モデル内の変数取得にはnamed_parameters関数を利用する
# 結果の第1要素が名前、第2要素が値
#
# predict.weightとpredict.biasがあることがわかる
# 初期値はどちらも1.0になっている

for parameter in net.named_parameters():
    print(f'変数名: {parameter[0]}')
    print(f'変数値: {parameter[1].data}')
            
parameters関数の呼び出し

もう一つの便利機能として、parameters関数もある。

parameters関数は、「パラメータ変数」だけが名前なしにリスト形式で返ってくる。

最適化関数のインスタンス生成において、最適化対象のパラメータをリストで渡すときによく使われる。


# パラメータのリスト取得にはparameters関数を利用する

for parameter in net.parameters():
    print(parameter)
            
モデルの概要表示

# モデルの概要表示

print(net)
            

# モデルのサマリー表示

from torchinfo import summary
summary(net, (1,))
            

summaryという関数を呼び出すが、引数としてnet変数そのものと、入力変数のサイズを指定する。

損失関数と最適化関数の定義

# 損失関数: 平均2乗誤差
criterion = nn.MSELoss()

# 学習率
lr = 0.01

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)
            

勾配降下法

入力値xと正解値ytのテンソル化

# 入力変数x と正解値 ytのテンソル変数化

inputs = torch.tensor(x).float()
labels = torch.tensor(yt).float()

# 次元数確認

print(inputs.shape)
print(labels.shape)
            

正解値をテンソル変数化した変数labelsに関しては、これからMSELossのインスタンスであるcriterion関数に、予測値とともに渡して、損失を計算する。

そのときに引数はN次元のベクトル形式でなく、(N, 1)次元の行列形式であることが必要である。

そのため、view関数を利用してデータサイズを変更する。


# 損失値計算用にlabels変数を(N,1)次元の行列に変換する

labels1 = labels.view((-1, 1))

# 次元数確認
print(labels1.shape)
            

あとは①予測計算、②損失計算、③勾配計算、④パラメータ修正、を繰り返せば勾配降下法が実現できる。

①予測計算

# 予測計算

outputs = net(inputs)
            
②損失計算

#  損失計算
loss = criterion(outputs, labels1)

# 損失値の取得
print(f'{loss.item():.5f}')
            
計算グラフの可視化

# 損失の計算グラフ可視化

g = make_dot(loss, params=dict(net.named_parameters()))
display(g)
            
③勾配計算

# 予測計算
outputs = net(inputs)

# 損失計算
loss = criterion(outputs, labels1)

# 勾配計算
loss.backward()

# 勾配の結果が取得可能に
print(net.l1.weight.grad)
print(net.l1.bias.grad)
            
④パラメータ修正

# パラメータ修正
optimizer.step()

# パラメータ値が変わる
print(net.l1.weight)
print(net.l1.bias)
            

勾配値の初期化も忘れずに。


# 勾配値の初期化
optimizer.zero_grad()

# 勾配値がすべてゼロになっている
print(net.l1.weight.grad)
print(net.l1.bias.grad)
            
繰り返し計算

まず、初期化処理の部分を示す。


# 学習率
lr = 0.01

# インスタンス生成 (パラメータ値初期化)
net = Net(n_input, n_output)

# 損失関数: 平均2乗誤差
criterion = nn.MSELoss()

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)

# 繰り返し回数
num_epochs = 50000

# 評価結果記録用 (損失関数値のみ記録)
history = np.zeros((0,2))
            

続いて、ループ処理の部分を示す。


# 繰り返し計算メインループ

for epoch in range(num_epochs):

    # 勾配値初期化
    optimizer.zero_grad()

    # 予測計算
    outputs = net(inputs)

    # 損失計算
    loss = criterion(outputs, labels1) / 2.0

    # 勾配計算
    loss.backward()

    # パラメータ修正
    optimizer.step()

    # 100回ごとに途中経過を記録する
    if ( epoch % 100 == 0):
        history = np.vstack((history, np.array([epoch, loss.item()])))
        print(f'Epoch {epoch} loss: {loss.item():.5f}')
            

結果確認


# 損失初期値と最終値

print(f'損失初期値: {history[0,1]:.5f}')
print(f'損失最終値: {history[-1,1]:.5f}')
            

# 学習曲線の表示 (損失)
# 最初の1つを除く

plt.plot(history[1:,0], history[1:,1], 'b')
plt.xlabel('繰り返し回数')
plt.ylabel('損失')
plt.title('学習曲線(損失)')
plt.show()
            

# 回帰直線の算出

# xの最小値、最大値
xse = np.array((x.min(), x.max())).reshape(-1,1)
Xse = torch.tensor(xse).float()

with torch.no_grad():
  Yse = net(Xse)

print(Yse.numpy())
            

# 散布図と回帰直線の描画

plt.scatter(x, yt, s=10, c='b')
plt.xlabel('部屋数')
plt.ylabel('価格')
plt.plot(Xse.data, Yse.data, c='k')
plt.title('散布図と回帰直線')
plt.show()
            

重回帰モデルへの拡張

これまでは、入力変数が平均部屋数(RM)の1項目だけだったが、新しくLSTATを追加して2入力とする。

新しい入力変数をx2とする。


# 列(LSTAT: 低所得者率)の追加

x_add = x_org[:,feature_names == 'LSTAT']
x2 = np.hstack((x, x_add))

# shapeの表示
print(x2.shape)

# 入力データxの表示
print(x2[:5,:])
            

入力データの次元数n_inputが2になるため、モデルインスタンスを生成し直す。


# 今度は入力次元数=2

n_input = x2.shape[1]
print(n_input)

# モデルインスタンスの生成
net = Net(n_input, n_output)
            

インスタンス変数net内のパラメータがどう変わったか、いくつかの方法で確認する。


# モデル内のパラメータの確認
# predict.weight が2次元に変わった

for parameter in net.named_parameters():
    print(f'変数名: {parameter[0]}')
    print(f'変数値: {parameter[1].data}')
            

# モデルの概要表示

print(net)
            

# モデルのサマリー表示

from torchinfo import summary
summary(net, (2,))
            

param # が3となっているのは、weightが2個とbiasが1個という意味。

新しい入力変数x2を改めてテンソルinputsに定義し直す。


# 入力変数x2 のテンソル変数化
# labels, labels1は前のものをそのまま利用

inputs = torch.tensor(x2).float()
            

繰り返し計算は以下の通り。


# 初期化処理

# 学習率
lr = 0.01

# インスタンス生成 (パラメータ値初期化)
net = Net(n_input, n_output)

# 損失関数: 平均2乗誤差
criterion = nn.MSELoss()

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)

# 繰り返し回数
num_epochs = 50000

# 評価結果記録用 (損失関数値のみ記録)
history = np.zeros((0,2))
            

# 繰り返し計算メインループ

for epoch in range(num_epochs):

    # 勾配値初期化
    optimizer.zero_grad()

    # 予測計算
    outputs = net(inputs)

    # 誤差計算
    loss = criterion(outputs, labels1) / 2.0

    # 勾配計算
    loss.backward()

    # パラメータ修正
    optimizer.step()

    # 100回ごとに途中経過を記録する
    if ( epoch % 100 == 0):
        history = np.vstack((history, np.array([epoch, loss.item()])))
        print(f'Epoch {epoch} loss: {loss.item():.5f}')
            

損失計算の値がinf→nanになってしまうので、学習率の値を再設定してやり直す。

学習率の変更


# 繰り返し回数
#num_epochs = 50000
num_epochs = 2000

# 学習率
#l r = 0.01
lr = 0.001

# モデルインスタンスの生成
net = Net(n_input, n_output)

# 損失関数: 平均2乗誤差
criterion = nn.MSELoss()

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)
            

# 繰り返し計算メインループ

# 評価結果記録用 (損失関数値のみ記録)
history = np.zeros((0,2))

for epoch in range(num_epochs):

    # 勾配値初期化
    optimizer.zero_grad()

    # 予測計算
    outputs = net(inputs)

    # 誤差計算
    loss = criterion(outputs, labels1) / 2.0

    #勾配計算
    loss.backward()

    # パラメータ修正
    optimizer.step()

    # 100回ごとに途中経過を記録する
    if ( epoch % 100 == 0):
        history = np.vstack((history, np.array([epoch, loss.item()])))
        print(f'Epoch {epoch} loss: {loss.item():.5f}')
            

# 損失初期値、最終値

print(f'損失初期値: {history[0,1]:.5f}')
print(f'損失最終値: {history[-1,1]:.5f}')
            

# 学習曲線の表示 (損失)

plt.plot(history[:,0], history[:,1], 'b')
plt.xlabel('繰り返し回数')
plt.ylabel('損失')
plt.title('学習曲線(損失)')
plt.show()
            

Binary Classification

問題の定義

「アイリス・データセット」を使用する。

このデータセットは、Setosa、Versicolour、Virgincaという3種類のアヤメの花に対して、「花弁」と「がく片」のそれぞれの「長さ」と「幅」を測定した結果である。

4種類のサイズ(長さまたは幅)を入力に、花の種類を予測するモデルを作るための学習データとしてよく利用される。

まずは2種類の花だけを対象に、入力項目も2つだけに絞りこむ。

2値ロジスティック回帰モデルを作る。

「分類」モデルは、「精度」という指標値を導入することにより、「回帰」モデルよりもモデルの性能を判断しやすいという特徴がある。

精度(Accuracy)

(正解件数)/(全体件数)によって、モデルがどの程度の比率で正しく予測できているかを数値化する。

訓練データと検証データの分割

過学習をチェックするためには、検証データを事前に準備しておくことが重要。

シグモイド関数


# NumPy配列でxデータを定義
x_np = np.arange(-4, 4.1, 0.25)

# データをTensor形式に変換
x = torch.tensor(x_np).float()

# yの値を計算
y = torch.sigmoid(x)

# グラフ描画
plt.title('シグモイド関数のグラフ')
plt.plot(x.data, y.data)
plt.show()
            

シグモイド関数は、数式では次のように表される。

\begin{align} f(x) = \frac{1}{1 + \exp(-x)} \end{align}

次の特徴を持つ。

この性質は、関数値を「確率」として解釈して利用するのに適している。

この性質を利用することで2値分類モデルの予測の仕組みが実装される。

入力ベクトルxをnn.Linearで実装される線形関数にかけて結果を得るところまでは、線形回帰モデルとまったく同じである。

唯一異なるのは、線形関数の出力uに対してシグモイド関数をかけて、確率値を取得するところである。

交差エントロピー関数

線形回帰では2次関数だった損失関数は、分類モデルでは交差エントロピー関数になる。

シグモイド関数の出力として得られる確率値は、厳密には「入力データに対して分類結果が1になる確率」である。

2値分類の場合、正解値は1か0のいずれかなので、「分類結果が1になる確率」がf(u)なら、「分類結果が0になる確率」は1-f(u)で表されることになる。

ここで、損失関数に最尤推定という考えを導入する。

「すべてのデータに対する確信度(分類結果が正解の確率)を掛け合わせた結果を最大にするパラメータが一番もっともらしいので採用する」という考えである。

対数尤度関数は次の通りである。

\begin{align} \sum_{i} {yt_i \cdot \log(f(u_i)) + (1 - u_i) \log(1 - f(u_i))} \end{align}

損失関数は、できるだけ小さくすることが目標なので、この式にマイナスをかけ、さらにデータ件数で割って平均を取ったものが、交差エントロピー関数と呼ばれる、2値分類モデルで損失関数として利用される関数になる。

PyTorchで2値分類用の交差エントロピー関数を利用する場合、nn.BCELossというクラスを利用する。

データ準備

データ読み込み

# 学習用データ準備

# ライブラリのインポート
from sklearn.datasets import load_iris

# データ読み込み
iris = load_iris()

# 入力データと正解データ取得
x_org, y_org = iris.data, iris.target

# 結果確認
print('元データ', x_org.shape, y_org.shape)
            
データ絞り込み

次にデータを絞り込む。絞り込みは行方向と列方向(xのみ対象)に対して行う。

アイリス・データセットは元々150行のデータがあるが、これを先頭から100行に限定すると、正解値がSetosa、Versicolourのみのデータになる。

xの列方向も最初の2列のみ(がく片sepalの長さと幅)に絞り込む。


# データ絞り込み
#   クラス0, 1のみ
#   項目sepal_lengthとsepal_widthのみ

x_data = iris.data[:100,:2]
y_data = iris.target[:100]

# 結果確認
print('対象データ', x_data.shape, y_data.shape)
            
訓練データと検証データへの分割

#  元データのサイズ
print(x_data.shape, y_data.shape)

# 訓練データ、検証データに分割 (シャフルも同時に実施)
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(
    x_data, y_data, train_size=70, test_size=30,
    random_state=123)
print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)
            

順番を変えずに分割すると、データに偏りが出てしまいがちだが、同時にシャッフルもしているので問題ない。

また、random_stateパラメータを指定すると、シャッフルする際の乱数の種も固定になるので、分割結果も常に同じになる。

散布図表示

# 散布図の表示

x_t0 = x_train[y_train == 0]
x_t1 = x_train[y_train == 1]
plt.scatter(x_t0[:,0], x_t0[:,1], marker='x', c='b', label='0 (setosa)')
plt.scatter(x_t1[:,0], x_t1[:,1], marker='o', c='k', label='1 (versicolor)')
plt.xlabel('sepal_length')
plt.ylabel('sepal_width')
plt.legend()
plt.show()
            

モデル定義

まずは、入力次元数と出力次元数の定義を行う。


# 入力次元数 (今の場合2)
n_input= x_train.shape[1]

# 出力次元数
n_output = 1

# 結果確認
print(f'n_input: {n_input}  n_output:{n_output}')
            

以下でモデル定義を行う。


# モデルの定義
# 2入力1出力のロジスティック回帰モデル

class Net(nn.Module):
    def __init__(self, n_input, n_output):
        super().__init__()
        self.l1 = nn.Linear(n_input, n_output)
        self.sigmoid = nn.Sigmoid()

        # 初期値を全部1にする
        self.l1.weight.data.fill_(1.0)
        self.l1.bias.data.fill_(1.0)

    # 予測関数の定義
    def forward(self, x):
        # 最初に入力値を線形関数にかけたを計算する
        x1 = self.l1(x)
        # 計算結果にシグモイド関数をかける
        x2 = self.sigmoid(x1)
        return x2
            

次に、インスタンス生成、モデル内のパラメータ確認を行う。


# インスタンスの生成

net = Net(n_input, n_output)
            

# モデル内のパラメータの確認
# l1.weightとl1.biasがあることがわかる

for parameter in net.named_parameters():
    print(parameter)
            

# モデルの概要表示

print(net)
            

# モデルのサマリー表示

summary(net, (2,))
            
最適化アルゴリズムと損失関数の定義

# 損失関数: 交差エントロピー関数
criterion = nn.BCELoss()

# 学習率
lr = 0.01

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)
            

勾配降下法

入力データと正解データのテンソル化

訓練に使うinputsとlabelsだけでなく、検証に使うinputs_testとlabels_testも精度評価用に準備しておく。

損失関数BCELossを使用する場合、正解データとしての第2引数は、第1引数とshapeがそろっている必要がある。

そのため、BCELoss用の変数としてlabels1といlabels_testも用意しておく。


# 入力データ x_train と正解データ y_train のテンソル化

inputs = torch.tensor(x_train).float()
labels = torch.tensor(y_train).float()

# 正解データはN行1列の行列に変換する
labels1 = labels.view((-1,1))

# 検証データのテンソル化
inputs_test = torch.tensor(x_test).float()
labels_test = torch.tensor(y_test).float()

# 検証用の正解データもN行1列の行列に変換する
labels1_test = labels_test.view((-1,1))
            

# 予測計算
outputs = net(inputs)

# 損失計算
loss = criterion(outputs, labels1)

# 損失の計算グラフ可視化
g = make_dot(loss, params=dict(net.named_parameters()))
display(g)
            
初期化処理

# 学習率
lr = 0.01

# 初期化
net = Net(n_input, n_output)

# 損失関数: 交差エントロピー関数
criterion = nn.BCELoss()

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)

# 繰り返し回数
num_epochs = 10000

# 記録用リストの初期化
history = np.zeros((0,5))
            

history変数が5列なのは、以下の情報を含めるからである。

  1. 繰り返し数
  2. 訓練データの損失
  3. 訓練データの精度
  4. 検証データの損失
  5. 検証データの精度
メインループ

# 繰り返し計算メインループ

for epoch in range(num_epochs):
    # 訓練フェーズ

    #勾配値初期化
    optimizer.zero_grad()

    # 予測計算
    outputs = net(inputs)

    # 損失計算
    loss = criterion(outputs, labels1)

    # 勾配計算
    loss.backward()

    # パラメータ修正
    optimizer.step()

    # 損失の保存(スカラー値の取得)
    train_loss = loss.item()

    # 予測ラベル(1 or 0)計算
    predicted = torch.where(outputs < 0.5, 0, 1)

    # 精度計算
    train_acc = (predicted == labels1).sum() / len(y_train)

    # 予測フェーズ

    # 予測計算
    outputs_test = net(inputs_test)

    # 損失計算
    loss_test = criterion(outputs_test, labels1_test)

    # 損失の保存(スカラー値の取得)
    val_loss =  loss_test.item()

    # 予測ラベル(1 or 0)計算
    predicted_test = torch.where(outputs_test < 0.5, 0, 1)

    # 精度計算
    val_acc = (predicted_test == labels1_test).sum() / len(y_test)

    if ( epoch % 10 == 0):
        print (f'Epoch [{epoch}/{num_epochs}], loss: {train_loss:.5f} acc: {train_acc:.5f} val_loss: {val_loss:.5f}, val_acc: {val_acc:.5f}')
        item = np.array([epoch, train_loss, train_acc, val_loss, val_acc])
        history = np.vstack((history, item))
            

train_loss = loss.item()では、損失をスカラー化してtrain_lossに保存している(item関数を使うと、テンソルから数値を取り出せる)。

結果確認

まず、損失と精度を確認する。


#損失と精度の確認

print(f'初期状態: 損失: {history[0,3]:.5f} 精度: {history[0,4]:.5f}' )
print(f'最終状態: 損失: {history[-1,3]:.5f} 精度: {history[-1,4]:.5f}' )
            

次に、学習曲線を可視化する。


# 学習曲線の表示 (損失)

plt.plot(history[:,0], history[:,1], 'b', label='訓練')
plt.plot(history[:,0], history[:,3], 'k', label='検証')
plt.xlabel('繰り返し回数')
plt.ylabel('損失')
plt.title('学習曲線(損失)')
plt.legend()
plt.show()
            

# 学習曲線の表示 (精度)

plt.plot(history[:,0], history[:,2], 'b', label='訓練')
plt.plot(history[:,0], history[:,4], 'k', label='検証')
plt.xlabel('繰り返し回数')
plt.ylabel('精度')
plt.title('学習曲線(精度)')
plt.legend()
plt.show()
            

今回のロジスティック回帰モデルでは、2つの分類結果の境界になる直線が存在し、その直線のことを決定境界と呼ぶ。


# 検証データを散布図用に準備

x_t0 = x_test[y_test==0]
x_t1 = x_test[y_test==1]
            

# パラメータの取得

bias = net.l1.bias.data.numpy()
weight = net.l1.weight.data.numpy()
print(f'BIAS = {bias}, WEIGHT = {weight}')

# 決定境界描画用 x1の値から x2の値を計算する
def decision(x):
    return(-(bias + weight[0,0] * x)/ weight[0,1])

# 散布図のx1の最小値と最大値
xl = np.array([x_test[:,0].min(), x_test[:,0].max()])
yl = decision(xl)

# 結果確認
print(f'xl = {xl}  yl = {yl}')
            

# 散布図表示
plt.scatter(x_t0[:,0], x_t0[:,1], marker='x',
        c='b', s=50, label='class 0')
plt.scatter(x_t1[:,0], x_t1[:,1], marker='o',
        c='k', s=50, label='class 1')

# 決定境界直線
plt.plot(xl, yl, c='b')
plt.xlabel('sepal_length')
plt.ylabel('sepal_width')
plt.legend()
plt.show()
            

BCEWithLogitsLoss関数

BCELossとよく似た関数として、BCEWithLogitsLossがある。


# モデルの定義
# 2入力1出力のロジスティック回帰モデル

class Net(nn.Module):
    def __init__(self, n_input, n_output):
        super().__init__()
        self.l1 = nn.Linear(n_input, n_output)

        # 初期値を全部1にする
        # 「ディープラーニングの数学」と条件を合わせる目的
        self.l1.weight.data.fill_(1.0)
        self.l1.bias.data.fill_(1.0)

    # 予測関数の定義
    def forward(self, x):
        # 入力値と行列の積を計算する
        x1 = self.l1(x)
        return x1
            

# 学習率
lr = 0.01

# 初期化
net = Net(n_input, n_output)

# 損失関数: logits付き交差エントロピー関数
criterion = nn.BCEWithLogitsLoss()

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)

# 繰り返し回数
num_epochs = 10000

# 記録用リストの初期化
history = np.zeros((0,5))
            

# 繰り返し計算メインループ

for epoch in range(num_epochs):
    # 訓練フェーズ

    #勾配値初期化
    optimizer.zero_grad()

    # 予測計算
    outputs = net(inputs)

    # 損失計算
    loss = criterion(outputs, labels1)

    # 勾配計算
    loss.backward()

    # パラメータ修正
    optimizer.step()

    # 損失のスカラー化
    train_loss = loss.item()

    # 予測ラベル(1 or 0)計算
    predicted = torch.where(outputs < 0.0, 0, 1)

    # 精度計算
    train_acc = (predicted == labels1).sum() / len(y_train)

    # 予測フェーズ

    # 予測計算
    outputs_test = net(inputs_test)

    # 損失計算
    loss_test = criterion(outputs_test, labels1_test)

    # 損失のスカラー化
    val_loss =  loss_test.item()

    #予測ラベル(1 or 0)計算
    predicted_test = torch.where(outputs_test < 0.0, 0, 1)

    # 精度計算
    val_acc = (predicted_test == labels1_test).sum() / len(y_test)

    if ( epoch % 10 == 0):
        print (f'Epoch [{epoch}/{num_epochs}], loss: {train_loss:.5f} acc: {train_acc:.5f} val_loss: {val_loss:.5f}, val_acc: {val_acc:.5f}')
        item = np.array([epoch, train_loss, train_acc, val_loss, val_acc])
        history = np.vstack((history, item))
            

BCELWithLogitsLossは、シグモイド関数の後で、交差エントロピー関数を呼び出す。

BCELossを使う場合と異なり、モデルの出力がどちらのクラスに属しているかの基準は0.5より大きいかどうかでなく、0より大きいかどうかになる。

「指数関数(シグモイド関数などに含まれる)と対数関数(交差エントロピー関数などに含まれる)を独立して計算すると結果が不安定になりやすいから、極力セットで計算すべき」というポリシーがPyTorchにはある。

Multiclass Classification

問題の定義

ここでも、「アイリス・データセット」を利用する。

3種類すべてのアヤメの学習データを利用する。

入力項目は、最初は2個に絞り込む。

多値分類モデルでは、分類先グループ数をNとしたときに、N次元出力になる。

複数の分類器

次の考え方を用いる。

softmax関数

softmax関数は、数式では以下のように表される。

\begin{align} y_i = \frac{\exp(x_i)}{\sum_{k=1}^{n} \exp(x_k)} \quad (i = 1, 2, \cdots, n) \end{align}

シグモイド関数同様に、出力を確率値として扱える性質を持っている。

softmax関数は、入力値として一番大きな項目の確率値が一番高くなる(さらに全確率値を足すと1になる)。

しかし、他の項目の確率値も完全にゼロになるわけではなく、ある程度の値が残る。

単純に最大値を調べる関数なら、一番大きな値の項目が確率値を全取り(確率値=1)して、残りの項目の確率値はゼロになる。

そこまでしないソフトな最大値関数というのが、この関数名の由来になっている。

交差エントロピー関数

2値分類のときと同様に、損失関数には、交差エントロピー関数を利用する。

しかし、名前が同じでも、数式が多少異なる。

\begin{align} \sum_{i=0}^{N-1} (yt_i \log(yp_i)) \end{align}

ここで、\(yt_i\)は、正解のときに1、正解でないときに0の値をとる。

交差エントロピー関数は、まずsoftmax関数の出力である中間テンソル\(x2\)のすべての要素に対して対数計算をする。

その後で、正解値の要素だけを抽出する。

「正解値の要素だけ抽出」という操作をするには、正解値が、(0, 1, 2)のどれかという整数値で与えられる必要がある。

PyTorchの交差エントロピー関数は、このような操作をする目的で、損失関数に渡す正解値(第2引数)は整数値から構成される値である必要がある。

予測関数と損失関数の関係

予測関数側では活性化関数は不要で、線形関数の出力をそのまま出力とする。

予測関数出力から確率値を得たい場合は、予測関数出力にsoftmax関数をかける。

損失関数はCrossEntropyLoss関数を利用する。

データ準備


# 学習用データ準備

# ライブラリのインポート
from sklearn.datasets import load_iris

# データ読み込み
iris = load_iris()

# 入力データと正解データ取得
x_org, y_org = iris.data, iris.target

# 結果確認
print('元データ', x_org.shape, y_org.shape)
            
データ絞り込み

# データ絞り込み

# 入力データに関しては、sepal length(0)とpetal length(2)のみ抽出
x_select = x_org[:,[0,2]]

# 結果確認
print('元データ', x_select.shape, y_org.shape)
            
訓練データと検証データの分割

# 訓練データ、検証データに分割 (シャフルも同時に実施)

from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(
    x_select, y_org, train_size=75, test_size=75,
    random_state=123)
print(x_train.shape, x_test.shape, y_train.shape, y_test.shape)
            
散布図の表示

# データを正解値ごとに分割

x_t0 = x_train[y_train == 0]
x_t1 = x_train[y_train == 1]
x_t2 = x_train[y_train == 2]
            

# 散布図の表示

plt.scatter(x_t0[:,0], x_t0[:,1], marker='x', c='k', s=50, label='0 (setosa)')
plt.scatter(x_t1[:,0], x_t1[:,1], marker='o', c='b', s=50, label='1 (versicolour)')
plt.scatter(x_t2[:,0], x_t2[:,1], marker='+', c='k', s=50, label='2 (virginica)')
plt.xlabel('sepal_length')
plt.ylabel('petal_length')
plt.legend()
plt.show()
            

モデルの定義

入力次元数と出力次元数の確認

# 学習用パラメータ設定

# 入力次元数
n_input = x_train.shape[1]

# 出力次元数
# 分類先クラス数 今回は3になる
n_output = len(list(set(y_train)))

# 結果確認
print(f'n_input: {n_input}  n_output: {n_output}')
            
モデル定義

# モデルの定義
# 2入力3出力のロジスティック回帰モデル

class Net(nn.Module):
    def __init__(self, n_input, n_output):
        super().__init__()
        self.l1 = nn.Linear(n_input, n_output)

        # 初期値を全部1にする
        self.l1.weight.data.fill_(1.0)
        self.l1.bias.data.fill_(1.0)

    def forward(self, x):
        x1 = self.l1(x)
        return x1

# インスタンスの生成
net = Net(n_input, n_output)
            
モデル内のパラメータ確認

# モデル内のパラメータの確認
# l1.weightが行列にl1.biasがベクトルになっている

for parameter in net.named_parameters():
    print(parameter)
            

# モデルの概要表示

print(net)
            

# モデルのサマリー表示

summary(net, (2,))
            
最適化アルゴリズムと損失関数定義

# 損失関数: 交差エントロピー関数
criterion = nn.CrossEntropyLoss()

# 学習率
lr = 0.01

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)
            

損失関数定義にnn.CrossEntropyLossクラスを利用している。

このクラスで生成した損失関数は、「softmax関数」「対数関数」「正解値の抽出」の3つを全部まとめてやってしまう関数となっている。

勾配降下法

データのテンソル変数化

# 入力変数x_trainと正解値 y_trainのテンソル変数化

inputs = torch.tensor(x_train).float()
labels = torch.tensor(y_train).long()

# 検証用変数のテンソル変数化

inputs_test = torch.tensor(x_test).float()
labels_test = torch.tensor(y_test).long()
            

loss = criterion(outputs, labels)において、第2引数は整数でないといけないため、long関数を呼び出している。

損失の計算グラフ可視化

# 予測計算
outputs = net(inputs)

#  損失計算
loss = criterion(outputs, labels)

# 損失の計算グラフ可視化
g = make_dot(loss, params=dict(net.named_parameters()))
display(g)
            
予測ラベルの取得方法

softmax関数は入力の段階で最大であった項目が出力後も最大になる性質を持っている。

そのため、softmax関数の前の状態で最大の値を持つ項目を見つければ、それが予測値ラベルになる。


# torch.max関数呼び出し
# 2つめの引数は軸を意味している。1だと行ごとの集計

print(torch.max(outputs, 1))
            

torch.max関数は、最大値そのものと最大値をとったインデックスの2つを同時に返す仕様になっている。

ラベル値を取得したい場合は、2つ目のindicesをとってくればよい。


# ラベル値の配列を取得
torch.max(outputs, 1)[1]
            
繰り返し処理

# 学習率
lr = 0.01

# 初期化
net = Net(n_input, n_output)

# 損失関数: 交差エントロピー関数
criterion = nn.CrossEntropyLoss()

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)

# 繰り返し回数
num_epochs = 10000

# 評価結果記録用
history = np.zeros((0,5))
            

# 繰り返し計算メインループ

for epoch in range(num_epochs):

    # 訓練フェーズ

    #勾配の初期化
    optimizer.zero_grad()

    # 予測計算
    outputs = net(inputs)

    # 損失計算
    loss = criterion(outputs, labels)

    # 勾配計算
    loss.backward()

    # パラメータ修正
    optimizer.step()

    # 予測ラベル算出
    predicted = torch.max(outputs, 1)[1]

    # 損失と精度の計算
    train_loss = loss.item()
    train_acc = (predicted == labels).sum()  / len(labels)

    #予測フェーズ

    # 予測計算
    outputs_test = net(inputs_test)

    # 損失計算
    loss_test = criterion(outputs_test, labels_test)

    # 予測ラベル算出
    predicted_test = torch.max(outputs_test, 1)[1]

    # 損失と精度の計算
    val_loss =  loss_test.item()
    val_acc =  (predicted_test == labels_test).sum() / len(labels_test)

    if ((epoch) % 10 == 0):
        print (f'Epoch [{epoch}/{num_epochs}], loss: {train_loss:.5f} acc: {train_acc:.5f} val_loss: {val_loss:.5f}, val_acc: {val_acc:.5f}')
        item = np.array([epoch, train_loss, train_acc, val_loss, val_acc])
        history = np.vstack((history, item))
            

結果確認


#損失と精度の確認

print(f'初期状態: 損失: {history[0,3]:.5f} 精度: {history[0,4]:.5f}' )
print(f'最終状態: 損失: {history[-1,3]:.5f} 精度: {history[-1,4]:.5f}' )
            

# 学習曲線の表示 (損失)

plt.plot(history[:,0], history[:,1], 'b', label='訓練')
plt.plot(history[:,0], history[:,3], 'k', label='検証')
plt.xlabel('繰り返し回数')
plt.ylabel('損失')
plt.title('学習曲線(損失)')
plt.legend()
plt.show()
            

# 学習曲線の表示 (精度)

plt.plot(history[:,0], history[:,2], 'b', label='訓練')
plt.plot(history[:,0], history[:,4], 'k', label='検証')
plt.xlabel('繰り返し回数')
plt.ylabel('精度')
plt.title('学習曲線(精度)')
plt.legend()
plt.show()
            

MNIST

問題の定義

MNISTを使用する。

様々なパターンの手書き数字が、訓練用に6万枚、検証用に1万枚用意されていて、ディープラーニングの実習によく用いられる。

画像データは縦、横それぞれ28画素ある。画素ごとの色の濃淡は、0から255までの整数値で表される。

ただし、実習で利用するPyTorchのライブラリ経由で取得する場合は、[0, 1]の範囲の浮動小数点表現になっている。

PyTorchで取得する場合、このデータは[1, 28, 28]の3階テンソルになっている。最初の1はカラー画像を意識した色次元を意味している。

この形式の画像データは、CNNであれば、このままの状態で機械学習モデルに入力できる。

しかし、今回扱う「全結合型ニューラルネットワーク」の場合、入力は1階テンソル(ベクトル)形式になっていることが前提になる。

そこで、今回は入力データに加工を施すことで、784(28×28)要素の1次元配列に展開した形式のデータを入力とする。

数字は全部で10種類あるので、分類先のクラス数は10個である。

今回作るモデルは、入力784次元、出力10次元の隠れ層ありニューラルネットワークである。

GPUの利用

GPUの存在チェック

# デバイスの割り当て
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
            
GPU利用のルール

PyTorchでGPUを利用する場合のルールは以下の通り。

  1. テンソル変数はデータがCPU / GPU上のどちらにあるのかを属性として持っている。
  2. CPUとGPU間でデータはto関数で転送する。
  3. 2つの変数が両方ともGPU上にある場合、演算はGPUで行われる。
  4. 変数の片方がCPU、もう一方がGPUの場合、演算はエラーになる。

データ前処理

PyTorchでは、torchvision.transformersというライブラリに、前処理に便利な部品がそろっていて、この部品を組み合わせて使うだけで、簡単に望む形式のデータへ変換できる。

ミニバッチ学習法

事前に決めた数でグループを作り、このグループ単位で勾配計算をする方法。グループのメンバーを選ぶ際に乱数を使うため、繰り返し処理ごとに別のグループができる。

こうすることで、勾配降下法の計算結果が局所最適化にとどまってしまうことを避けられる。

元の学習データ全件でまとめて勾配計算する方法を「バッチ学習法」と呼ぶのに対して、この学習法は「ミニバッチ学習法」と呼ばれる。

PyTorchでは、簡単にこのミニバッチ学習法が使えるデータローダーという仕組みが用意されている。

データセットによる読み込み

PyTorchでは、データ準備のための道具立てが行き届いている。

データ取得

# ライブラリインポート
import torchvision.datasets as datasets

# ダウンロード先ディレクトリ名
data_root = './data'

train_set0 = datasets.MNIST(
    # 元データダウンロード先の指定
    root = data_root,
    # 訓練データか検証データか
    train = True,
    # 元データがない場合にダウンロードするか
    download = True)
            

データセットクラスのインスタンスの1つであるdatasets.MNISTを利用して、train_set0という変数に読み込んでいる。

データセットクラスを使って読み込んだデータは、Pythonで手軽に扱えるようになっている。

以下のコードで、データがどのようなファイルとしてダウンロードされたかを確認する。


# ダウンロードしたファイルの確認

!ls -lR ./data/MNIST
            

読み込んだデータセットであるtrain_set0は、Pythonのリストとして、(入力データ、正解データ)のセットを順に取得できる。

以下のコードは、データセットの最初の要素を入力データimageと正解データlabelに代入し、それぞれの型をtype関数で調べたものである。


# データ件数の確認
print('データ件数: ', len(train_set0))

# 最初の要素の取得
image, label = train_set0[0]

# データ型の確認
print('入力データの型: ', type(image))
print('正解データの型: ', type(label))
            

データの内容を確認する。


# 入力データの画像表示

plt.figure(figsize=(2,3))
plt.title(f'{label}')
plt.imshow(image, cmap='gray_r')
plt.axis('off')
plt.show()
            

最初の20個のデータを正解データ付きで表示してみる。


# 正解データ付きで、最初の20個をイメージ表示

plt.figure(figsize=(10, 3))
for i in range(20):
    ax = plt.subplot(2, 10, i + 1)

    # image と labelの取得
    image, label = train_set0[i]

    # イメージ表示
    plt.imshow(image, cmap='gray_r')
    ax.set_title(f'{label}')
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()
            

Transformsによるデータ前処理

ToTensorの利用

入力データの形式をPyTorchで扱えるテンソル形式に変換する機能。


# ライブラリインポート
import torchvision.transforms as transforms

transform1 = transforms.Compose([
    # データのTensor化
    transforms.ToTensor(),
])

train_set1 = datasets.MNIST(
    root=data_root,  train=True,  download=True,
    transform = transform1)
            

transform1というインスタンスを定義し、その中でToTensorクラスを呼び出している。

さらに、このtransform1をtrain_setのtransformオプションとして指定している。

変換後のデータセットのうち入力データ側がどうなったかを調べる。


# 変換結果の確認

image, label = train_set1[0]
print('入力データの型: ', type(image))
print('入力データのshape: ', image.shape)
print('最小値: ', image.data.min())
print('最大値: ', image.data.max())
            
Normalizeの利用

データを正規化する。今回のデータは既に値の範囲が[0, 1]になるように正規化されているが、データ範囲を[-1, 1]に変更する。

Normalize(\(\mu, \sigma\))により、元のデータ\(x\)は\(X = \frac{x - \mu}{\sigma}\)に変換される。

\(\mu = \sigma = 0.5\)とすると、[0, 1]の範囲の値\(x\)を[-1, 1]の範囲の値\(X\)に変換できる。


transform2 = transforms.Compose([
    # データのTensor化
    transforms.ToTensor(),

    # データの正規化
    transforms.Normalize(0.5,  0.5),
])

train_set2 = datasets.MNIST(
    root = data_root,  train = True,  download = True,
    transform = transform2)
            

Composeというクラスに複数の部品をリスト形式で渡すと、それぞれの処理を順次行うtransformを作ることができる。

以下のコードで結果を確認する。


# 変換結果の確認

image, label = train_set2[0]
print('shape: ', image.shape)
print('最小値: ', image.data.min())
print('最大値: ', image.data.max())
            
Lambdaクラスを利用して1次元化

全結合ニューラルネットワークの入力にできるよう、入力変数のshapeを元の[1, 28, 28]から[784]に変更する。


transform3 = transforms.Compose([
    # データのTensor化
    transforms.ToTensor(),

    # データの正規化
    transforms.Normalize(0.5, 0.5),

    # Tensorの1階テンソル化
    transforms.Lambda(lambda x: x.view(-1)),
])

train_set3 = datasets.MNIST(
    root = data_root,  train = True,
    download=True, transform = transform3)
            

結果を確認する。


# 変換結果の確認

image, label = train_set3[0]
print('shape: ', image.shape)
print('最小値: ', image.data.min())
print('最大値: ', image.data.max())
            
最終的な実装

最終的なTransformsとデータセットの定義は以下の通り。


# データ変換用関数 Transforms
# (1) Imageをテンソル化
# (2) [0, 1]の範囲の値を[-1, 1]の範囲にする
# (3) データのshapeを[1, 28, 28]から[784]に変換

transform = transforms.Compose([
    # (1) データのテンソル化
    transforms.ToTensor(),

    # (2) データの正規化
    transforms.Normalize(0.5, 0.5),

    # (3) 1階テンソルに変換
    transforms.Lambda(lambda x: x.view(-1)),
])
            

# データ取得用関数 Dataset

# 訓練用データセットの定義
train_set = datasets.MNIST(
    root = data_root, train = True,
    download = True, transform = transform)

# 検証データセットの定義
test_set = datasets.MNIST(
    root = data_root, train = False,
    download = True, transform = transform)
            

train_setとtest_setという2種類のデータセットを定義している。

インスタンス生成時のオプションであるtrain=Falseをつけると、検証データのデータセットを読みだせる。

データローダーによるミニバッチ用データ生成


# ライブラリインポート
from torch.utils.data import DataLoader

# ミニバッチのサイズ指定
batch_size = 500

# 訓練用データローダー
# 訓練用なので、シャッフルをかける
train_loader = DataLoader(
    train_set, batch_size = batch_size,
    shuffle = True)

# 検証用データローダー
# 検証時にシャッフルは不要
test_loader = DataLoader(
    test_set,  batch_size = batch_size,
    shuffle = False)
            

定義したデータローダーはfor inputs, labels in train_loader:のような形でループを定義すると、今回の例でいうと、6万件の学習データをbatch_size(ここでは500)分の小さなグループに分けてくれる。


# 何組のデータが取得できるか
print(len(train_loader))

# DataLoaderから最初の1セットを取得する
for images, labels in train_loader:
    break

print(images.shape)
print(labels.shape)
            

ここで取得したimagesとlabelsに対して、その先頭20個の画像と正解データを表示する。


# イメージ表示
plt.figure(figsize=(10, 3))
for i in range(20):
    ax = plt.subplot(2, 10, i + 1)

    # numpyに変換
    image = images[i].numpy()
    label = labels[i]

    # imgの範囲を[0, 1]に戻す
    image2 = (image + 1)/ 2
    # イメージ表示
    plt.imshow(image2.reshape(28, 28),cmap='gray_r')
    ax.set_title(f'{label}')
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()
            

モデル定義


# 入力次元数
n_input = image.shape[0]

# 出力次元数
# 分類先クラス数 今回は10になる
n_output = 10

#   隠れ層のノード数
n_hidden = 128

# 結果確認
print(f'n_input: {n_input}  n_hidden: {n_hidden} n_output: {n_output}')
            

# モデルの定義
# 784入力10出力1隠れ層のニューラルネットワークモデル

class Net(nn.Module):
    def __init__(self, n_input, n_output, n_hidden):
        super().__init__()

        # 隠れ層の定義 (隠れ層のノード数: n_hidden)
        self.l1 = nn.Linear(n_input, n_hidden)

        # 出力層の定義
        self.l2 = nn.Linear(n_hidden, n_output)

        # ReLU関数の定義
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        x1 = self.l1(x)
        x2 = self.relu(x1)
        x3 = self.l2(x2)
        return x3
            

2層目(出力層)の線形関数の出力に対して活性化関数が無いのは、この後の損失関数側にsoftmax関数も含める予定だからである。

また、これまでと異なり、パラメータの初期値を全て1.0にする設定をなくしているのは、全て1.0にすると、モデルのパラメータ数が膨大になった影響でうまく学習できないからである。


# 乱数の固定化
torch.manual_seed(123)
torch.cuda.manual_seed(123)

# モデルインスタンス生成
net = Net(n_input, n_output, n_hidden)

# モデルをGPU側に送る
net = net.to(device)
            

最適化アルゴリズムと損失関数を定義。


# 学習率
lr = 0.01

# アルゴリズム: 勾配降下法
optimizer = torch.optim.SGD(net.parameters(), lr=lr)

# 損失関数: 交差エントロピー関数
criterion = nn.CrossEntropyLoss()
            

モデルの確認。


# モデル内のパラメータの確認
# l1.weight, l1.bias, l2.weight, l2.biasがあることがわかる

for parameter in net.named_parameters():
    print(parameter)
            

# モデルの概要表示

print(net)
            

# モデルのサマリー表示

summary(net, (784,))
            

勾配降下法

予測計算

まず、訓練用のデータローダーであるtrain_setから1セット分のデータを取得する(本来のループ処理を1回分だけ行うためのダミーコード)。


# 訓練データセット 最初の1項目を取得
# データローダーから最初の1セットを取得する
for images, labels in train_loader:
    break
            

データローダーから取得した学習データは当初、CPU側にあるので、GPU側に送付する。


# データローダーから取得したデータをGPUに送る
inputs = images.to(device)
labels = labels.to(device)
            

net関数に入力変数inputsを渡して予測値を計算する。


# 予測計算
outputs = net(inputs)

# 結果確認
print(outputs)
            
損失計算

損失を計算すると同時に、損失を対象とした計算グラフを可視化する。


#  損失計算
loss = criterion(outputs, labels)

# 損失値の取得
print(loss.item())

# 損失の計算グラフ可視化
g = make_dot(loss, params=dict(net.named_parameters()))
display(g)
            

線形関数をモデルに2つ取り入れた(隠れ層のあるモデルを作った)ため、biasとweightが2つずつ計算グラフに現れる。

勾配計算とパラメータ修正は以下の通り。


# 勾配計算の実行
loss.backward()
            

# 勾配計算の結果
w = net.to('cpu')
print(w.l1.weight.grad.numpy())
print(w.l1.bias.grad.numpy())
print(w.l2.weight.grad.numpy())
print(w.l2.bias.grad.numpy())
            

# 勾配降下法の適用
optimizer.step()
            

# パラメータ値の表示
print(net.l1.weight)
print(net.l1.bias)
            
繰り返し処理

まず、初期化処理は以下の通り。


# 乱数の固定化
torch.manual_seed(123)
torch.cuda.manual_seed(123)
torch.backends.cudnn.deterministic = True
torch.use_deterministic_algorithms = True

# 学習率
lr = 0.01

# モデルインスタンス生成
net = Net(n_input, n_output, n_hidden).to(device)

# 損失関数: 交差エントロピー関数
criterion = nn.CrossEntropyLoss()

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)

# 繰り返し回数
num_epochs = 100

# 評価結果記録用
history = np.zeros((0,5))
            

続いて、ループ部分は以下の通り。


# tqdmライブラリのインポート
from tqdm.notebook import tqdm

# 繰り返し計算メインループ

for epoch in range(num_epochs):
    # 1エポックあたりの正解数(精度計算用)
    n_train_acc, n_val_acc = 0, 0
    # 1エポックあたりの累積損失(平均化前)
    train_loss, val_loss = 0, 0
    # 1エポックあたりのデータ累積件数
    n_train, n_test = 0, 0

    # 訓練フェーズ
    for inputs, labels in tqdm(train_loader):
        # 1バッチあたりのデータ件数
        train_batch_size = len(labels)
        # 1エポックあたりのデータ累積件数
        n_train += train_batch_size

        # GPUヘ転送
        inputs = inputs.to(device)
        labels = labels.to(device)

        #勾配の初期化
        optimizer.zero_grad()

        # 予測計算
        outputs = net(inputs)

        # 損失計算
        loss = criterion(outputs, labels)

        # 勾配計算
        loss.backward()

        # パラメータ修正
        optimizer.step()

        # 予測ラベル導出
        predicted = torch.max(outputs, 1)[1]

        # 平均前の損失と正解数の計算
        # lossは平均計算が行われているので平均前の損失に戻して加算
        train_loss += loss.item() * train_batch_size
        n_train_acc += (predicted == labels).sum().item()

    #予測フェーズ
    for inputs_test, labels_test in test_loader:
        # 1バッチあたりのデータ件数
        test_batch_size = len(labels_test)
        # 1エポックあたりのデータ累積件数
        n_test += test_batch_size

        inputs_test = inputs_test.to(device)
        labels_test = labels_test.to(device)

        # 予測計算
        outputs_test = net(inputs_test)

        # 損失計算
        loss_test = criterion(outputs_test, labels_test)

        #予測ラベル導出
        predicted_test = torch.max(outputs_test, 1)[1]

        #  平均前の損失と正解数の計算
        # lossは平均計算が行われているので平均前の損失に戻して加算
        val_loss +=  loss_test.item() * test_batch_size
        n_val_acc +=  (predicted_test == labels_test).sum().item()

    # 精度計算
    train_acc = n_train_acc / n_train
    val_acc = n_val_acc / n_test
    # 損失計算
    ave_train_loss = train_loss / n_train
    ave_val_loss = val_loss / n_test
    # 結果表示
    print (f'Epoch [{epoch+1}/{num_epochs}], loss: {ave_train_loss:.5f} acc: {train_acc:.5f} val_loss: {ave_val_loss:.5f}, val_acc: {val_acc:.5f}')
    # 記録
    item = np.array([epoch+1 , ave_train_loss, train_acc, ave_val_loss, val_acc])
    history = np.vstack((history, item))
            

ミニバッチ処理でも繰り返し計算が入り、2重の繰り返しループになった。

その影響で、データローダーから学習データを取り出すたびにデータをGPUに送る処理が必要になっている。

また、訓練フェーズでミニバッチ処理用にループを回す際に、tqdm関数を経由するようにし、プログレスバーを表示させている。

結果確認


#損失と精度の確認

print(f'初期状態: 損失: {history[0,3]:.5f} 精度: {history[0,4]:.5f}' )
print(f'最終状態: 損失: {history[-1,3]:.5f} 精度: {history[-1,4]:.5f}' )
            

# 学習曲線の表示 (損失)

plt.rcParams['figure.figsize'] = (9,8)
plt.plot(history[:,0], history[:,1], 'b', label='訓練')
plt.plot(history[:,0], history[:,3], 'k', label='検証')
plt.xlabel('繰り返し回数')
plt.ylabel('損失')
plt.title('学習曲線(損失)')
plt.legend()
plt.show()
            

# 学習曲線の表示 (精度)

plt.rcParams['figure.figsize'] = (9,8)
plt.plot(history[:,0], history[:,2], 'b', label='訓練')
plt.plot(history[:,0], history[:,4], 'k', label='検証')
plt.xlabel('繰り返し回数')
plt.ylabel('精度')
plt.title('学習曲線(精度)')
plt.legend()
plt.show()
            

イメージ表示での確認も行う。


# DataLoaderから最初の1セットを取得する
for images, labels in test_loader:
    break

# 予測結果の取得
inputs = images.to(device)
labels = labels.to(device)
outputs = net(inputs)
predicted = torch.max(outputs, 1)[1]
            

# 最初の50件でイメージを「正解値:予測値」と表示

plt.figure(figsize=(10, 8))
for i in range(50):
  ax = plt.subplot(5, 10, i + 1)

  # numpyに変換
  image = images[i]
  label = labels[i]
  pred = predicted[i]
  if (pred == label):
    c = 'k'
  else:
    c = 'b'

  # imgの範囲を[0, 1]に戻す
  image2 = (image + 1)/ 2

  # イメージ表示
  plt.imshow(image2.reshape(28, 28),cmap='gray_r')
  ax.set_title(f'{label}:{pred}', c=c)
  ax.get_xaxis().set_visible(False)
  ax.get_yaxis().set_visible(False)
plt.show()
            

隠れ層の2層化

モデルクラスの定義

# モデルの定義
# 784入力10出力2隠れ層のニューラルネットワークモデル

class Net2(nn.Module):
    def __init__(self, n_input, n_output, n_hidden):
        super().__init__()

        # 隠れ層1の定義 (隠れ層のノード数: n_hidden)
        self.l1 = nn.Linear(n_input, n_hidden)

        # 隠れ層2の定義 (隠れ層のノード数: n_hidden)
        self.l2 = nn.Linear(n_hidden, n_hidden)

        # 出力層の定義
        self.l3 = nn.Linear(n_hidden, n_output)

        # ReLU関数の定義
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        x1 = self.l1(x)
        x2 = self.relu(x1)
        x3 = self.l2(x2)
        x4 = self.relu(x3)
        x5 = self.l3(x4)
        return x5
            

# 乱数の固定化
torch.manual_seed(123)
torch.cuda.manual_seed(123)

# モデルインスタンス生成
net = Net2(n_input, n_output, n_hidden).to(device)

# 損失関数: 交差エントロピー関数
criterion = nn.CrossEntropyLoss()

# 最適化関数: 勾配降下法
optimizer = torch.optim.SGD(net.parameters(), lr=lr)
            

# モデルの概要表示

print(net)
            

# モデルのサマリー表示

summary(net, (784,))
            

# DataLoaderから最初の1セットを取得する
for images, labels in test_loader:
    break

# 予測結果の取得
inputs = images.to(device)
labels = labels.to(device)
            

# 予測計算
outputs = net(inputs)

#  損失計算
loss = criterion(outputs, labels)

# 損失の計算グラフ可視化
make_dot(loss, params=dict(net.named_parameters()))
            

# 勾配計算
loss.backward()

# 勾配計算結果の一部
w = net.to('cpu').l1.weight.grad.numpy()
print(w)

# 各要素の絶対値の平均
print(np.abs(w).mean())
            

# 乱数の固定化
torch.manual_seed(123)
torch.cuda.manual_seed(123)
torch.backends.cudnn.deterministic = True
torch.use_deterministic_algorithms = True

# モデルインスタンス生成
net = Net2(n_input, n_output, n_hidden).to(device)

# 損失関数: 交差エントロピー関数
criterion = nn.CrossEntropyLoss()

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)

# 繰り返し回数
num_epochs = 200

# 評価結果記録用
history2 = np.zeros((0,5))
            

# tqdmライブラリのインポート
from tqdm.notebook import tqdm

# 繰り返し計算メインループ

for epoch in range(num_epochs):
    # 1エポックあたりの正解数(精度計算用)
    n_train_acc, n_val_acc = 0, 0
    # 1エポックあたりの累積損失(平均化前)
    train_loss, val_loss = 0, 0
    # 1エポックあたりのデータ累積件数
    n_train, n_test = 0, 0

    # 訓練フェーズ
    for inputs, labels in tqdm(train_loader):
        # 1バッチあたりのデータ件数
        train_batch_size = len(labels)
        # 1エポックあたりのデータ累積件数
        n_train += train_batch_size

        # GPUヘ転送
        inputs = inputs.to(device)
        labels = labels.to(device)

        #勾配の初期化
        optimizer.zero_grad()

        # 予測計算
        outputs = net(inputs)

        # 損失計算
        loss = criterion(outputs, labels)

        # 勾配計算
        loss.backward()

        # パラメータ修正
        optimizer.step()

        # 予測ラベル導出
        predicted = torch.max(outputs, 1)[1]

        # 平均前の損失と正解数の計算
        # lossは平均計算が行われているので平均前の損失に戻して加算
        train_loss += loss.item() * train_batch_size
        n_train_acc += (predicted == labels).sum().item()

    #予測フェーズ
    for inputs_test, labels_test in test_loader:
        # 1バッチあたりのデータ件数
        test_batch_size = len(labels_test)
        # 1エポックあたりのデータ累積件数
        n_test += test_batch_size

        inputs_test = inputs_test.to(device)
        labels_test = labels_test.to(device)

        # 予測計算
        outputs_test = net(inputs_test)

        # 損失計算
        loss_test = criterion(outputs_test, labels_test)

        #予測ラベル導出
        predicted_test = torch.max(outputs_test, 1)[1]

        #  平均前の損失と正解数の計算
        # lossは平均計算が行われているので平均前の損失に戻して加算
        val_loss +=  loss_test.item() * test_batch_size
        n_val_acc +=  (predicted_test == labels_test).sum().item()

    # 精度計算
    train_acc = n_train_acc / n_train
    val_acc = n_val_acc / n_test
    # 損失計算
    ave_train_loss = train_loss / n_train
    ave_val_loss = val_loss / n_test
    # 結果表示
    print (f'Epoch [{epoch+1}/{num_epochs}], loss: {ave_train_loss:.5f} acc: {train_acc:.5f} val_loss: {ave_val_loss:.5f}, val_acc: {val_acc:.5f}')
    # 記録
    item = np.array([epoch+1 , ave_train_loss, train_acc, ave_val_loss, val_acc])
    history2 = np.vstack((history2, item))
            

#損失と精度の確認

print(f'初期状態: 損失: {history2[0,3]:.5f} 精度: {history2[0,4]:.5f}' )
print(f'最終状態: 損失: {history2[-1,3]:.5f} 精度: {history2[-1,4]:.5f}' )
            

# 学習曲線の表示 (損失)

plt.plot(history2[:,0], history2[:,1], 'b', label='訓練')
plt.plot(history2[:,0], history2[:,3], 'k', label='検証')
plt.xlabel('繰り返し回数')
plt.ylabel('損失')
plt.title('学習曲線(損失)')
plt.legend()
plt.show()
            

# 学習曲線の表示 (精度)

plt.plot(history2[:,0], history2[:,2], 'b', label='訓練')
plt.plot(history2[:,0], history2[:,4], 'k', label='検証')
plt.xlabel('繰り返し回数')
plt.ylabel('精度')
plt.title('学習曲線(精度)')
plt.legend()
plt.show()
            

精度が改善したことから、隠れ層を2層にした効果はあったと考えられる。

CNN

問題の定義

"CIFAR-10データセット"と呼ぶデータセットを利用する。

画素数32×32のカラーイメージデータが、airplane、auotomobile、birdなど10種類のカテゴリに分けられていて、イメージを予測する分類問題の学習データとしてよく用いられる。

訓練用5万枚、検証用1万枚のデータが公開されている。

MNISTとの最大の違いは、カラー画像である点である。そのため、元データも画像1枚当たり、(3, 32, 32)という3階テンソルで表現されている。

CNNでは、「色、タテ、ヨコ」の3階テンソルの構造を保ったまま画像を処理する。

2階層目以降では「色」に該当するインデックスは色ではなくなってしまうため、この奥行にあたるインデックスのことを一般的に「チャネル」と呼ぶ。

CIFAR-10の問題としての難しさを確認するため、最初にあえて学習データを1階テンソル化し、「全結合型ニューラルネットワーク」でモデル構築を試みる。その後で、CNNにモデルを変えてみて、精度がどの程度向上するのか確かめることにする。

CIFAR-10もMNISTと同様に、いくつかのライブラリで加工済みのデータを入手可能である。

PyTorchのライブラリで学習データを入手する場合、イメージ一枚のデータ形式は、[3, 32, 32]になる。

CNNの処理概要

CNNの全体像

CNNを特徴付けているのは畳み込み処理(Convolution)とプーリング処理(Pooling)である。

畳み込み処理

まず3×3や5×5など小さな正方形領域の配列を用意する。この配列のことを畳み込み処理では「カーネル」と呼ぶ。

次に元の画像をカーネルと同じ大きさの正方形領域を切り取って、カーネルと対応する要素間で積を取り、その結果を加算した結果(積和)を出力領域の出力とする。

切り取る領域を1つずつずらして、同じように積和を取り、隣の領域の出力とする。

この処理の順番に繰り返すことにより、新しい正方形の出力パターンが出来上がる。

実際には、入力チャネルは複数あり、積和演算は全チャネルにまたがって行われるため、「カーネル」もそれに対応して入力チャネル分ある。

さらに、出力チャネルも複数あるため、カーネルは全体で4階テンソルの構造を持つことになる。

この4階テンソルのカーネル配列がニューラルネットワークの「パラメータ」に該当し、このテンソルの値が学習対象になる。

畳み込み処理は、カーネルの内容により、特定の傾きの直線が強調されるなど図形の特徴量の抽出に向いている。

また、カーネルが場所を移動しながら学習するため、位置の移動に無関係な特徴量を検出できることになる。

プーリング処理

プーリング処理として最もよく利用されるのは、Max Poolingである。

2×2などの小さな矩形領域で対象画像を区切り、その範囲での最大値を出力する。

矩形領域をずらして再度最大値を取得し、隣の領域の値とする。

これを順次繰り返して、新しい小さな矩形領域の値を求める。

畳み込み処理は、1画素ずつずらして処理することが多いのに対して、プーリング処理は重なる領域を取らないようにずらすのが普通である。

矩形領域のサイズは2×2が多いので、タテヨコともに元の画像の半分の画素数の新しい画像ができることになる。

プーリング処理では、画像を縮小するのと同じ効果が期待できる。そのため、物体の大きさによらない普遍的な特徴量を抽出するのに向いている。

PyTorchでのCNN実装

CNNの全体構成

「畳み込み処理」は、PyTorchのレイヤー関数でいうと、nn.Conv2dというレイヤー関数が担う。

「プーリング処理」を担うのは、nn.MaxPool2dというレイヤー関数である。

「畳み込み関数」は関数の内部にパラメータを持つ。これに対して「プーリング関数」は単なる演算なので、パラメータは持たない。

レイヤー関数で初めて出てくるものとして、「1階化関数(flatten)」がある。これは、チャネル、タテ、ヨコと3階の広がりを持つ、「畳み込み関数」「プーリング関数」の出力を、ヨコ1列の1階テンソルにする操作である。

分類モデルでは、最終的な出力が1階テンソルなので、このような操作が必要になる。

通常のディープラーニングでは、精度を上げるため「畳み込み関数」「プーリング関数」のセットを何度も繰り返すことが多い。

nn.Conv2dとnn.MaxPool2d

CNNモデルの定義のうち、「畳み込み関数」と「プーリング関数」に関わる変数定義を抜き出したものを以下に示す。


# CNN前半部分 レイヤー関数の定義

conv1 = nn.Conv2d(3, 32, 3)
relu = nn.ReLU(inplace=True)
conv2 = nn.Conv2d(32, 32, 3)
maxpool = nn.MaxPool2d((2,2))
            

畳み込み関数は、nn.Conv2dというレイヤー関数で実現されている。

第1引数は入力チャネル数、第2引数は出力チャネル数、そして第3チャネルはフィルターとなる小さな正方形(カーネル)の1辺の画素数を意味するカーネルサイズを指定している。

プーリング関数はnn.MaxPool2dというレイヤー関数で実現する。引数の(2, 2)は、それぞれ小さい矩形のタテとヨコの画素数を示している、

畳み込み関数であるconv1とconv2は、内部にパラメータを持つ。weightとbiasのshapeを確認する。


# conv1の確認
print(conv1)

# conv1の内部変数のshape確認
print(conv1.weight.shape)
print(conv1.bias.shape)

# conv2の内部変数のshape確認
print(conv2.weight.shape)
print(conv2.bias.shape)
            

conv1とconv2のweightは4つのインデックスを持つ4階テンソルになっている。

weightのshapeの最初の要素である「32」は出力チャネル数に対応している。

2つ目の要素の「3」は入力チャネル数である。

例えば、出力チャネル1用のカーネルは「入力チャネル0用」から「入力チャネル2用」まで3枚ある。

実際の畳み込み計算をする際には、積和演算を全部で入力チャネル数回行う。

その結果をすべて加算したものが、出力チャネル1のパターンになっている。

今は特定のチャネルである「出力チャネル1」に注目したが、全体では「出力チャネル0用」から「出力チャネル31用」まで全部で32セット、このようなパラメータが存在する。

畳み込み処理とプーリング層のシミュレーション

入力として使うダミーデータを生成する。


# ダミーで入力と同じサイズのTensorを生成
inputs = torch.randn(100, 3, 32, 32)
print(inputs.shape)
            

# CNNの前半部分をシミュレーションする

x1 = conv1(inputs)
x2 = relu(x1)
x3 = conv2(x2)
x4 = relu(x3)
x5 = maxpool(x4)
            

# それぞれのshapeの確認

print(inputs.shape)
print(x1.shape)
print(x2.shape)
print(x3.shape)
print(x4.shape)
print(x5.shape)
            

結果の読み方は、最初の数字が学習データの件数、2番目の数字がチャネル数、3番目と4番目が、タテとヨコの画素数である。

nn.Sequential

PyTorchの中で「コンテナ(入れ物)」と呼ばれているクラスの1つ。


# 関数定義
features = nn.Sequential(
    conv1,
    relu,
    conv2,
    relu,
    maxpool
)
            
nn.Flatten

この部品の目的は、「畳み込み処理」「プーリング処理」の最中は3階テンソルの形で扱われていたデータを、線形関数(nn.Linear)で扱えるよう、1階テンソルの形に変換することである。


# 関数定義
flatten = nn.Flatten()

# 動作テスト
outputs2 = flatten(outputs)

# 結果確認
print(outputs.shape)
print(outputs2.shape)
            

共通関数の利用

eval_loss(損失計算)

# 損失計算用
def eval_loss(loader, device, net, criterion):

    # データローダーから最初の1セットを取得する
    for images, labels in loader:
        break

    # デバイスの割り当て
    inputs = images.to(device)
    labels = labels.to(device)

    # 予測計算
    outputs = net(inputs)

    #  損失計算
    loss = criterion(outputs, labels)

    return loss
            

最初にデータローダーを使って、入力データと正解データを取得し、入力データとモデルインスタンスから予測値を計算、予測値と正解データを使って損失を計算するのが、処理の流れである。

fit(学習)

呼び出し時の引数は次の8つである。

戻り値はhistoryで、(繰り返し回数、訓練損失、訓練精度、検証損失、損失精度)の2次元配列をNumPy形式で返す。

このfit関数では、引数historyに今までのhistoryを渡すことで、「追加学習」の扱いで過去分も追加したhistoryを生成してくれる。


# 学習用関数
def fit(net, optimizer, criterion, num_epochs, train_loader, test_loader, device, history):

    # tqdmライブラリのインポート
    from tqdm.notebook import tqdm

    base_epochs = len(history)

    for epoch in range(base_epochs, num_epochs+base_epochs):
        # 1エポックあたりの正解数(精度計算用)
        n_train_acc, n_val_acc = 0, 0
        # 1エポックあたりの累積損失(平均化前)
        train_loss, val_loss = 0, 0
        # 1エポックあたりのデータ累積件数
        n_train, n_test = 0, 0

        #訓練フェーズ
        net.train()

        for inputs, labels in tqdm(train_loader):
            # 1バッチあたりのデータ件数
            train_batch_size = len(labels)
            # 1エポックあたりのデータ累積件数
            n_train += train_batch_size

            # GPUヘ転送
            inputs = inputs.to(device)
            labels = labels.to(device)

            # 勾配の初期化
            optimizer.zero_grad()

            # 予測計算
            outputs = net(inputs)

            # 損失計算
            loss = criterion(outputs, labels)

            # 勾配計算
            loss.backward()

            # パラメータ修正
            optimizer.step()

            # 予測ラベル導出
            predicted = torch.max(outputs, 1)[1]

            # 平均前の損失と正解数の計算
            # lossは平均計算が行われているので平均前の損失に戻して加算
            train_loss += loss.item() * train_batch_size
            n_train_acc += (predicted == labels).sum().item()

        #予測フェーズ
        net.eval()

        for inputs_test, labels_test in test_loader:
            # 1バッチあたりのデータ件数
            test_batch_size = len(labels_test)
            # 1エポックあたりのデータ累積件数
            n_test += test_batch_size

            # GPUヘ転送
            inputs_test = inputs_test.to(device)
            labels_test = labels_test.to(device)

            # 予測計算
            outputs_test = net(inputs_test)

            # 損失計算
            loss_test = criterion(outputs_test, labels_test)

            # 予測ラベル導出
            predicted_test = torch.max(outputs_test, 1)[1]

            #  平均前の損失と正解数の計算
            # lossは平均計算が行われているので平均前の損失に戻して加算
            val_loss +=  loss_test.item() * test_batch_size
            n_val_acc +=  (predicted_test == labels_test).sum().item()

        # 精度計算
        train_acc = n_train_acc / n_train
        val_acc = n_val_acc / n_test
        # 損失計算
        avg_train_loss = train_loss / n_train
        avg_val_loss = val_loss / n_test
        # 結果表示
        print (f'Epoch [{(epoch+1)}/{num_epochs+base_epochs}], loss: {avg_train_loss:.5f} acc: {train_acc:.5f} val_loss: {avg_val_loss:.5f}, val_acc: {val_acc:.5f}')
        # 記録
        item = np.array([epoch+1, avg_train_loss, train_acc, avg_val_loss, val_acc])
        history = np.vstack((history, item))
    return history
            

netというモデルインスタンスに対して、訓練フェーズ、予測フェーズそれぞれの先頭でnet.train(), net.eval()という関数を呼び出している。この2つの関数はモデルクラスを定義する際に利用している親クラスnn.Moduleで定義されている。

ドロップアウト関数(nn.Dropout)やBN関数(nn.BatchNorm2d)というレイヤー関数では、それぞれの関数に対して「今は訓練フェーズ」「今は予測フェーズ」という違いを教えてあげる必要がある。

evaluate_history(学習ログ)

学習結果の評価に関しては、

というのが、今までのパターンだったが、この典型的な評価パターンをまとめて行う関数が、evaluate_historyの役割である。

引数はhistoryだけでよい。


# 学習ログ解析

def evaluate_history(history):
    #損失と精度の確認
    print(f'初期状態: 損失: {history[0,3]:.5f} 精度: {history[0,4]:.5f}')
    print(f'最終状態: 損失: {history[-1,3]:.5f} 精度: {history[-1,4]:.5f}' )

    num_epochs = len(history)
    unit = num_epochs / 10

    # 学習曲線の表示 (損失)
    plt.figure(figsize=(9,8))
    plt.plot(history[:,0], history[:,1], 'b', label='訓練')
    plt.plot(history[:,0], history[:,3], 'k', label='検証')
    plt.xticks(np.arange(0,num_epochs+1, unit))
    plt.xlabel('繰り返し回数')
    plt.ylabel('損失')
    plt.title('学習曲線(損失)')
    plt.legend()
    plt.show()

    # 学習曲線の表示 (精度)
    plt.figure(figsize=(9,8))
    plt.plot(history[:,0], history[:,2], 'b', label='訓練')
    plt.plot(history[:,0], history[:,4], 'k', label='検証')
    plt.xticks(np.arange(0,num_epochs+1,unit))
    plt.xlabel('繰り返し回数')
    plt.ylabel('精度')
    plt.title('学習曲線(精度)')
    plt.legend()
    plt.show()
            
show_images_labels(予測結果表示)

事前学習済みモデルで正しい予測ができているかを、元データのイメージ表示とともに行う。また、モデルを作る前に、イメージとラベルだけを表示することもできる。

対象はデータローダーで取得される先頭50件のデータになる。

引数は次の4つである。


# イメージとラベル表示
def show_images_labels(loader, classes, net, device):

    # データローダーから最初の1セットを取得する
    for images, labels in loader:
        break
    # 表示数は50個とバッチサイズのうち小さい方
    n_size = min(len(images), 50)

    if net is not None:
      # デバイスの割り当て
      inputs = images.to(device)
      labels = labels.to(device)

      # 予測計算
      outputs = net(inputs)
      predicted = torch.max(outputs,1)[1]
      #images = images.to('cpu')

    # 最初のn_size個の表示
    plt.figure(figsize=(20, 15))
    for i in range(n_size):
        ax = plt.subplot(5, 10, i + 1)
        label_name = classes[labels[i]]
        # netがNoneでない場合は、予測結果もタイトルに表示する
        if net is not None:
          predicted_name = classes[predicted[i]]
          # 正解かどうかで色分けをする
          if label_name == predicted_name:
            c = 'k'
          else:
            c = 'b'
          ax.set_title(label_name + ':' + predicted_name, c=c, fontsize=20)
        # netがNoneの場合は、正解ラベルのみ表示
        else:
          ax.set_title(label_name, fontsize=20)
        # TensorをNumPyに変換
        image_np = images[i].numpy().copy()
        # 軸の順番変更 (channel, row, column) -> (row, column, channel)
        img = np.transpose(image_np, (1, 2, 0))
        # 値の範囲を[-1, 1] -> [0, 1]に戻す
        img = (img + 1)/2
        # 結果表示
        plt.imshow(img)
        ax.set_axis_off()
    plt.show()
            
torch_seed(乱数初期化)

# PyTorch乱数固定用

def torch_seed(seed=123):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.use_deterministic_algorithms = True
            

PyTorchでは、GPUを使って計算した場合、乱数の種を指定しただけでは結果が同じにならないことがある。その事象を避けるためにプロパティを設定する最後の2行を追加している。

データ準備

Transforms定義

データ前処理に関しては、その後で2通りのモデルを作ることを想定して1階テンソル版、3階テンソル版の2バージョンを用意する。


# Transformsの定義

# transformer1 1階テンソル化

transform1 = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(0.5, 0.5),
    transforms.Lambda(lambda x: x.view(-1)),
])

# transformer2 正規化のみ実施

# 検証データ用 : 正規化のみ実施
transform2 = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(0.5, 0.5),
])
            

transform1が全結合型ニューラルネットワーク用、transform2がCNN用になる。

データセット定義

# データ取得用関数 Datasets

data_root = './data'

# 訓練データセット 1階テンソル版
train_set1 = datasets.CIFAR10(
    root = data_root, train = True,
    download = True, transform = transform1)

# 検証データセット 1階テンソル版
test_set1 = datasets.CIFAR10(
    root = data_root, train = False,
    download = True, transform = transform1)

# 訓練データセット 3階テンソル版
train_set2 = datasets.CIFAR10(
    root =  data_root, train = True,
    download = True, transform = transform2)

# 検証データセット 3階テンソル版
test_set2 = datasets.CIFAR10(
    root = data_root, train = False,
    download = True, transform = transform2)
            

データセットのshapeを確認する。


image1, label1 = train_set1[0]
image2, label2 = train_set2[0]

print(image1.shape)
print(image2.shape)
            
データローダー定義

# データローダーの定義

# ミニバッチのサイズ指定
batch_size = 100

# 訓練用データローダー
# 訓練用なので、シャッフルをかける
train_loader1 = DataLoader(train_set1, batch_size=batch_size, shuffle=True)

# 検証用データローダー
# 検証時にシャッフルは不要
test_loader1 = DataLoader(test_set1,  batch_size=batch_size, shuffle=False)

# 訓練用データローダー
# 訓練用なので、シャッフルをかける
train_loader2 = DataLoader(train_set2, batch_size=batch_size, shuffle=True)

# 検証用データローダー
# 検証時にシャッフルは不要
test_loader2 = DataLoader(test_set2,  batch_size=batch_size, shuffle=False)
            

意図したデータができているか、確認する。


# train_loader1から1セット取得
for images1, labels1 in train_loader1:
    break

# train_loader2から1セット取得
for images2, labels2 in train_loader2:
    break

# それぞれのshape確認
print(images1.shape)
print(images2.shape)
            
検証データのイメージ表示

検証データをtest_loader2から読み取り、先頭50個のイメージを表示してみる。


# 正解ラベル定義
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

# 検証データ最初の50個の表示
show_images_labels(test_loader2, classes, None, None)
            

モデル定義(全結合版)

学習用パラメータ設定

# 入力次元数 今回は3*32*32=3072
n_input = image1.view(-1).shape[0]

# 出力次元数
# 分類先クラス数 今回は10になる
n_output = len(classes)

# 隠れ層のノード数
n_hidden = 128

# 結果確認
print(f'n_input: {n_input}  n_hidden: {n_hidden} n_output: {n_output}')
            

# モデルの定義
# 3072入力10出力1隠れ層のニューラルネットワークモデル

class Net(nn.Module):
    def __init__(self, n_input, n_output, n_hidden):
        super().__init__()

        # 隠れ層の定義 (隠れ層のノード数: n_hidden)
        self.l1 = nn.Linear(n_input, n_hidden)

        # 出力層の定義
        self.l2 = nn.Linear(n_hidden, n_output)

        # ReLU関数の定義
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        x1 = self.l1(x)
        x2 = self.relu(x1)
        x3 = self.l2(x2)
        return x3
            
モデルインスタンス生成とGPUの割り当て

# モデルインスタンス生成
net = Net(n_input, n_output, n_hidden).to(device)

# 損失関数: 交差エントロピー関数
criterion = nn.CrossEntropyLoss()

# 学習率
lr = 0.01

# 最適化関数: 勾配降下法
optimizer = torch.optim.SGD(net.parameters(), lr=lr)
            

# モデルの概要表示

print(net)
            

# モデルのサマリー表示

summary(net, (100,3072),depth=1)
            

# 損失計算
loss = eval_loss(test_loader1, device, net, criterion)

# 損失の計算グラフ可視化
g = make_dot(loss, params=dict(net.named_parameters()))
display(g)
            

結果(全結合版)

学習

# 乱数初期化
torch_seed()

# モデルインスタンス生成
net = Net(n_input, n_output, n_hidden).to(device)

# 損失関数: 交差エントロピー関数
criterion = nn.CrossEntropyLoss()

# 学習率
lr = 0.01

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)

# 繰り返し回数
num_epochs = 50

# 評価結果記録用
history = np.zeros((0,5))

# 学習
history = fit(net, optimizer, criterion, num_epochs, train_loader1, test_loader1, device, history)
            
評価

# 評価

evaluate_history(history)
            

モデル定義(CNN版)


class CNN(nn.Module):
  def __init__(self, n_output, n_hidden):
    super().__init__()
    self.conv1 = nn.Conv2d(3, 32, 3)
    self.conv2 = nn.Conv2d(32, 32, 3)
    self.relu = nn.ReLU(inplace=True)
    self.maxpool = nn.MaxPool2d((2,2))
    self.flatten = nn.Flatten()
    self.l1 = nn.Linear(6272, n_hidden)
    self.l2 = nn.Linear(n_hidden, n_output)

    self.features = nn.Sequential(
        self.conv1,
        self.relu,
        self.conv2,
        self.relu,
        self.maxpool)

    self.classifier = nn.Sequential(
       self.l1,
       self.relu,
       self.l2)

  def forward(self, x):
    x1 = self.features(x)
    x2 = self.flatten(x1)
    x3 = self.classifier(x2)
    return x3
            
モデルインスタンスの生成

# モデルインスタンス生成
net = CNN(n_output, n_hidden).to(device)

# 損失関数: 交差エントロピー関数
criterion = nn.CrossEntropyLoss()

# 学習率
lr = 0.01

# 最適化関数: 勾配降下法
optimizer = torch.optim.SGD(net.parameters(), lr=lr)
            
モデルの概要表示

# モデルの概要表示

print(net)
            

# モデルのサマリー表示

summary(net,(100,3,32,32),depth=1)
            

# 損失計算
loss = eval_loss(test_loader2, device, net, criterion)

# 損失の計算グラフ可視化
g = make_dot(loss, params=dict(net.named_parameters()))
display(g)
            

結果(CNN版)

モデル初期化と学習

# 乱数初期化
torch_seed()

# モデルインスタンス生成
net = CNN(n_output, n_hidden).to(device)

# 損失関数: 交差エントロピー関数
criterion = nn.CrossEntropyLoss()

# 学習率
lr = 0.01

# 最適化関数: 勾配降下法
optimizer = optim.SGD(net.parameters(), lr=lr)

# 繰り返し回数
num_epochs = 50

# 評価結果記録用
history2 = np.zeros((0,5))

# 学習
history2 = fit(net, optimizer, criterion, num_epochs, train_loader2, test_loader2, device, history2)
            

モデルインスタンス生成時のパラメータからn_inputがなくなっている。

これは、全結合版では、先頭の線形関数の重み行列は入力データの件数に合わせてあらかじめ用意しておく必要があり、それがパラメータになっていたが、CNNでは固定長のカーネル行列を用意しておくだけなので、どんな大きさの入力でも受け取れるからである。

評価

# 評価

evaluate_history(history2)
            

# 最初の50個の表示

show_images_labels(test_loader2, classes, net, device)
            

Tuning

画像を対象とした分類型ディープラーニングモデルのチューニングの手法は大きく

  1. ニューラルネットワークの多層化
  2. 最適化関数の改善
  3. 過学習対策

の3つに分類できる。

ニューラルネットワークの多層化

ディープラーニングでは、ネットワークの階層を深くすればするだけ、汎化能力が高く、より精度の高いモデルができる可能性がある。

最適化関数

最適化関数とは、損失の勾配値をもとに、どのようなアルゴリズムでパラメータを修正するかという方式を指している。

SGD(Stochastic Gradient Descent)

勾配に一定の学習率を掛けてパラメータを修正していくという、基本的な勾配降下法に基づく方式。

クラスを使わずに実装する場合の更新式は以下の通り。


# 学習率の定義
lr = 0.001

# 勾配をもとにパラメータの更新
W -= lr * W.grad
B -= lr * B.grad 
            
Momentum

SGDを改善したアルゴリズムとしてMomentumがある。SGDが直近の勾配値しかパラメータ更新に利用しないのに対して、Momentumは過去の勾配値も記憶して、その分もパラメータを一定比率で減らしつつパラメータ更新に利用する。

PyTorchでMomentumを利用するには、次のコードのように、クラスとしてはoptim.SGDを利用しつつmomentumのパラメータ値だけ指定する。


optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9)
            
Adam

AdamはMomentumをはじめ、多数のアルゴリズムのいいところをすべて取り入れて実装したようなイメージ。


optimizer = optim.Adam(net.parameters())
            

過学習とその対応

最もよく利用される対策法は、ドロップアウト、Batch Normalization、Data Augmentationの3つである。

ドロップアウト

ドロップアウトを用いた学習は次のような形で進められる。

  1. ニューラルネットワークを定義する際に、2つのレイヤー関数の間にドロップアウト関数を部品として追加する。ドロップアウト関数のインスタンス生成時、「ドロップアウト比率」をパラメータとして設定しておく。
  2. 学習のたびに、設定したドロップアウト比率の分だけ、ドロップアウト関数の入力となる中間テンソルからランダムにドロップアウトの対象を選び、その要素を出力しなくなる。ドロップアウトに該当した一部のテンソル要素が無い状態で学習することになる。
  3. 次の学習時には、新しい乱数により別のテンソル要素がドロップアウトの対象として選ばれる。その後の学習に関しては同様である。
  4. 学習が完了し、予測フェーズになったときは、ドロップアウトの状態をなくしてすべてのテンソル要素が参加した形で予測する。

学習の回ごとに学習に参加する入力要素を入れ替えることにより、まんべんなく重み情報がいきわたり、結果的に過学習対策になると考えられている。

PyTorchでは、ドロップアウト用のレイヤー関数nn.Dropoutが用意されていて、対象のレイヤー関数間に配置することで、ドロップアウト学習ができる。


# ドロップアウトテスト用ダミーデータの作成

torch.manual_seed(123)
inputs = torch.randn(1, 10)
print(inputs)
            

# dropout関数の定義
dropout = nn.Dropout(0.5)

# 訓練フェーズでの挙動
dropout.train()
print(dropout.training)
outputs = dropout(inputs)
print(outputs)

# 予測フェーズでの挙動
dropout.eval()
print(dropout.training)
outputs = dropout(inputs)
print(outputs)
            

0でない要素では、入力値と異なる値が出力される。この挙動はオンラインドキュメントに説明されていて、「ドロップアウトの比率をpとしたとき、出力は1/(1-p)倍した値が戻される」となっている。入力値全体の平均がドロップアウト前と変わらないようにするための工夫と考えられる。

予測フェーズにおける出力は、入力と全く同じになっている。

Batch Normalization

ミニバッチ学習法をしているときに、ミニバッチの単位で、前のレイヤー関数の出力に対して正規化の処理をした後、次のレイヤー関数の入力にすると、学習効率が上がると共に過学習対策になることが発見された。このアルゴリズムをBatch Normalizationと呼ぶ。

PyTochでは、Batch Normalizationはドロップアウトと同様にレイヤー関数として部品が用意されていて、これを他のレイヤー関数間に配置することで簡単に使える。

Batch Normalization用のレイヤー関数を利用するのに必要な知識は次の通りである。

Data Augmentation

学習前の入力データに加工を施すことで、結果的に学習データのバリエーションを増やす手法である。

モデルから見ると、繰り返しのたびに異なるパターンのデータがやってくることになるので、過学習が起きにくくなる。


# 訓練データ用: 正規化に追加で反転とRandomErasingを実施
transform_train = transforms.Compose([
  transforms.RandomHorizontalFlip(p=0.5),
  transforms.ToTensor(),
  transforms.Normalize(0.5, 0.5),
  transforms.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3), value=0, inplace=False)
])
            

ToTensorとNormalizeに追加して、RandomHolizontalFlip(ランダムに左右を反転させる)と、RandomErasing(ランダムに矩形領域を削除する)の機能を使っている。

PyTorchの場合、このようにTransformsを差し替えるだけで、あとは全く同じコードを使って、Data Augmentaion後のデータを訓練データとして利用した学習ができる。

共通関数のライブラリ化


# 共通関数のダウンロード
!git clone https://github.com/makaishi2/pythonlibs.git

# 共通関数のロード
from pythonlibs.torch_lib1 import *

# 共通関数の存在チェック
print(README)
            

このコードの先頭では、OSコマンドの1つであるgit cloneコマンドを呼び出している。

階層を深くしたモデルの実装

畳み込み関数が6層、プーリング関数が3層のモデルを作る。

クラス定義

class CNN_v2(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=(1,1))
        self.conv2 = nn.Conv2d(32, 32, 3, padding=(1,1))
        self.conv3 = nn.Conv2d(32, 64, 3, padding=(1,1))
        self.conv4 = nn.Conv2d(64, 64, 3, padding=(1,1))
        self.conv5 = nn.Conv2d(64, 128, 3, padding=(1,1))
        self.conv6 = nn.Conv2d(128, 128, 3, padding=(1,1))
        self.relu = nn.ReLU(inplace=True)
        self.flatten = nn.Flatten()
        self.maxpool = nn.MaxPool2d((2,2))
        self.l1 = nn.Linear(4*4*128, 128)
        self.l2 = nn.Linear(128, num_classes)

        self.features = nn.Sequential(
            self.conv1,
            self.relu,
            self.conv2,
            self.relu,
            self.maxpool,
            self.conv3,
            self.relu,
            self.conv4,
            self.relu,
            self.maxpool,
            self.conv5,
            self.relu,
            self.conv6,
            self.relu,
            self.maxpool,
            )

        self.classifier = nn.Sequential(
            self.l1,
            self.relu,
            self.l2
        )

    def forward(self, x):
        x1 = self.features(x)
        x2 = self.flatten(x1)
        x3 = self.classifier(x2)
        return x3
            

nn.Conv2d(3, 32, 3, padding=(1,1))のように、paddingオプションを指定している。

paddingオプションなしにカーネルサイズ3×3の畳み込み関数を適用すると、出力テンソルは入力テンソルと比較して2要素分小さくなる。

paddingとは「入力テンソルの外側をダミーデータで埋める」ということを意味していて、(1,1)の大きさでpaddingすることで、結果的に入力テンソルと同じ要素数の出力テンソルが得られる。

計算グラフを可視化する。


# 損失関数のグラフ表示
net = CNN_v2(n_output).to(device)
criterion = nn.CrossEntropyLoss()
loss = eval_loss(test_loader, device, net, criterion)
g = make_dot(loss, params=dict(net.named_parameters()))
display(g)
            
インスタンス生成

新しいクラス(CNN_v2)からのインスタンスの生成は以下の通り。


# 乱数の固定化
torch_seed()

# モデルインスタンス生成
lr = 0.01
net = CNN_v2(n_output).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=lr)
history = np.zeros((0, 5))
            
学習

# 学習

num_epochs = 50
history = fit(net, optimizer, criterion, num_epochs, train_loader, test_loader, device, history)
            

層の深いモデルに対しては、最適化関数もなんらかの工夫が必要である。

結果を可視化してみる。


evaluate_history(history)
            

最適化関数の選択

最適化関数だけMomentumに差し替える。


# 乱数の固定化
torch_seed()

# モデルインスタンス生成
lr = 0.01
net = CNN_v2(n_output).to(device)
criterion = nn.CrossEntropyLoss()
# 最適化関数にmomentumを指定
optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9)
history2 = np.zeros((0, 5))
            

# 学習

num_epochs = 20
history2 = fit(net, optimizer, criterion, num_epochs, train_loader, test_loader, device, history2)
            

evaluate_history(history2)
            

Adamの場合は以下の通り。


# 乱数の固定化
torch_seed()

# モデルインスタンス生成
net = CNN_v2(n_output).to(device)
criterion = nn.CrossEntropyLoss()
# 最適化関数にAdamを指定
optimizer = optim.Adam(net.parameters())
history3 = np.zeros((0, 5))
            

print(optimizer)
            

# 学習

num_epochs = 20
history3 = fit(net, optimizer, criterion, num_epochs, train_loader, test_loader, device, history3)
            

evaluate_history(history3)
            

結果を比較してみる。


# 結果の比較(検証データへの精度)
plt.figure(figsize=(9,8))
plt.plot(history[:,0], history[:,4], label='SGD', c='k',ls='dashed' )
plt.plot(history2[:,0], history2[:,4], label='SGD momentum=0.9', c='k')
plt.plot(history3[:,0], history3[:,4], label='Adam', c='b')
plt.title('最適化関数 比較結果(検証データへの精度)')
plt.xlabel('繰り返し回数')
plt.ylabel('精度')
plt.legend()
plt.show()
            

Adamが最も良いことが分かる。

ドロップアウト


# 予測クラスの定義

class CNN_v3(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=(1,1))
        self.conv2 = nn.Conv2d(32, 32, 3, padding=(1,1))
        self.conv3 = nn.Conv2d(32, 64, 3, padding=(1,1))
        self.conv4 = nn.Conv2d(64, 64, 3, padding=(1,1))
        self.conv5 = nn.Conv2d(64, 128, 3, padding=(1,1))
        self.conv6 = nn.Conv2d(128, 128, 3, padding=(1,1))
        self.relu = nn.ReLU(inplace=True)
        self.flatten = nn.Flatten()
        self.maxpool = nn.MaxPool2d((2,2))
        self.l1 = nn.Linear(4*4*128, 128)
        self.l2 = nn.Linear(128, num_classes)
        self.dropout1 = nn.Dropout(0.2)
        self.dropout2 = nn.Dropout(0.3)
        self.dropout3 = nn.Dropout(0.4)

        self.features = nn.Sequential(
            self.conv1,
            self.relu,
            self.conv2,
            self.relu,
            self.maxpool,
            self.dropout1,
            self.conv3,
            self.relu,
            self.conv4,
            self.relu,
            self.maxpool,
            self.dropout2,
            self.conv5,
            self.relu,
            self.conv6,
            self.relu,
            self.maxpool,
            self.dropout3,
            )

        self.classifier = nn.Sequential(
            self.l1,
            self.relu,
            self.dropout3,
            self.l2
        )

    def forward(self, x):
        x1 = self.features(x)
        x2 = self.flatten(x1)
        x3 = self.classifier(x2)
        return x3
            

# 損失関数のグラフ表示
net = CNN_v3(n_output).to(device)
criterion = nn.CrossEntropyLoss()
loss = eval_loss(test_loader, device, net, criterion)
g = make_dot(loss, params=dict(net.named_parameters()))
display(g)
            

# 乱数の固定化
torch_seed()

# モデルインスタンス生成
net = CNN_v3(n_output).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters())
history = np.zeros((0, 5))
            

# 学習

num_epochs = 50
history = fit(net, optimizer, criterion, num_epochs, train_loader, test_loader, device, history)
            

evaluate_history(history)
            

ドロップアウトを導入することで過学習に対しては強くなったが、学習にかかる時間がより長くなっていることが分かる。

Batch Normalization


class CNN_v4(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=(1,1))
        self.conv2 = nn.Conv2d(32, 32, 3, padding=(1,1))
        self.conv3 = nn.Conv2d(32, 64, 3, padding=(1,1))
        self.conv4 = nn.Conv2d(64, 64, 3, padding=(1,1))
        self.conv5 = nn.Conv2d(64, 128, 3, padding=(1,1))
        self.conv6 = nn.Conv2d(128, 128, 3, padding=(1,1))
        self.relu = nn.ReLU(inplace=True)
        self.flatten = nn.Flatten()
        self.maxpool = nn.MaxPool2d((2,2))
        self.l1 = nn.Linear(4*4*128, 128)
        self.l2 = nn.Linear(128, num_classes)
        self.dropout1 = nn.Dropout(0.2)
        self.dropout2 = nn.Dropout(0.3)
        self.dropout3 = nn.Dropout(0.4)
        self.bn1 = nn.BatchNorm2d(32)
        self.bn2 = nn.BatchNorm2d(32)
        self.bn3 = nn.BatchNorm2d(64)
        self.bn4 = nn.BatchNorm2d(64)
        self.bn5 = nn.BatchNorm2d(128)
        self.bn6 = nn.BatchNorm2d(128)

        self.features = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.conv2,
            self.bn2,
            self.relu,
            self.maxpool,
            self.dropout1,
            self.conv3,
            self.bn3,
            self.relu,
            self.conv4,
            self.bn4,
            self.relu,
            self.maxpool,
            self.dropout2,
            self.conv5,
            self.bn5,
            self.relu,
            self.conv6,
            self.bn6,
            self.relu,
            self.maxpool,
            self.dropout3,
            )

        self.classifier = nn.Sequential(
            self.l1,
            self.relu,
            self.dropout3,
            self.l2
        )

    def forward(self, x):
        x1 = self.features(x)
        x2 = self.flatten(x1)
        x3 = self.classifier(x2)
        return x3
            

PyTorchで畳込み処理中にBatch Normalization関数を入れる場合、nn.BatchNorm2dというクラスを利用する。

インスタンス生成時のパラメータは、画像のチャネルがいくつあるかを示すものである。

BN関数利用時に注意しないといけないのは、この部品は単なる関数ではなく、自分自身もパラメータを持っていて、学習対象の一部になっている点である。

そのため、インスタンスを定義する際に、チャネル数が同じだからといって使いまわして良いわけはなく、使う場所ごとに別のインスタンスを定義しなければならない。


# 乱数の固定化
torch_seed()

# モデルインスタンス生成
net = CNN_v4(n_output).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters())
history = np.zeros((0, 5))
            

# 学習

num_epochs = 50
history = fit(net, optimizer, criterion, num_epochs, train_loader, test_loader, device, history)
            

evaluate_history(history)
            

Data Augmentation

PyTorchでData Augmentationする場合、Transformsにその機能を追加すればよい。


# 訓練データ用: 正規化に追加で反転とRandomErasingを実施
transform_train = transforms.Compose([
  transforms.RandomHorizontalFlip(p=0.5),
  transforms.ToTensor(),
  transforms.Normalize(0.5, 0.5),
  transforms.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3), value=0, inplace=False)
])
            

RandomHorizontalFlipはランダムに左右反転する処理、RandomErasingはランダムに部分領域を削除する処理である。

実際の利用時には、上で定義した新しいTransformsを受けて、新しいデータセットと新しいデータローダーを順に定義していく。


# transfrom_trainを利用したデータセットの定義
train_set2 = datasets.CIFAR10(
    root = data_root, train = True,
    download = True, transform = transform_train)

# traisform_trainを利用したデータローダーの定義
batch_size = 100
train_loader2 = DataLoader(train_set2, batch_size=batch_size, shuffle=True)
            

今定義した新しいデータローダーで本当に望む加工がされているか実際に確認してみる。


# 新しい訓練用データの先頭50個を表示してみる

# 乱数初期化
torch_seed()

show_images_labels(train_loader2, classes, None, None)
            

この新しい訓練データを使って学習を行なう。


# 乱数の固定化
torch_seed()

# モデルインスタンス生成
net = CNN_v4(n_output).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters())
history = np.zeros((0, 5))
            

# 学習
# 同じモデルでtrain_loader2に変更

num_epochs = 100
history = fit(net, optimizer, criterion, num_epochs,
        train_loader2, test_loader, device, history)
            

ドロップアウト関数のときと同じで、Data Augmentationも過学習対策として有効な反面、学習回数がより多く必要になる。


evaluate_history(history)
            

モデル自体に手を加えず、既存の訓練データを加工することで、モデルの精度が上がることを確認できる。

検証データの先頭50個で、どの程度正しく認識できているか確認する。


show_images_labels(test_loader, classes, net, device)
            

正解:car、予測:truckと間違えた38番目のデータの確率値を確認してみる。


# 間違えた38番目のデータを抽出
for images, labels in test_loader:
    break
image = images[37]
label = labels[37]

# イメージを表示して確認
plt.figure(figsize=(3,3))
w = image.numpy().copy()
w2 = np.transpose(w, (1, 2, 0))
w3 = (w2 + 1)/2
plt.title(classes[label])
plt.imshow(w3)
plt.show()
            

# 予測値を取得
image = image.view(1, 3, 32, 32)
image = image.to(device)
output = net(image)

# ラベル別の確率値を表示
probs = torch.softmax(output, dim=1)
probs_np = probs.data.to('cpu').numpy()[0]
values = np.frompyfunc(lambda x: f'{x:.04f}', 1, 1)(probs_np)
names = np.array(classes)
tbl = np.array([names, values]).T
print(tbl)
            

Pretrained Model

PyTorchでは、画像分類に関して数多くの事前学習済みモデルが用意されていて、関数呼び出しだけで簡単に利用できるようになっている。

事前学習済みモデルの利用方法として有名なのは「転移学習」だが、ここでは、「ファインチューニング」と呼ばれる手法を用いる。

事前学習済みモデル

2012年にILSVRCという画像認識のコンテストで、当時としては画期的な精度を出してブレークスルーを起こしたAlexNetや、2014年に同じコンテストで上位にはいったVGG、あるいは2015年に公開されたResNetなど、多くの有名なモデルが登録されている。

これらのモデルはILSVRCで課題ように公開されていた、ImageNetという1000クラスの分類問題で学習している。

これらのモデルは、次のような関数呼び出しで簡単に読み込んで利用できる。


from torchvision import models
net = models.resnet18(pretrained=True)
            

ここでは、ResNetとVGG-19-BNの2つのモデルを使ってCIFAR-10の学習を行なう。

ファインチューニングと転移学習

ファインチューニング:事前学習済みモデルのパラメータを初期値としては利用するが、すべてのレイヤー関数で学習する手法。

転移学習:事前学習済みモデルのパラメータのうち、入力に近い部分のレイヤー関数はすべて固定し、出力に近い部分のみで学習する方法。

一般的に、学習データが大量にある場合はファインチューニングが、学習データが少ない場合は転移学習が向いていると言われている。

適合的平均プーリン関数(nn.AdaptiveAvgPool2d関数)

事前学習済みモデルでは、適合的平均プーリング関数というレイヤー関数が含まれている。

この関数の目的は、画像の画素数に依らず、入力画像として受付可能なモデルを作ることである。

次のように定義する。


# nn.AdaptiveAvgPool2dの定義
p = nn.AdaptiveAvgPool2d((1,1))
print(p)

# 線形関数の定義
l1 = nn.Linear(32, 10)
print(l1)
            

普通のプーリング関数では、パラメータはカーネルサイズ(フィルター枠のサイズ)を指定する。

これに対して、nn.AdaptiveXXXPool2d関数で指定するパラメータは、変換後の画素数になる。

つまり、上のコードにあるように、p = nn.AdaptiveAvgPool2d((1,1))という呼び出し方をすると、関数pはすべてのチャネルの結果を1×1画素にするということを意味する。

どういう演算をして結果を集約化をするかはXXXの文字列で決まる。今回はAvgなので、平均値を取る処理をしていることになる。

次のコードは、モデルで畳み込み処理の結果から最後の線形関数に処理が渡る部分をシミュレーションしたものである。


# 事前学習済みモデルのシミュレーション
inputs = torch.randn(100, 32, 16, 16)
m1 = p(inputs)
m2 = m1.view(m1.shape[0],-1)
m3 = l1(m2)

# shape確認
print(m1.shape)
print(m2.shape)
print(m3.shape)
            

inputsの段階ではshape=[100, 32, 16, 16]と畳み込み処理途中の、チャネル当たりの画素数が16×16だったが、その次のnn.AdaptiveAvgPool2dにかかった段階で、shape=[100, 32, 1, 1]と、チャネルごとに1画素しか持たない形になる。

さらにその次のview関数の出力でshape=[100, 32]と、線形関数の入力となりうるshapeに変換されている。

この仕組みであれば、inputsのshapeが[100, 32, 8, 8]であっても、[100, 32, 4, 4]であっても全く同じ線形関数(Linear(32, 10))で受け取れる。

CNNのモデルを作るときに、最初の線形関数の入力次数を何らかの形で計算する必要があったが、この問題は、畳み込み処理の最終段に適合的平均プーリング関数を使うことで解決する。

このような工夫をすることで、事前学習済みモデルがどのような入力画素数の画像に対しても利用できるようになっている。

とはいえ、今回利用するResNet-18とVGG-19-BNはどちらも、学習時はタテヨコ224画素の画像で学習しているので、それに合わせて学習時は可能な限り入力データ画素数をこの値に合わせることが、精度向上の観点では望ましい。

データ準備


# 分類先クラス名の定義

classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

# 分類先クラス数 今回は10になる
n_output = len(classes)
            

# Transformsの定義

# 学習データ用: 正規化に追加で反転とRandomErasingを実施
transform_train = transforms.Compose([
  transforms.Resize(112),
  transforms.RandomHorizontalFlip(p=0.5),
  transforms.ToTensor(),
  transforms.Normalize(0.5, 0.5),
  transforms.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3), value=0, inplace=False)
])

# 検証データ用 : 正規化のみ実施
transform = transforms.Compose([
  transforms.Resize(112),
  transforms.ToTensor(),
  transforms.Normalize(0.5, 0.5)
])
            

Resize(112)の呼び出しが追加されている。この機能は、元の画像を拡大・縮小して指定した画素数に変換するというものである。


# データ取得用関数 Dataset

data_root = './data'

train_set = datasets.CIFAR10(
    root = data_root, train = True,
    download = True, transform = transform_train)

# 検証データの取得
test_set = datasets.CIFAR10(
    root = data_root, train = False,
    download = True, transform = transform)
            

# バッチサイズ指定
batch_size = 50

# データローダー

# 訓練用データローダー
# 訓練用なので、シャッフルをかける
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)

# 検証用データローダー
# 検証時にシャッフルは不要
test_loader = DataLoader(test_set,  batch_size=batch_size, shuffle=False)
            

ResNet-18の読み込み


#  必要ライブラリのロード
from torchvision import models

# 事前学習済みモデルのロード
# pretraind = True で学習済みパラメータも一緒に読み込む
net = models.resnet18(pretrained = True)
            

pinrt関数でこのニューラルネットワークの構造を調べてみる。


# ネットワークの概要表示

print(net)
            

summary関数を使って、中間テンソルのshapeがどのように変化していくかを確認してみる。


# モデルのサマリー表示
net = net.to(device)
summary(net,(100,3,112,112))
            

モデルの最終段の線形関数fcをより詳しく確認してみる。


print(net.fc)
print(net.fc.in_features)
            

変数fcの実体は線形関数(nn.Linear)であり、入力(in_features)は512、出力(out_features)は1000だと分かる。

最終レイヤー関数の付け替え

読み込んだ直後の事前学習済みモデルは、1000種類の分類を学習タスクとしていたので、最終段の線形関数の出力は10次元で十分である。

学習方法がファインチューニングの場合も、転移学習の場合も、読み込んだ事前学習済みモデルの最終段を目的に合った線形関数に付け替える必要がある。このタスクを「最終レイヤー関数の付け替え」と呼ぶ。


# 乱数の初期化
torch_seed()

# 最終レイヤー関数の入力次元数を確認
fc_in_features = net.fc.in_features

# 最終レイヤー関数の付け替え
net.fc = nn.Linear(fc_in_features, n_output)
            

# 確認
print(net)
            

net = net.to(device)
summary(net,(100,3,224,224))
            

最終的に出来上がったモデルを使って損失を定義し、損失の計算グラフを可視化する。


# 損失の計算グラフ可視化

criterion = nn.CrossEntropyLoss()
loss = eval_loss(test_loader, device, net, criterion)
g = make_dot(loss, params=dict(net.named_parameters()))
display(g)
            

学習と結果評価

初期設定

乱数初期化の後で、モデルの読み込み、ノードの付け替えを改めて行い、その後に損失関数や最適化関数を定義する。


# 乱数の初期化
torch_seed()

# 事前学習済みモデルのロード
# pretraind = True で学習済みパラメータも一緒に読み込む
net = models.resnet18(pretrained = True)

# 最終レイヤー関数の入力次元数を確認
fc_in_features = net.fc.in_features

# 最終レイヤー関数の付け替え
net.fc = nn.Linear(fc_in_features, n_output)

# GPUの利用
net = net.to(device)

# 学習率
lr = 0.001

# 損失関数定義
criterion = nn.CrossEntropyLoss()

# 最適化関数定義
optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9)

# historyファイル初期化する
history = np.zeros((0, 5))
            

事前学習済みモデルを利用する場合、各レイヤー関数のパラメータはほぼ出来上がった状態になっている。このような条件の場合は、あまり複雑な最適化関数を利用するより、シンプルなアルゴリズムが望ましいと言われている。

そこで、今回は最適化関数にはmomentumオプションを付けた状態のoptim.SGD関数を利用している。

学習

# 学習
num_epochs = 5
history = fit(net, optimizer, criterion, num_epochs,
        train_loader, test_loader, device, history)
            
結果確認

# 結果サマリー
evaluate_history(history)
            

VGG-19-BNの利用

モデルの読み込み

# 事前学習済みモデルの読み込み
from torchvision import models
net = models.vgg19_bn(pretrained = True)
            
モデル構造の確認

# モデルの確認
print(net)
            

最終段の線形関数の詳細を確認する。


# 最終レイヤー関数の確認
print(net.classifier[6])
            
最終レイヤー関数の付け替え

# 乱数の初期化
torch_seed()

# 最終レイヤー関数の付け替え
in_features = net.classifier[6].in_features
net.classifier[6] = nn.Linear(in_features, n_output)

# features最後のMaxPool2dをはずす
net.features = net.features[:-1]

# AdaptiveAvgPool2dをはずす
net.avgpool = nn.Identity()
            

最後の2行は、本来のノード付け替え作業には不要だが、精度に関して再現性のあるモデルを作るために必要だった処理である。


# モデルのサマリー表示
net = net.to(device)
summary(net,(100,3,112,112))
            
損失の計算グラフの可視化

# 損失の計算グラフ可視化

criterion = nn.CrossEntropyLoss()
loss = eval_loss(test_loader, device, net, criterion)
g = make_dot(loss, params=dict(net.named_parameters()))
display(g)
            
学習

num_epochs = 5
history = fit(net, optimizer, criterion, num_epochs,
          train_loader, test_loader, device, history)
            
結果確認

# 結果サマリー
evaluate_history(history)
            

最後にshow_images_labels関数を使って、検証データの先頭50個のイメージ表示と予測結果を確認する。


# イメージと正解・予測結果の表示
show_images_labels(test_loader, classes, net, device)
            

再現性のある機械学習モデルの作り方


# PyTorch乱数固定用
def torch_seed(seed=123):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministi = True
    torch.use_deterministic_algorithms = True
            
機械学習と乱数の関係

機械学習・ディープラーニングでは、そのアルゴリズムの中で乱数が必要な処理がいくつかある。

CPU or GPU

次のコードのように、CPUとGPUのどちらに対しても乱数の種を明示的に指定する必要がある。


seed = 123
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
            
deterministic設定

この項目をTrueに設定すると、パフォーマンスより値の再現性重視で、GPUが同じ結果を返すようになる。


torch.backends.cudnn.deterministi = True
torch.use_deterministic_algorithms = True
            

汎用性のある事前学習済みモデルの作り方


# 乱数の初期化
torch_seed()

# 最終レイヤー関数の付け替え
in_features = net.classifier[6].in_features
net.classifier[6] = nn.Linear(in_features, n_output)

# features最後のMaxPool2dをはずす
net.features = net.features[:-1]

# AdaptiveAvgPool2dをはずす
net.avgpool = nn.Identity()
            
モデルの読み込みと構造の確認

# 事前学習済みモデルの読み込み
from torchvision import models
net = models.vgg19_bn(pretrained = True)
            

print(net)
            
中間テンソルの確認

# オリジナルデータサイズの場合
net = net.to(device)
summary(net,(100,3,224,224))
            

# 実習用データサイズの場合
summary(net,(100,3,112,112))
            
MaxPool2d-53を落とす

# featuresの最後の要素(MaxPool2d)を落とす
net.features = net.features[:-1]
print(net.features)
            
AdaptiveAvgPool2d-54を落とす

# avgpoolに入っているAdaptiveAvgPool2dを何もしない関数(nn.Identity)に置き換え
net.avgpool = nn.Identity()
            

Custom Data

問題の定義

PyTorchではImageFolderという便利な機能があり、データセット指定をこの関数呼び出しに差し替えるだけで、これまでのコードがそのまま使える。

最初は、PyTorchのチュートリアルで取り上げられている、「アリ」と「ハチ」の分類問題に取り組む。

この問題では244件の訓練データと153件の検証データが、すべてJPEG形式で分類先ごとの別のフォルダーに含まれている。

モデルとしては、VGG-19-BNを利用する。学習方法としては、最初にファインチューニングの方法を使った後、転移学習の方法も試してみる。

2つ目の問題では、「シベリアンハスキー」と「オオカミ」を分類するモデルを、VGG-19-BNを使って、転移学習により作成する。

データ準備

データのダウンロード、解凍、ツリー表示

Pythonの関数を使うのではなく、Linuxのコマンドを利用する。

treeコマンドの導入

# 必要ライブラリ・コマンドの導入

!pip install japanize_matplotlib | tail -n 1
!pip install torchviz | tail -n 1
!pip install torchinfo | tail -n 1
w = !apt install tree
print(w[-2])
            

実行結果をwという変数に代入しているのは、OSコマンドの余分な出力を見えないようにするためである。

ダウンロード

学習対象データがインターネットからダウンロード可能であることが前提である。


# サンプルデータのダウンロード
w = !wget -nc https://download.pytorch.org/tutorial/hymenoptera_data.zip

# 結果確認
print(w[-2])
            
解凍

# データ解凍
w = !unzip -o hymenoptera_data.zip

# 結果確認
print(w[-1])
            

-oのオプションを付けているのは、2度目以降の実行で上書きできるようにするためである。


# 解凍ファイルのtree表示
!tree hymenoptera_data
            

これがImageFolderを利用してデータセットを定義するための前提となる。

Transforms定義

# Transforms定義

# 検証データ用 : 正規化のみ実施
test_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(0.5, 0.5)
])

# 訓練データ用: 正規化に追加で反転とRandomErasingを実施
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(0.5, 0.5),
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3), value=0, inplace=False)
])
            

検証用データのTransformsでは、Resize(256)の後でCenterCrop(224)としている。

事前学習済みモデルは入力データの解像度が224画素で学習しているので、ファインチューニングや転移学習をする場合も同じ画素に合わせることが望ましい。

そこで、画面全体を一旦256×256に拡大縮小した後、中央の224×224の部分をくり抜くようにしている。

訓練データ用では、この2つの機能の代わりに、RandomResizedCropという機能を利用している。

最終的に224×224画素のデータを作るところは同じだが、Resizeとくり抜きの位置に乱数の要素を入れることで、Data Augmentationとしての役割を果たすようにしている。

ImageFolderの利用

最初にImageFolderを利用してデータセットを定義するのに必要な変数を定義しておく。


# ツリーのベースディレクトリ
data_dir = 'hymenoptera_data'

# 訓練データディレクトリと検証データディレクトリの指定
import os
train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'val')

# join関数の結果確認
print(train_dir, test_dir)

# 分類先クラスのリスト作成
classes = ['ants', 'bees']
            
データセット定義

# データセット定義

# 訓練用
train_data = datasets.ImageFolder(train_dir,
            transform=train_transform)
# 訓練データのイメージ表示用
train_data2 = datasets.ImageFolder(train_dir,
            transform=test_transform)
# 検証用
test_data = datasets.ImageFolder(test_dir,
            transform=test_transform)
            

訓練用データセットは2つ用意している。2つ目のtrain_data2は、訓練用画像をイメージとして表示するためのものである。このときだけは、Augmentationなど訓練用の加工がない状態で確認したいため、このような実装にしている。

データ件数を確認してみる。


# データ件数確認

print(f'訓練データ: {len(train_data)}件')
print(f'検証データ: {len(test_data)}件')
            

訓練データが244件、検証データが153件になっている。

次に、検証データの先頭10件と最後の10件を表示してみる。


# 検証データ
# 最初の10個と最後の10個の表示

plt.figure(figsize=(15, 4))
for i in range(10):
    ax = plt.subplot(2, 10, i + 1)
    image, label = test_data[i]
    img = (np.transpose(image.numpy(), (1, 2, 0)) + 1)/2
    plt.imshow(img)
    ax.set_title(classes[label])
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    ax = plt.subplot(2, 10, i + 11)
    image, label = test_data[-i-1]
    img = (np.transpose(image.numpy(), (1, 2, 0)) + 1)/2
    plt.imshow(img)
    ax.set_title(classes[label])
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

plt.show()
            
データローダー定義

# データローダー定義

batch_size = 10

# 訓練用
train_loader = DataLoader(train_data,
      batch_size=batch_size, shuffle=True)

# 検証用
test_loader = DataLoader(test_data,
      batch_size=batch_size, shuffle=False)

# イメージ表示用
train_loader2 = DataLoader(train_data2,
      batch_size=50, shuffle=True)
test_loader2 = DataLoader(test_data,
      batch_size=50, shuffle=True)
            

# 検証用データ(50件)
torch_seed()
show_images_labels(test_loader2, classes, None, None)
            

ファインチューニング版


# ファインチューニング版

# 学習済みモデルの読み込み
# vgg19_bnをパラメータ付きで読み込む
from torchvision import models
net = models.vgg19_bn(pretrained = True)

# 乱数初期化
torch_seed()

# 最終ノードの出力を2に変更する
in_features = net.classifier[6].in_features
net.classifier[6] = nn.Linear(in_features, 2)

# AdaptiveAvgPool2d関数の取り外し
net.avgpool = nn.Identity()

# GPUの利用
net = net.to(device)

# 学習率
lr = 0.001

# 損失関数定義
criterion = nn.CrossEntropyLoss()

# 最適化関数定義
optimizer = optim.SGD(net.parameters(),lr=lr,momentum=0.9)

# historyファイルも同時に初期化する
history = np.zeros((0, 5))
            

# 学習
num_epochs = 5
history = fit(net, optimizer, criterion, num_epochs,
          train_loader, test_loader, device, history)
            

# 結果確認
evaluate_history(history)
            

# 乱数初期化
torch_seed()

# 検証データへの結果表示
show_images_labels(test_loader2, classes, net, device)
            

転移学習版


# vgg19_bnをパラメータ付きで読み込む
from torchvision import models
net = models.vgg19_bn(pretrained = True)

# すべてのパラメータで勾配計算なしに
for param in net.parameters():
    param.requires_grad = False

# 乱数初期化
torch_seed()

# 最終ノードの出力を2に変更する
# このノードのみ勾配計算をすることになる
in_features = net.classifier[6].in_features
net.classifier[6] = nn.Linear(in_features, 2)

# AdaptiveAvgPool2d関数の取り外し
net.avgpool = nn.Identity()

# GPUの利用
net = net.to(device)

# 学習率
lr = 0.001

# 損失関数定義
criterion = nn.CrossEntropyLoss()

# 最適化関数定義
# パラメータ修正の対象を最終ノードに限定
optimizer = optim.SGD(net.classifier[6].parameters(),lr=lr,momentum=0.9)

# historyファイルも同時に初期化する
history = np.zeros((0, 5))
            

ニューラルネットワークのすべてのレイヤー関数に対して、勾配計算をしない設定をしている。その後で付け替えをしりnet.classifier[6]ではデフォルトの状態でrequires_grad=Trueとなっているので、結局このレイヤー関数に対してのみ勾配計算をする形になる。

また、今まではnet.classifier.parameters()とモデル内のすべてのパラメータを渡していたが、今回は付け替えしたnet.classifier[6].parameters()と対象を絞りこむ操作をしている。


# 学習
num_epochs = 5
history = fit(net, optimizer, criterion, num_epochs,
          train_loader, test_loader, device, history)
            

# 結果サマリー
evaluate_history(history)
            

# 乱数初期化
torch_seed()

# 検証データへの結果表示
show_images_labels(test_loader2, classes, net, device)
            

ユーザー定義データの場合


# データダウンロード
w = !wget https://github.com/makaishi2/pythonlibs/raw/main/images/dog_wolf.zip
print(w[-2])

# 解凍
!unzip dog_wolf.zip | tail -n 1

# 解凍結果のツリー表示
!tree dog_wolf
            
Transforms定義

# Transforms定義

# 検証データ用 : 正規化のみ実施
test_transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(0.5, 0.5)
])

# 訓練データ用: 正規化に追加で反転とRandomErasingを実施
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(0.5, 0.5),
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3), value=0, inplace=False)
])
            
データセット定義

# データセット定義

data_dir = 'dog_wolf'

import os
train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'test')

classes = ['dog', 'wolf']

train_data = datasets.ImageFolder(train_dir,
            transform=train_transform)
train_data2 = datasets.ImageFolder(train_dir,
            transform=test_transform)
test_data = datasets.ImageFolder(test_dir,
            transform=test_transform)
            

# データ件数確認

print(f'学習データ: {len(train_data)}件')
print(f'検証データ: {len(test_data)}件')
            
データローダー定義

# データローダー定義

batch_size = 5
# 学習データ
train_loader = DataLoader(train_data,
            batch_size=batch_size, shuffle=True)
# 学習データ イメージ表示用
train_loader2 = DataLoader(train_data2,
            batch_size=40, shuffle=False)
# 検証データ
test_loader = DataLoader(test_data,
            batch_size=batch_size, shuffle=False)
# 検証データ イメージ表示用
test_loader2 = DataLoader(test_data,
            batch_size=10, shuffle=True)
            
訓練・検証データイメージ表示

# 訓練用データ(40件)
show_images_labels(train_loader2, classes, None, None)
            

# 検証用データ(10件)
torch_seed()
show_images_labels(test_loader2, classes, None, None)
            
モデル定義(転移学習)

# 学習済みモデルの読み込み
net = models.vgg19_bn(pretrained = True)

for param in net.parameters():
    param.requires_grad = False

# 乱数初期化
torch_seed()

# 最終ノードの出力を2に変更する
in_features = net.classifier[6].in_features
net.classifier[6] = nn.Linear(in_features, 2)

# AdaptiveAvgPool2d関数の取り外し
net.avgpool = nn.Identity()

# GPUの利用
net = net.to(device)

lr = 0.001
# 損失関数定義
criterion = nn.CrossEntropyLoss()

# 最適化関数定義
# パラメータ修正の対象を最終ノードに限定
optimizer = optim.SGD(net.classifier[6].parameters(),lr=lr,momentum=0.9)

# historyファイルも同時に初期化する
history = np.zeros((0, 5))
            

# 学習の実行

num_epochs = 10
history = fit(net, optimizer, criterion, num_epochs,
          train_loader, test_loader, device, history)
            

# 結果サマリー
evaluate_history(history)
            

# 予測結果表示
torch_seed()
show_images_labels(test_loader2, classes, net, device)