見出し画像

SageMakerで独自アルゴリズムを使う

電通デジタルで機械学習エンジニアをしている今井です。
本記事では、SageMaker独自アルゴリズムとして開発した libsvm-converter について紹介します。

Amazon SageMakerとは

Amazon SageMakerはAWSが提供する完全マネージド型の機械学習(ML)サービスです。
SageMakerにはJupyterノートブックが搭載されており、数行のPythonコードを記述するだけでMLモデルの学習、デプロイ、推論が可能になります。

簡単にですが、SageMakerの使い方を紹介します。
開発環境には conda_python3 のノートブックを使用しています。

はじめに、モデルの学習を行います。
モデルの学習には sagemaker.estimator.Estimator を使用します。

import sagemaker
estimator = sagemaker.estimator.Estimator(コンテナイメージなどの引数)

SageMakerではXGBoostやFactorization Machinesなどを組み込みアルゴリズムとして提供しており、コンテナイメージは下記のように取得できます。

from sagemaker.amazon.amazon_estimator import get_image_uri
image_name = get_image_uri('リージョン名', 'アルゴリズム名')
# ex. get_image_uri('ap-northeast-1', 'xgboost')
# → '501404015308.dkr.ecr.ap-northeast-1.amazonaws.com/xgboost:1'

estimatorを定義したら estimator.set_hyperparameters() でハイパーパラメーターを指定します。
また、sagemaker.tuner.HyperparameterTuner を使うことでハイパーパラメーターチューニングも可能です。

SageMakerではモデル学習に使うデータをS3に置いておく必要があります。
ここでは学習用のCSVデータを s3://sagemaker-example/train.csv とします。

s3_train_data = sagemaker.session.s3_input(
   s3_data='s3://sagemaker-example/train.csv',
   content_type='text/csv')

# 下記でトレーニングジョブがはじまる
estimator.fit({'train': s3_train_data})

estimator.fit()を実行するとトレーニングジョブがはじまり、無事に終わるとS3にモデルファイルmodel.tar.gzが出力されます。
S3の出力パスは estimator.model_data で取得できます。
また、モデルの出力先を指定したい場合は、Estimatorのoutput_pathにS3のパスを事前に渡しておきます。

次に、学習したモデルを推論に使うためにデプロイを行います。
SageMakerではリアルタイム推論とバッチ推論を提供しており、リアルタイム推論を実行する場合は、estimator.deploy(エンドポイント名などの引数) でエンドポイントを作成します。
一方で、バッチ推論ではエンドポイントは必要ないため

sagemaker.Session().create_model(
   'モデル名',
   sagemaker.get_execution_role(),
   estimator.create_model().prepare_container_def('インスタンスタイプ'))

としてモデルをホスティングします。
estimator.deploy()でもバッチ推論は可能ですが、本来必要ないエンドポイントの費用がかかってしまうのを避けるため、社内では上記を推奨しています。

最後に学習したモデルを使って推論を行います。
リアルタイム推論の場合は boto3.client('sagemaker-runtime').invoke_endpoint(エンドポイント名などの引数) で推論結果を取得できます。
バッチ推論の場合は sagemaker.transformer.Transformer を使用します。

transformer = sagemaker.transformer.Transformer(モデル名など引数)
transformer.transform(
   data='s3://sagemaker-example/predict.csv',
   content_type='text/csv',
   split_type='Line',
   wait=True)

上記を実行するとバッチ変換ジョブがはじまり、推論結果がpredict.csv.outとしてS3に出力されます。

このようにSageMakerを使うことで簡単にMLモデルの学習、デプロイ、推論が可能になることがイメージできたかと思います。

独自アルゴリズムを使いたい

SageMakerではコンテナイメージをECRに登録することで、独自アルゴリズムを使用することができます。
ここでは、独自アルゴリズムの公式サンプルを基に開発Tipsについて紹介します。

SageMaker用のコンテナイメージは下記のようなファイル構成になります。

opt/
├ program/
 ├ train
 ├ serve 
 ├ nginx.conf
 ├ predictor.py
 └ wsgi.py
└ ml/
 ├ input/
   └ data/
 └ model/

opt/program/train estimator.fit({'train': s3_train_data}) で実行されるプログラムです。
chmod +xなどで実行権限を与えておく必要があります。
また、estimator.fit()で渡したS3のデータは /opt/ml/input/data/train/train.csv としてコンテナ内で処理することができます。

推論に使うための学習済みモデルパラメータは、/opt/ml/model/モデルファイル名 としてファイル出力することで estimator.fit()が終わるとmodel.tar.gz としてS3に出力されます。

opt/program/serve はboto3.client('sagemaker-runtime').invoke_endpoint()やtransformer.wait()で実行されるプログラムです。
こちらも実行権限を与えておく必要があります。

モデルをデプロイしたのち、エンドポイントやバッチ変換ジョブを作成すると、opt/program/predictor.py で定義されたFlaskサーバーが起動します。
Flaskサーバーでは、/opt/ml/model/モデルファイル名から学習済みモデルパラメータを読み取り、ScoringServiceクラスのpredict関数の中に書かれた推論プログラムが実行されます。

このように、独自アルゴリズムを使うためには、コンテナ内におけるファイルの入出力パスを意識しておく必要はありますが、基本的にはtrainとpredictor.pyの2ファイルを修正するだけでよいです。
また、serveやnginx.confについてはNginxやuWSGIの設定を変更したい場合に修正を行ってください。

最後に、Dockerfileをbuildし、ECRに登録することでSageMakerから独自アルゴリズムを呼び出すことができます。

図1

libsvm-converterの紹介

libsvm-converterはCSVデータをLibSVM形式に変換するために開発した独自アルゴリズムです。
LibSVM形式はXGBoostなどで使われるデータ形式であり、例えば

目的変数,性別(離散値),地域(離散値),年齢(連続値)
0,男性,東京都,30
1,女性,神奈川県,25

のようなCSVデータは、離散値にLabelエンコーディングを使う場合は

0 0:1 1:1 2:30 
1 0:2 1:2 2:25

One-Hotエンコーディングを使う場合は

0 0:1 2:1 4:30
1 1:1 3:1 4:25 

のように変換されます。

まずは、libsvm-converterのDockerfileを作成します。

FROM python:3.7
LABEL maintainer="Yusaku Imai <imai.yu@dentsudigital.co.jp>"
SHELL ["/bin/bash", "-c"]

ADD program /opt/program/
# program/
#   ├ train
#   ├ serve
#   ├ nginx.conf
#   ├ predictor.py
#   └ wsgi.py

WORKDIR /opt/program
RUN chmod +x /opt/program/train
RUN chmod +x /opt/program/serve
ENV PATH="/opt/program:${PATH}"

# 必要なソフトウェアをインストールする
RUN apt-get -y update && \
   apt-get install -y --no-install-recommends nginx

# 必要なPythonパッケージをインストールする
RUN pip install scikit-learn numpy pandas flask gevent gunicorn

次に、独自アルゴリズムの公式サンプルのtrainとpredictor.pyを修正します。

trainでは <index>:<value> に変換するための情報を抽出します。
ここで、事前準備として、変換前のCSVデータやエンコーディングについての下記のconfigファイルを編集し、S3に置いておきます。

{
 "Target": 目的変数の変数名,
 "Numeric": [連続変数の変数名のリスト],
 "Categorical": [離散変数の変数名のリスト],
 "Encoding": "label" or "onehot"
}

configファイルをもとに離散変数に対してエンコーディングを行いますが、Labelエンコーディングには sklearn.preprocessing.LabelEncoder、One-Hotエンコーディングには pandas.get_dummies を使用しています。

trainの実装です。

#!/usr/bin/env python
import json
from collections import defaultdict
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# load config
with open('/opt/ml/input/data/config/config.json') as f:
   config = json.load(f)

# load csv as pandas
df = pd.read_csv('/opt/ml/input/data/train/train.csv', dtype=str, keep_default_na=False, low_memory=False)

config['Index'] = defaultdict(dict)

# Categorical columns with Label Encoding
if config['Encoding'] == 'label':
   encoder = LabelEncoder()
   for i, col in enumerate(config['Categorical']):
       for j, val in enumerate(encoder.fit(df[col]).classes_):
           config['Index'][col][str(val)] = '{}:{}'.format(i, j)

# Categorical columns with One-Hot Encoding
elif config['Encoding'] == 'onehot':
   x_ = pd.get_dummies(df[config['Categorical']], columns=config['Categorical'], prefix_sep='==')
   for i, x in enumerate(x_.columns):
       col, val = x.split('==')
       config['Index'][col][str(val)] = '{}:1'.format(i)

# Numerical columns
for j, col in enumerate(config['Numeric'], i+1):
   config['Index'][col] = j

# output model file
with open('/opt/ml/model/libsvm-converter.json', 'w') as f:
   json.dump(config, f, indent=2, ensure_ascii=False)

以下のようにモデル学習を行うと、libsvm-converter.json が圧縮されたmodel.tar.gzがS3に出力されます。

s3_train_data = sagemaker.session.s3_input(
   s3_data='s3://sagemaker-example/train.csv',
   content_type='text/csv')
s3_config_data = sagemaker.session.s3_input(
   s3_data='s3://sagemaker-example/config.json',
   content_type='application/json')

estimator.fit({'train': s3_train_data, 'config': s3_config_data})

predictor.pyではこのlibsvm-converter.jsonをもとにデータの変換を行います。

predictor.pyの[18,43]行目を修正します。

import re

class LibSVMConverter:
   def __init__(self):
       self.config = json.load(open('/opt/ml/model/libsvm-converter.json'))

   def convert(self, input):
       is_target = self.config['Target'] in input
       cols = set(self.config['Index']) & set(input.columns)

       num_cols = [col for col in cols if isinstance(self.config['Index'][col], int)]
       cat_cols = [col for col in cols if isinstance(self.config['Index'][col], dict)]

       converted_results = list()
       for _, row in input.iterrows():
           line = list()
           # Numerical columns
           for col in num_cols:
               if row[col]:
                   line.append('{}:{}'.format(self.config['Index'][col], row[col]))
               # missing value -> 0
               else:
                   line.append('{}:0'.format(self.config['Index'][col]))

           # Categorical columns
           for col in cat_cols:
               if row[col] in self.config['Index'][col]:
                   line.append(self.config['Index'][col][str(row[col])])

           sorted_line = sorted(line, key=lambda x:int((re.search(r'[0-9]+', x)).group(0)))
           if is_target:
               converted_results.append(' '.join([row[self.config['Target']]] + sorted_line))
           else:
               converted_results.append(' '.join(sorted_line))

       return converted_results

class ScoringService(object):
   model = None

   @classmethod
   def get_model(self):
       if self.model == None:
           self.model = LibSVMConverter()
       return self.model

   @classmethod
   def predict(self, input):
       clf = self.get_model()
       return clf.convert(input)

StringIOにエラーがあったため、import StringIOを import io に変更し、StringIO.StringIOを io.StringIO に修正します。

また、pandasのオプションをtrainに合わせるために

data = pd.read_csv(io.StringIO(data), dtype=str, keep_default_na=False, low_memory=False)

と修正します。

あとはDockerfileをbuildし、ECRに登録することで、libsvm-converterが使えるようになるかと思います。

おわりに

本記事では、SageMakerで独自アルゴリズムを使うための方法と、CSVデータをLibSVM形式に変換するために開発したlibsvm-converterについて紹介しました。
社内で構築している機械学習ワークフローでは、libsvm-converterとXGBoostとがシームレスに実行できるように開発されており、データサイエンティストが分析業務に使う時間を大幅に削減できました。

最後に余談ですが、
2019年12月に機械学習モデルを自動作成するAmazon SageMaker Autopilotがリリースされました。
これによりアルゴリズム選択やデータ前処理、パラメータチューニングなどの手作業も不要になるため、AWSユーザーにおける「機械学習の民主化」がますます進みそうです。