【Part5】エッジのメッセージをさばいてS3へと保存する
AWSでエッジコンピューティング環境を作る
Part5です。デプロイ環境が構築できていない方はPart4までをどうぞ
【Part4】AWSでエッジコンピューティング環境を作る(ラズパイデプロイ編)
前回までで、ラズパイから遠隔デプロイを行い処理関数を更新する事ができ、さらにAWS IoTにデータを送信することができました。今回は、エッジ側の拡張の話と、AWS IoTがデータを受信した後の話をしたいと思っています。
データのさばき方
データをさばいてS3へ送る方法はいくつかあって、ぱっと思いついたところで↓の様な方法があります。それぞれ特徴と一緒にまとめてみました。
1. AWS IoTからRuleでさばく方法
・シンプル
2. AWS IoTからRuleでLambdaをつないでさばく方法
・できることが広がる。
3. Greengrassのコネクタでサービスにつないでさばく方法
・大規模データでも耐えられるようなアーキテクチャになる
今回は、1. 2.に関して説明します。(実際にエッジコンピューティング用に使ったのは2.です。)
1. AWS IoTからRuleでさばく
AWS IoTに来たメッセージは、ACTという機能を使ってその後どうやって処理するのかを決めることができます。
ルールを作成していきます。まず、適当な名前を入力します。
下に、ルールクエリステートメントというのがあります、AWS IoTでは、メッセージををSQLライクに扱えるので、慣れている人にはとても便利です。今回は、hello/worldトピックから拾ったメッセージをすべて流すので、以下のように記入します。
SELECT * FROM "hello/world"
どういう命令を書けるかは、公式ページを参照ください。
REF : AWS IoT SQL リファレンス
次に、アクションを追加します。今回は、拾ったものをS3に流すので、それができるアクションを追加します。
新規作成でも良いですし、すでに既存のS3バケットを使いたいならばそれを使っても構いません。
キーというのが、保存されるオブジェクトの名前になります。ここを固定の名前しちゃうと、オブジェクト名が同じになるので、メッセージが来るたびにそのオブジェクトの中身が更新されるという事になってしまうので、変数を用いると便利です。下記のようにすると、入ってきたトピックとタイムスタンプから名前を作ってくれます。
${topic()}/${timestamp()}
バケットは、下図の項目でバケット名と③のアクセス許可の設定の箇所だけいじります。バケット名はAWS内で一意に定まらなければいけないので、ユニークな命名をしましょう。また、AWS IoTからのデータ投入を受け付けるために、アクセス許可の設定をします。本当の運用時は、特定のサービスからのみ受け付けるようにしておくべきですが、今回はとりあえず③アクセス許可の設定において、チェックボックスのアクセス制御を外しフルオープンにして、簡単に受けられるようにしておきます。
これで設定完了です。実際に前パートまでの設定が完了して、トピックがAWSに送られているとすると、下図のようにタイムスタンプが名前となってデータがS3に格納されていきます。
2. AWS IoTからRuleでLambdaをつないでさばく
次は、Lambdaにデータを流していきます。Lambdaを挟むことで、データの加工など、自由にデータをさばけるようになるのと、AWS内外のサービスとの連携も簡単に行えるので、柔軟性が上がります。
S3転送用のLambdaを作って、AWS IoTのルールを介してそこにメッセージを送るのですが、その前に、エッジで動いているGreengrass側のLambdaを少しいじります。理由は、AWS IoTからLambdaに流すメッセージの形がJSON形式じゃないとエラーが出てLambdaが発火しないからです。なので、そこだけちょっと書き換えたバージョンを作りました。これをコピペして、Part3, Part4でやったようにLambdaのバージョニング/エイリアスの付け替えをして、ラズパイにデプロイしてください。
#
# Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
import greengrasssdk
import platform
from threading import Timer
import json
client = greengrasssdk.client('iot-data')
my_platform = platform.platform()
def greengrass_hello_world_run():
topic = "hello/world"
if not my_platform:
message = "Hello world!!"
else:
message ='Hello world! Sent from Greengrass Core running on platform: {}'.format(my_platform)
#jsonify
payload=json.dumps({"message":message})
client.publish(
topic='hello/world',
payload=payload
)
# Asynchronously schedule this function to be run again in 5 seconds
Timer(5, greengrass_hello_world_run).start()
# Start executing the function above
greengrass_hello_world_run()
# This is a dummy handler and will not be invoked
# Instead the code above will be executed in an infinite loop for our example
def function_handler(event, context):
return
さて、そうすると、AWS IoTにJSON形式で送られてくるようになるので、その後段でのLambdaの処理を行うことができます。
さて、実際にルールを作成していきましょう。先程の直接S3に投げたときと同様、AWS IoTのACTから、ルール名を定め、アクションの追加でLambdaを選択します。
Lambdaも新しいのを作っていきましょう。
Python3で書いていきます。AWSのサービスをPythonで取り扱う場合は、boto3というライブラリを用います。コードは以下の通り。
REF : Python boto3 でAWSを自在に操ろう ~入門編~
import os
import boto3
from datetime import datetime
def upload_img(mes):
s3 = boto3.resource('s3')
bucket = os.environ["BUCKET"] #環境変数から
dir = "viaLambda" #lambda経由で
# 時刻をつけて格納
filename = 'mes_{}.text'.format(datetime.now().strftime('%Y%m%d-%H%M%S'))
obj = s3.Object(bucket, os.path.join(dir,filename))
print("put",filename,"to",bucket)
# S3にデータをぶちこむ
obj.put(Body=mes,ContentType="text")
return
def lambda_handler(event, context):
mes = event["data"]
upload_img(mes)
return 0
バケット名は環境変数に持たせているので、そこで入力します
Lambda自体はこれで完了ですが、どのサービス起因でこのLambdaが発火するのか、またどのサービスへつながるのかなどはまだ何も決めていません。そのような設定をDesignerのところでやっていきます。
どのサービスにつなぐのかは、ロールの設定をしてやらなければいけません。
IAMの設定で、LambdaをS3につなぐ便利なポリシー
"AWSLambdaExecution"をアタッチしたロールを作成しましょう。下図のようなロールを作ればOKです
下記のようになれば、AWS IoTとS3をつなぐ準備は完了です。
成功していれば、この様にタイムスタンプがオブジェクト名になっているデータがS3に流れていきます。さっきのLambdaは横流ししているだけですが、編集したり、別のサービスに繋げられたりするというメリットがあるわけです。
【TIPS】AWS IoTルール設定時にバグを見つける。
今回、AWS IoTからのメッセージでLambdaに発火しないというバグがありました。その理由はさっきも少し書いたように「LambdaにはJSON形式で投げる必要がある」というものだったのですが、その発見に役立ったのがAWS IoTルール作成時の「エラーアクション」です。
エラーが起こった時に、たとえばトピック"lambda/error"にメッセージを履くようにしておけば、エラーログが吐き出されるので、デバッグができるというわけです。
今回は、下のようなエラーメッセージが出てきて、これをググると、メッセージの型の問題だなと判明して一件落着したわけです。AWS IoTルール周りでエラーが出たときは、エラーアクションを用いてログを確認してみましょう。
{
"ruleName": "HelloWorld",
"topic": "hello/world",
"cloudwatchTraceId": "XXXXXXXXXXXXX",
"clientId": "hokeGGGroup1st_Core-c00",
"base64OriginalPayload": "xxxxxxxxxxxxxxxxxxxxxxxx",
"failures": [
{
"failedAction": "LambdaAction",
"failedResource": "arn:aws:lambda:ap-northeast-1:xxxxxxx:function:test-fromIoTtoS3",
"errorMessage": "Failed to invoke lambda function. Received Server error from Lambda. The error code is 400"
}
]
}
Greengrassのコネクタでサービスにつないでさばく
Greengrassのコネクタ機能を使って、Kinesis Firehoseを介してS3に投げると言う手段もあります。今回は詳しくは割愛しますが、Kinesis Firehoseはデータをバッファリングする機能があるので、大量のデータが一気に来たときにそれを平準化することができます。大量のデータがバースト的に来て、後段の処理がやばいときなどに使うと良いでしょう。
REF : Kinesis Firehoseを使ってみた
まとめ
これで取得したデータを格納するところまでかけました。
今回はテキストデータでしたが、次は実際にラズパイで撮った写真をS3に格納していきたいと思います!
ラズパイカメラのGreengrassでの取り扱い方
画像をメッセージとして送る方法
あたりを説明します。ではではっ
Part4. ラズパイデプロイ編
Part6. カメラ撮影・S3保存編
サポートいただけると励みになります! よろしくおねがいします!!