転移学習モデルで画像分類する keras.applications 編

はじめに

識別を行うためのプログラムは下記リンクgithubにアップロードされています。
モデルを構築する箇所以外のプログラムは記事内では詳しく紹介しません。こちらのリンクを参照にしてください。
最後の畳み込み層出力を利用する場合は記事のコードと同じになるよう"keras_models.py"を一部コメントアウトしてください。

https://github.com/naru-byte/transfer_learning/tree/master/code_KerasApplication

tensorflow(graph)で公開されているパラメータファイルを用いた転移学習→マダ

pytorch用いた転移学習→マダ

転移学習とは

転移学習とはあるタスクで学習したモデルのパラメータを別のタスクに用いるモデルの初期値として利用する手法です。

ニューラルネットのモデルにおいて入力に近い層は特徴抽出の役割を担うとされています。そこで量が少ないデータで性能のよい識別機を訓練するために、予め他のデータセットで十分に学習されたモデルの特徴抽出能力を利用します。

今回はImageNetという大量の画像とラベルをもつデータセットで学習されたVGG16モデルを用いて転移学習を行いCaltech101データセットを識別します。

keras

kerasは深層学習のモデルが構築できるライブラリの1つです。TensorflowやPytorchといった他のライブラリと比べて手軽でシンプルに書けるのがメリットです。

kerasには学習済みのモデルがそのまま利用できるapplicationsというクラスがあり、これを用いて転移学習を実装します。

kerasはtensorflowに統合されているものを使用します。

keras.applications.VGG16

有名なモデル構造を利用できるkerasのクラスのVGG16を呼び出します。

tf.keras.applications.VGG16(
   include_top=True,
   weights='imagenet',
   input_tensor=None,
   input_shape=None,
   pooling=None,
   classes=1000
)
・include_top:畳み込み層のあとにある3つの全結合層を利用するか決める。
・weights:重みの値を決める。
     None : ランダム初期化
     'imagenet' : ImageNet学習済みパラメータ
     パラメータ保存ファイルのパスも指定可能
・input_tensor:モデルの入力に利用するkerasのTensor
・input_shape:入力のサイズを決める
・pooling:最終畳み込み出力の処理を決める(include_topがFalseのとき)
     None:そのままの値
     max:global max pooling
     avg:global average pooling
・classes:出力のクラス数を決める(include_topがTrueのとき)

参考 https://docs.w3cub.com/tensorflow~python/tf/keras/applications/vgg16/

データのダウンロード

利用するデータセットのCaltech101をダウンロードします。

http://www.vision.caltech.edu/Image_Datasets/Caltech101/

より、Download項目から101_ObjectCategories.tar.gz (131Mbytes) を入手します。

以降この記事ではフォルダ構成を/Caltech101 の下に
・/code
・/data
・/log
のフォルダがある状況として話をします。
解凍した"101_ObjectCategories"は"data"に移動します。

画像1

モデル構築以外のプログラム

これから紹介する2つモデル構築の共通のコードです。

メインのプログラム

import os
import numpy as np
import tensorflow as tf
import datasets
import keras_models

base_dir = os.path.expanduser("~/Caltech101")
log_dir = os.path.join(base_dir,"log")
model_name = "Caltech_transfer"
os.makedirs(os.path.join(log_dir,model_name), exist_ok=True)

data_dir = os.path.join(*[base_dir,"data","101_ObjectCategories"])

datasize = {"train":28, "valid":2}
IMG_X_SIZE, IMG_Y_SIZE = 256, 256
N_CLASSES = 102

n_epoch = 50

trainable = [True,False][1]
batch_size, val_batch = 64,32

if __name__ == "__main__":
   with tf.device('/device:CPU:0'):
       data = datasets.caltechdata(data_dir=data_dir,
                                   datasize=datasize,
                                   IMG_X_SIZE=IMG_X_SIZE, 
                                   IMG_Y_SIZE=IMG_Y_SIZE,
                                   N_CLASSES=N_CLASSES)
       train_datas, validation_datas, test_datas, num_train, num_val, num_test = data()
       print(train_datas, validation_datas, test_datas)
   with tf.device('/device:GPU:1'):
       train_datas = train_datas.shuffle(buffer_size=1000).batch(batch_size)
       validation_datas = validation_datas.batch(val_batch)
       test_datas = test_datas.batch(val_batch)

       def change_range(image,label):
           mean = [0.485, 0.456, 0.406]
           std = [0.229, 0.224, 0.225]
           return (image/255.0-mean)/std, label
       train_datas = train_datas.map(change_range)
       validation_datas = validation_datas.map(change_range)
       test_datas = test_datas.map(change_range)

   for image_batch, label_batch in train_datas.take(1):
       pass

   print(image_batch.shape)

   network = keras_models.VGG16(IMG_X_SIZE=IMG_X_SIZE,
                                IMG_Y_SIZE=IMG_Y_SIZE,
                                N_CLASSES=N_CLASSES)
   model = network(x=image_batch,trainable=trainable)
   model.compile(optimizer=tf.keras.optimizers.Adam(),
                 loss='categorical_crossentropy',
                 metrics=['accuracy'])
   model.summary()
   print(len(model.trainable_variables))

   checkpoint_path = os.path.join(*[log_dir,model_name,"cp-{epoch:04d}.ckpt"])
   checkpoint_dir = os.path.dirname(checkpoint_path)

   cp_callback = tf.keras.callbacks.ModelCheckpoint(
       checkpoint_path,
       verbose=1,
       save_weights_only=True,
       save_freq='epoch')

   validation_steps = num_val // val_batch
   loss0,accuracy0 = model.evaluate(validation_datas, steps = validation_steps)
   print("initial loss: {:.2f}".format(loss0))
   print("initial accuracy: {:.2f}".format(accuracy0))

   history = model.fit(train_datas,
                       epochs=n_epoch,
                       validation_data=validation_datas,
                       callbacks=[cp_callback])
                       
   test_step = num_test // val_batch
   test_loss, test_accuracy = model.evaluate(test_datas, steps = test_step)

データセットを扱うためのプログラム

import os
import tensorflow as tf
from utils import crop, flip

def preprocess_image(image, IMG_X_SIZE, IMG_Y_SIZE):
   image = tf.image.decode_jpeg(image, channels=3)
   image = tf.image.resize(image, [IMG_Y_SIZE, IMG_X_SIZE])
   return image

def load_and_preprocess_image(path, IMG_X_SIZE, IMG_Y_SIZE):
   image = None
   try:
       image = tf.io.read_file(path)
       image = preprocess_image(image, IMG_X_SIZE, IMG_Y_SIZE)
   except Exception as e:
       print(path)
       print(e)
       print("error occured")
   return image

class caltechdata:
   def __init__(self, data_dir, datasize, IMG_X_SIZE, IMG_Y_SIZE, N_CLASSES):
       self.data_dir = data_dir
       self.datasize = datasize
       self.IMG_X_SIZE = IMG_X_SIZE
       self.IMG_Y_SIZE = IMG_Y_SIZE
       self.N_CLASSES = N_CLASSES

   def __call__(self):
       return self.load_datas()

   def train_load_and_preprocess_from_path_label_c(self, path, label, crop_bool, flip_bool):
       image = load_and_preprocess_image(path, self.IMG_X_SIZE, self.IMG_Y_SIZE)
       if flip_bool:
           image = flip(image)
       if crop_bool:
           image = crop(image, self.IMG_Y_SIZE, self.IMG_X_SIZE)
       return image, tf.one_hot(indices=[label], depth=self.N_CLASSES)[0]

   def validation_load_and_preprocess_from_path_label_c(self, path, label):
       image = load_and_preprocess_image(path, self.IMG_X_SIZE, self.IMG_Y_SIZE)
       return image, tf.one_hot(indices=[label], depth=self.N_CLASSES)[0]

   def load_datas(self):
       train_size, valid_size = self.datasize["train"], self.datasize["valid"]

       train_x, train_y, val_x, val_y, test_x, test_y = [], [], [], [], [], []
       crop_bool, flip_bool = [], []
       folders = sorted(os.listdir(self.data_dir))
       # print(folders)
       for cnt, folder in enumerate(folders):
           images = sorted(os.listdir(os.path.join(self.data_dir,folder)))
           for image in images[:train_size]:
               for i in [True,False]:
                   for j in [True,False]:
                       train_x.append(os.path.join(*[self.data_dir,folder,image]))
                       train_y.append(cnt)
                       crop_bool.append(i)
                       flip_bool.append(j)

           for image in images[train_size:train_size+valid_size]:
               val_x.append(os.path.join(*[self.data_dir,folder,image]))
               val_y.append(cnt)

           for image in images[train_size+valid_size:]:
               test_x.append(os.path.join(*[self.data_dir,folder,image]))
               test_y.append(cnt)

       train_ds = tf.data.Dataset.from_tensor_slices((train_x, train_y, crop_bool, flip_bool))
       train_ds = train_ds.map(self.train_load_and_preprocess_from_path_label_c)
       validation_ds = tf.data.Dataset.from_tensor_slices((val_x, val_y))
       validation_ds = validation_ds.map(self.validation_load_and_preprocess_from_path_label_c)
       test_ds = tf.data.Dataset.from_tensor_slices((test_x, test_y))
       test_ds = test_ds.map(self.validation_load_and_preprocess_from_path_label_c)
       return train_ds, validation_ds, test_ds, len(train_x), len(val_x), len(test_x)

最後の畳み込み層出力を利用してクラス識別を行う

ImageNetで学習されたVGG16の畳み込み層をすべて利用します。

こちらは比較的単純です。tensorflowのチュートリアルにも同様の内容があります。( https://www.tensorflow.org/tutorials/images/transfer_learning )

モデルの記述は以下のようになります。

class VGG16:
   def __init__(self, IMG_X_SIZE, IMG_Y_SIZE, N_CLASSES):
       self.IMG_X_SIZE = IMG_X_SIZE
       self.IMG_Y_SIZE = IMG_Y_SIZE
       self.N_CLASSES = N_CLASSES

   def __call__(self, x, trainable):
       return self.build_network(x, trainable)

   def build_network(self, x, trainable):
       base_model = tf.keras.applications.VGG16(input_shape=(self.IMG_Y_SIZE, self.IMG_X_SIZE, 3),
                                                include_top=False,
                                                weights='imagenet')
       feature_batch = base_model(x) 
       print(feature_batch.shape)
       # setting base_model parameter freeze(pre-train) or not(fine-tuning)
       base_model.trainable = trainable
       base_model.summary()
       #global average pooling
       global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
       feature_batch_average = global_average_layer(feature_batch)
       print(feature_batch_average.shape)
       #full connection layer
       fc_layer = tf.keras.layers.Dense(256, activation=tf.nn.relu)
       feature_batch_fc = fc_layer(feature_batch_average)
       print(feature_batch_fc.shape)
       #convert to classifier vector
       prediction_layer = tf.keras.layers.Dense(self.N_CLASSES, activation=tf.nn.softmax)
       prediction_batch = prediction_layer(feature_batch_fc)
       print(prediction_batch.shape)
       #stack above layers
       model = tf.keras.Sequential([
           base_model,
           global_average_layer,
           fc_layer,
           prediction_layer
       ])
       model.summary()
       return model

再利用パラメータを学習により更新するか否かはこの一行によって指定します。

base_model.trainable = trainable

更新しないものが転移学習(trainable=False)
更新するものがファインチューニング(trainable=True)
と呼ばれることもあります。

途中層の畳み込み層出力を利用してクラス識別を行う

trainableをTrueにした場合の膨大な学習パラメータなど全ての層の利用は深すぎると感じた場合、再利用モデルの一部のみを用いることが手段として考えられます。

こちらはkerasがそういった需要を直接サポートするライブラリを提供していないため、小細工が必要となります。

keras.applications.VGG16でモデルを呼び出した後、layersメソッドを用いて一部を取り出し再定義します。

以下のようなコードになります。

    (以上同文)
    def build_network(self, x, trainable):
       base_model = tf.keras.applications.VGG16(input_shape=(self.IMG_Y_SIZE, self.IMG_X_SIZE, 3),
                                                include_top=False,
                                                weights='imagenet')

       ### 畳み込み層の一部のみを利用する場合に使う箇所
       base_input_shape = base_model.layers[0].input_shape[0] # 
       print(base_input_shape) #
       base_model = tf.keras.Sequential(base_model.layers[:7]) #2block last pooling 7, 3block last 11 #
       print(type(base_model)) #
       print(base_model.layers) #
       base_model.summary() #
       input_layer = tf.keras.Input(shape=base_input_shape[1:], batch_size=base_input_shape[0]) #
       prev_layer = input_layer #
       for layer in base_model.layers: #
           prev_layer = layer(prev_layer) #
       base_model = tf.keras.models.Model([input_layer], [prev_layer]) #
       ### 箇所終わり

       feature_batch = base_model(x) 
       (以下同文)

追加文1行目

base_input_shape = base_model.layers[0].input_shape[0]

3行目でSequentialを用いてモデルの再定義を行う際に入力レイヤーが消失してしまうため入力サイズを変数に保存しておきます。
(おそらくkeras.models.Modelはinput_layerも重ねられるが、SequentialのModelはそれが行えない)

3行目

base_model = tf.keras.Sequential(base_model.layers[:7]) #2block last pooling 7, 3block last 11

VGG16の再利用したい部分までのモデルを定義します。
base_model.layers[:n] の"n"で入力から何層目まで用いるかを指定します。

"n"の具体的な数字は下のようになっています。

・1ブロック目のプーリング層:4
・2ブロック目のプーリング層:7
・3ブロック目のプーリング層:11
・4ブロック目のプーリング層:15
・5ブロック目のプーリング層:19

7行目

input_layer = tf.keras.Input(shape=base_input_shape[1:], batch_size=base_input_shape[0])

1行目の情報を基に入力レイヤーを定義します。

8~11行目

       prev_layer = input_layer #
       for layer in base_model.layers: #
           prev_layer = layer(prev_layer) #
       base_model = tf.keras.models.Model([input_layer], [prev_layer])

入力レイヤーから再利用モデルの最終層までfor文内でレイヤーを重ねていきます。

それをkeras.models.Modelのクラスで定義します。
11行目によってbase_modelはkeras.applicationsで返ってくるモデルと同じ扱いのできる状態の、利用したい一部分のみモデルとなります。

そのため以降は同じコードで動作します。

おわりに

後者の一部再利用はもっとスマートな方法をご存じの方がおりましたら、ご教授いただければ幸いです。

tensorflow(graph)とpytorchの2つで同様の内容を出します。


この記事が気に入ったらサポートをしてみませんか?