見出し画像

SageMaker Processingで独自アルゴリズムを使う

電通デジタルで機械学習エンジニアをしている今井です。
本記事では、「SageMakerで独自アルゴリズムを使う」で紹介した libsvm-converter をSageMaker Processingで使う方法について紹介します。

Amazon SageMaker Processingとは

Amazon SageMaker Processingは2019年12月にリリースされたサービスで、機械学習のためのデータ前処理などをバッチで実行するためのインフラ環境を提供します。
AWSの公式ブログでは、組み込みScikit-Learnコンテナ SKLearnProcessor や独自コンテナ ScriptProcessor を用いたデータの前処理について紹介されています。

import sagemaker
script_processor = sagemaker.processing.ScriptProcessor(
   image_uri='独自コンテナのECR_URI',
   role=sagemaker.get_execution_role(),
   command=['python3'],
   instance_type='インスタンスタイプ',
   instance_count=1)

入出力に使うデータはS3を使用し、ProcessingInput と ProcessingOutput でコンテナとの受け渡しをします。
コンテナ内では /opt/ml/processing/ で入出力データにアクセスします。

# S3の入力データを定義
processing_input = sagemaker.processing.ProcessingInput(
   source='s3://sagemaker-example/入力データ名',
   destination='/opt/ml/processing/input')

# コンテナの出力データを定義
processing_output = sagemaker.processing.ProcessingOutput(
   source='/opt/ml/processing/output/出力データ名',
   destination='s3://sagemaker-example')

データ処理のPythonコードを s3://sagemaker-example/processing.py とすると、下記でバッチジョブが起動します。

script_processor.run(
   code='s3://sagemaker-example/processing.py',
   inputs=[processing_input],
   outputs=[processing_output])

従来のSageMakerと同様、サーバーレスな分析環境(Jupyterノートブック)を使えることや、処理開始/終了時にインスタンスが自動で起動/終了することから、インフラコストを抑えることが期待できます。

SageMaker Processingでlibsvm-converterを使う

ここでは、独自アルゴリズムである libsvm-converter をSageMaker Processingで使う方法について紹介します。

SageMakerのトレーニングジョブで作成したlibsvm-converterのモデルファイルを s3://sagemaker-example/model.tar.gz とします。
processing.pyでは、このモデルデータを使用し、CSVデータ /opt/ml/processing/input/data/*.csv をLibSVMデータ /opt/ml/processing/output/*.csv.out として処理します。

import os
import json
import re
import tarfile
import pandas as pd

class LibSVMConverter:
   def __init__(self):
       self.config = json.load(open('/opt/ml/model/libsvm-converter.json'))

   def convert(self, input):
       is_target = self.config['Target'] in input
       cols = set(self.config['Index']) & set(input.columns)

       num_cols = [col for col in cols if isinstance(self.config['Index'][col], int)]
       cat_cols = [col for col in cols if isinstance(self.config['Index'][col], dict)]

       converted_results = list()
       for _, row in input.iterrows():
           line = list()
           # Numerical columns
           for col in num_cols:
               if row[col]:
                   line.append('{}:{}'.format(self.config['Index'][col], row[col]))
               # missing value -> 0
               else:
                   line.append('{}:0'.format(self.config['Index'][col]))

           # Categorical columns
           for col in cat_cols:
               if row[col] in self.config['Index'][col]:
                   line.append(self.config['Index'][col][str(row[col])])

           sorted_line = sorted(line, key=lambda x:int((re.search(r'[0-9]+', x)).group(0)))
           if is_target:
               converted_results.append(' '.join([row[self.config['Target']]] + sorted_line))
           else:
               converted_results.append(' '.join(sorted_line))

       return '\n'.join(converted_results)

if __name__ == '__main__':
   prefix = '/opt/ml/processing/'
   os.makedirs('/opt/ml/model/')

   # load model data
   with tarfile.open(os.path.join(prefix, 'input/model', 'model.tar.gz')) as tar:
       tar.extract('libsvm-converter.json', '/opt/ml/model/')

   # get csv files
   fnames = [os.path.join(dir, f)
               for dir, _, files in os.walk(os.path.join(prefix, 'input/data'))
               for f in files if f.endswith('.csv')]

   # convert csv to libsvm
   converter = LibSVMConverter()
   for fname in fnames:
       df = pd.read_csv(fname, dtype=str, keep_default_na=False, low_memory=False)
       result = converter.convert(df)

       fname_out = os.path.join(prefix, 'output', os.path.basename(fname).replace('.csv', '.csv.out'))
       with open(fname_out, 'w') as f_out:
           f_out.writelines(result)

以下のようにモデルファイルとCSVデータを渡してバッチジョブを実行すると、バッチ変換ジョブ Transformer と同様の結果が出力されます。

script_processor.run(
   code='s3://sagemaker-example/processing.py',
   inputs=[
       sagemaker.processing.ProcessingInput(
           source='s3://sagemaker-example/input/',
           destination='/opt/ml/processing/input/data'),
       sagemaker.processing.ProcessingInput(
           source='s3://sagemaker-example/model.tar.gz',
           destination='/opt/ml/processing/input/model')
   ],
   outputs=[
       sagemaker.processing.ProcessingOutput(
           source='/opt/ml/processing/output/',
           destination='s3://sagemaker-example/output')
   ])

モデル学習にSageMaker Processingを使用することも可能です。

import json
from collections import defaultdict
import tarfile
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# load config
with open('/opt/ml/processing/input/config/config.json') as f:
   config = json.load(f)

# load csv as pandas
df = pd.read_csv(
   '/opt/ml/processing/input/data/train.csv',
   dtype=str,
   keep_default_na=False,
   low_memory=False)

config['Index'] = defaultdict(dict)

# Categorical columns with Label Encoding
if config['Encoding'] == 'label':
   encoder = LabelEncoder()
   for i, col in enumerate(config['Categorical']):
       for j, val in enumerate(encoder.fit(df[col]).classes_):
           config['Index'][col][str(val)] = '{}:{}'.format(i, j)

# Categorical columns with One-Hot Encoding
elif config['Encoding'] == 'onehot':
   x_ = pd.get_dummies(df[config['Categorical']], columns=config['Categorical'], prefix_sep='==')
   for i, x in enumerate(x_.columns):
       col, val = x.split('==')
       config['Index'][col][str(val)] = '{}:1'.format(i)

# Numerical columns
for j, col in enumerate(config['Numeric'], i+1):
   config['Index'][col] = j

# output model file
with open('libsvm-converter.json', 'w') as f:
   json.dump(config, f, indent=2, ensure_ascii=False)

with tarfile.open('/opt/ml/processing/model/model.tar.gz', 'w:gz') as tar:
   tar.add('libsvm-converter.json')

以下のようにconfigファイルとCSVデータを渡してバッチジョブを実行すると、トレーニングジョブ Estimator と同様のモデルファイルが作成できます。

script_processor.run(
   code='s3://sagemaker-example/train.py',
   inputs=[
       sagemaker.processing.ProcessingInput(
           source='s3://sagemaker-example/train.csv',
           destination='/opt/ml/processing/input/data'),
       sagemaker.processing.ProcessingInput(
           source='s3://sagemaker-example/config.json',
           destination='/opt/ml/processing/input/config')
   ],
   outputs=[
       sagemaker.processing.ProcessingOutput(
           source='/opt/ml/processing/model/',
           destination='s3://sagemaker-example')
   ])

おわりに

本記事では、SageMaker Processingで libsvm-converter を使うための方法を紹介しました。

SageMaker Processingはサーバーレスでタスクを実行できるため、「Airflowのタスク実行環境を分離する」で紹介されている ECSOperator の代替手段としても有効です。
Airflow Operator化する必要はありますが(もしくはPythonOperatorを利用する)、ECSタスク定義が不要になるのは大きなメリットになると思います。