異常検知を備えたIoTシステムを簡単に構築したいので、まずはAWS Sage Makerを試してみた
こんにちは遠藤です。
センサーデータをクラウドに送って、なんかあったら知らせてくれるといったシステムを電子工作レベルで簡単に構築できないものかと思って調べ始めています。
「なんかおかしい?」の判断する仕組みで考えられるものは、ロジックで判定するか機械学習モデルによって推定するかのどちからでしょうか。送信するデータが、電源の電圧などでしきい値が予め判断できそうななものはロジックで解決できそうですし、温度などのデータは時間変動などもあるので機械学習を使わないといい具合に異常検知をすることは難しそうです。
また「なんかおかしい?」は「センサーデータをクラウドに送って」の後に判定する仕組みにしたいと考えています。つまりデバイス側で判定するのではなく、センサーデータをクラウドに送って、クラウド側に保存したデータで判定することを想定です。今回クラウドサービスについてはAWSを利用しますが、AWSを選んだ理由はセンサーデータをデバイスからAWS IoTに送信するところまでは試していたので他サービスより慣れているというだけです。AWSなので異常検知のところはSage Makerを試しています。
最終的には以下のようなシステムを構築して今後作品を考えるときに使えればといったところです。なるべくシンプルにIoT Core > Kinesis > s3 > Lambda > SageMaker > SNSで通知な流れをイメージしています。
AWSで実装するとシンプルにしてもこれくらいサービスを組み合わせる必要があります。。趣味の電子工作でCloudFormationとか書きたくないので、ポチポチっと環境を整えられるこのくらいの範囲のベストプラクティスみたいなものを探って行きたいと思っています。
最近仕事でAWSに触る機会もすっかりなくなり、この記事はかなり素人な知識で書いた内容になります。実際にAWSで機械学習システムを構築して運用を考えていらっしゃる方は、公式のドキュメントやDeveloper IOの記事などを参考にされると良いと思います。とはいえ、利用しているサービスは実運用されているシステムで利用されているものですし、こんな感じでIoTのシステムって実装されているのでという雰囲気がわかるシリーズにしたいと思っています。
AWS IoTを調べはじめたきっかけ
AWS IoTなどを調べはじめたきっかけですが、私は以前にコピペテックでobnizを使ったIoTの作品をいくつか紹介しています。obnizはインターネットごしにデバイス操作をなんでもできるのが売りでしたが、そのため基本動作としてクラウドからデバイスに問い合わせが必要となりました(今はわかりません)。例えば5分おきに温度センサーの値をクラウドを送信したい場合、スリープ復帰のイベントをクラウド側で受けてデバイスに計測値を取りに行くという流れになります。クラウド側からプルしないならない仕組みはちょっとシンプルではなく、とくにバッテリー駆動の場合はデバイスの起動時間はなるべく短くしたいところなのに、1度の計測で数秒から数十秒の起動時間が必要でした。
センサーデータをクラウドに送るのであれば、デバイスでセンサーデータを取得後MQTTメッセージを送ったらすぐにスリープできるので起動時間も1秒未満で済むはず。ということで、他のサービスで実装するとしたらどこがいいかな?と思いあたるのがAWS、Azure、GoogleあたりのIoTプラットフォーム、その中から無料枠がそこそこあり、情報も多いAWSを試し始めた次第です。
ルールベースか機械学習か?
冒頭でルールベースでロジックを組むケースと、機械学習で推論するケースがありそうだという話をしましたが、AWS IoTではどちらも対応可能です。
ルールベースの場合は、AWS IoT Eventsで状態変化を定義できます。「電源電圧がしきい値を下回った場合に通知する」というようなしきい値がすでに解ってるケースには、とくに機械学習モデルで予測をしなくても異常検知システムを構築することはできます。
一方で、温度変化などのように、周期的に変化するようなデータの場合には、しきい値を決めることは難しいです。そのようなケースは機械学習により割り出されたしきい値を使用するのが適しています。運用に入った後も継続して学習を行いしきい値もより適切な値にすることができます。
SageMakerとは
SageMakerの概要についてChatGPTに短くまとめてもらいました。
機械学習プラットフォームと書いてあるように、基本的にはプラットフォームを提供しているのみなので、実際に使いこなすにはAWSと機械学習自体の知識が必要となります。
しかし、それらの知識があれば、トレーニングの際にはGUIまたはライブラリを通して自動でトレーニングを実行するインスタンスを起動してくれたり、学習したモデルを簡単にAPIとしてデプロイできたりと便利です。
インスタンスの準備しPythonをセットアップするなど、機械学習用のシステムを一から構築をしなくても「機械学習モデルを構築、トレーニング、デプロイ」を一気通貫で提供してくれているのが便利なところかと思います。
SageMakerの異常検知のチュートリアルをやってみた
とにかく触ってみるのがコピペテックの趣旨なのでまずはチュートリアルをやっってみました。
AWSは各サービス向けのチュートリアルが充実しており、SageMakerにも多くのチュートリアルが用意されています。今回やりたいことは異常検知なので、以下のチュートリアルを選びました。
こちらのチュートリアルは日本語に翻訳されて言いますし、手順はこの通り進めることができます。ただ、実際にチュートリアルを進めてみたところ、ライブラリのバージョンアップなどにより詰まった箇所がありました。
以下、概念的なところと詰まったところの解決方法を中心にまとめていきます。
アルゴリズム「Random Cut Forest (RCF)」について
異常検知は、教師なし学習のアルゴリズムを使用します。
教師ありのアルゴリズムにランダムフォレストがありますが、RCFは決定木をデータの異常を識別するために使用する教師なし学習のアルゴリズムらしいです。(ChatGPTに聞いたw)
Random Cut Forest (RCF)の詳細は以下のドキュメントにまとまっています。
異常検知によく使われる表層学習のアルゴリズムは、ほかに以下のようなものがあるらしいです。
Isolation Forest
Nearest Neighbors (KNN):
One-Class SVM
Local Outlier Factor (LOF)
DBSCAN
ちょっと前の記事ですが以下は参考になりそうなので時間があるときに読んでおきたいと思っています。
使用するデータセットと得られる結果
チュートリアルでは、ニューヨーク市のタクシー利用者数、6 か月間分で構成されるサンプルデータセット Numenta Anomaly Benchmark (NAB) New York City Taxi dataset を使用します。
以下は、データセットのグラフですが1週間ごとの周期性があります。
チュートリアルの目標は、学習したモデルを使用して期間内に発生していた既知のイベント3回を異常と判定することになります。
事前準備: ノートブック インスタンスを起動する
チュートリアルを進めるには、まずJupyter notebookを実行するためのノートブックインスタンスを用意しないとなりません。
まずは、以下のチュートリアルの「ステップ 1 – Amazon SageMaker ノートブックインスタンスをセットアップする」の手順に沿ってノートブックインスタンスを起動しておきます。
ノートブックインスタンスが起動したら「Jupyter」を開き、「conda_python3」のノートブックを作成します。
ノートブックが立ち上がったら目的のRandom Cut Forestのチュートリアルを進めることができます。
ノートブックの修正しながらチュートリアルを進める
チュートリアルを実際に進めていくと、いろいろ動かないのでPythonのコードを修正する必要があります。具体的には、チュートリアルのコードはSageMaker Python SDKとpandasのバージョンが古いため新しいバージョンのインタフェースに合わせて修正しました。
以下、修正した箇所を中心に解説します。
CSVをエンコードしてs3にアップロードするコードの修正
まずは以下、ダウンロードしたタクシー利用者数のCSVをRecordIO Protobuf 形式にエンコーディングしてs3にアップロードするコードです。
def convert_and_upload_training_data(
ndarray, bucket, prefix, filename='data.pbr'):
import boto3
import os
from sagemaker.amazon.common import RecordSerializer
# convert Numpy array to Protobuf RecordIO format
serializer = RecordSerializer()
buffer = serializer.serialize(ndarray)
# upload to S3
s3_object = os.path.join(prefix, 'train', filename)
boto3.Session().resource('s3').Bucket(bucket).Object(s3_object).upload_fileobj(buffer)
s3_path = 's3://{}/{}'.format(bucket, s3_object)
return s3_path
bucket = 'sage-maker-hello-rcf' # <-- use your own bucket, here
prefix = 'sagemaker/randomcutforest'
s3_train_data = convert_and_upload_training_data(
taxi_data.value.to_numpy().reshape(-1,1),
bucket,
prefix)
SageMaker Python SDKのバージョンが1系から2系に変わっているので、以下の「Serializer and Deserializer Classes」のセクションの表にしたがって修正します。
from sagemaker.amazon.common import numpy_to_record_serializer
→ from sagemaker.amazon.common import RecordSerializerserializer = numpy_to_record_serializer()
→ serializer = RecordSerializer()buffer = serializer(ndarray)
→ buffer = serializer.serialize(ndarray)
また、taxi_dataはpandas.read_csvの戻り値ですが、valuesが返すDataFrameのas_matrixはto_numpyに変わっているのでこちらも修正します。
taxi_data.value.as_matrix().reshape(-1,1),
→ axi_data.value.to_numpy().reshape(-1,1),
モデルのトレーニングをするコードの修正
次にモデルのトレーニングを実行する以下のコードを修正しました。
import boto3
import sagemaker
containers = {
'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/randomcutforest:latest',
'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/randomcutforest:latest',
'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/randomcutforest:latest',
'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/randomcutforest:latest'}
region_name = boto3.Session().region_name
container = containers[region_name]
session = sagemaker.Session()
rcf = sagemaker.estimator.Estimator(
container,
sagemaker.get_execution_role(),
output_path='s3://{}/{}/output'.format(bucket, prefix),
instance_count=1,
instance_type='ml.c5.xlarge',
sagemaker_session=session)
rcf.set_hyperparameters(
num_samples_per_tree=200,
num_trees=50,
feature_dim=1)
s3_train_input = sagemaker.inputs.TrainingInput(
s3_train_data,
distribution='ShardedByS3Key',
content_type='application/x-recordio-protobuf')
rcf.fit({'train': s3_train_input})
sagemaker.estimator.Estimator の初期化パラメータ名が変更されているので修正します。
train_instance_count > instance_count
train_instance_type > instance_type
参照: Renamed Estimator Parameters
set_hyperparametersメソッドに指定するパラメータ feature_typeはfeature_dimに変わっています。
最後に、sagemaker.session.s3_input は sagemaker.inputs.TrainingInputに置き換えます。
参照: s3_input
推論エンドポインをデプロイするコードの修正
トレーニング済みモデルを推論エンドポイントにデプロイするコードを修正していきます。
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
rcf_inference = rcf.deploy(
initial_instance_count=1,
instance_type='ml.c5.xlarge',
)
rcf_inference.content_type = 'text/csv'
rcf_inference.serializer = CSVSerializer()
rcf_inference.deserializer = JSONDeserializer()
SageMaker SDKの非互換については同様に、Use Version 2.x of the SageMaker Python SDKのPre-instantiated Serializer and Deserializer Objectsの対応表に従って修正します。
from sagemaker.predictor import csv_serializer, json_deserializer
-> from sagemaker.serializers import CSVSerializer
-> from sagemaker.deserializers import JSONDeserializerrcf_inference.serializer = csv_serializer
-> rcf_inference.serializer = CSVSerializer()rcf_inference.deserializer = json_deserializer
-> rcf_inference.deserializer = JSONDeserializer()
推論エンドポイントを使用して異常スコアを予測するコードの修正
デプロイされたエンドポイントを使用して以上スコアを予測するコードの修正をしていきます。
results = rcf_inference.predict(taxi_data.value.to_numpy().reshape(-1,1))
scores = [datum['score'] for datum in results['scores']]
taxi_data['score'] = pandas.Series(scores, index=taxi_data.index)
score_mean = taxi_data.score.mean()
score_std = taxi_data.score.std()
score_cutoff = score_mean + 3*score_std
anomalies = taxi_data[taxi_data['score'] > score_cutoff]
こちらは、前述のpandasのDataFrameのメソッドas_matrixをto_numpyに変更します。
results = rcf_inference.predict(taxi_data.value.as_matrix().reshape(-1,1))
-> results = rcf_inference.predict(taxi_data.value.to_numpy().reshape(-1,1))
1日48ポイントのまとまりで学習し直すようにコードの修正
チュートリアルでも解説されている通り、30分おきの乗客数を一次元配列で学習したモデルで推論した場合、異常スコアの予測にいくつかのノイズが拾われてしまう問題があります。
改善策としてチュートリアルでは、30分おきなので1日48ポイントを人まとまりとして学習させることによって、ノイズを減らす試みを紹介しています。
以下、ノートブックの修正箇所をまとめます。
最初に、CSVをダンロードした後に以下のコードを追加します。
以下は乗客数のデータを48次元の配列に変換しています。
import numpy as np
def shingle(data, shingle_size):
num_data = len(data)
shingled_data = np.zeros((num_data - shingle_size, shingle_size))
for n in range(num_data - shingle_size):
shingled_data[n] = data[n : (n + shingle_size)]
return shingled_data
# single data with shingle size=48 (one day)
shingle_size = 48
prefix_shingled = "sagemaker/randomcutforest_shingled"
taxi_data_shingled = shingle(taxi_data.values[:, 1], shingle_size)
次に、s3にアップロードするconvert_and_upload_training_dataの引数をtaxi_data_singledに置き換えます。
s3_train_data = convert_and_upload_training_data(
# taxi_data.value.to_numpy().reshape(-1,1),
taxi_data_shingled,
bucket,
prefix)
set_hyperparametersのfeature_dmに48をセットします。実際はshingle_sizeをした方が良さそうです。
rcf.set_hyperparameters(
num_samples_per_tree=200,
num_trees=50,
# feature_dim=1)
feature_dim=48)
スコアの推論のコードを置き換えます。
results = rcf_inference.predict(taxi_data_shingled)
scores = [datum['score'] for datum in results['scores']]
taxi_data['score'] = pandas.Series(scores)
グラフを出力した結果は以下になりました、チュートリアルの画像とほぼ同じですが若干ご検知が多いです。
SageMakerを使ってみて
昔の記憶を思い出しながらPythonのコードを修正してチュートリアルを完了することができましたが、たまにしか触らない身にとってはちょっと手間がかかるなという印象です。特にshingling (重複する可能性の検出) による改善のセクションは、コードは一部のみ提示してあるのみなので、修正に少々手こずりました。
とはいえ、あくまでこちらはJupyerノートブックを使った場合で、ノーコードツールの方を使えば解決するのかもしれません。
Python書かないとならない問題を除くと、s3へのアップロードから、学習インスタンスの立ち上げと実行、学習済みモデルのアップロードなどSageMaker SDKにより記述できるのはかなり楽です。
次に調べること
モデルをトレーニングしてデプロイするところまではわかったので、次は他のデータを利用して検証してみたいのと、デプロイしたモデルを利用してLambda上で推論するあたりを試してみたいと思います。
とくに、連続して収集されるセンサーデータをどのくらいのまとまりで、どういったタイミングでLambdaを起動し、デプロイしたモデルで推論するのかというあたりがよくわかっていないポイントなので、そのあたりはしっかり検証してみたいところです。
この記事が気に入ったらサポートをしてみませんか?