【GCP+python】CloudRun上で、cloudstrageが変更された時に送られるPub/Subメッセージを受け取る

わたしへ。
夢中で実装したので今にも忘れそうです。
わたしより。

まず、「Pub/Subを受け取ったら起動する」ところまでの作り方はこっち参照↓
【GCP+Python3】CloudRunにPub/Subで起動するアプリを乗っけたい|yucco|note

CloudStorageが変更されたらPub/Subメッセージが送信される設定方法はこっち↓
【GCP】CloudStorageにファイルが入ったらPub/Subに通知をする設定の仕方|yucco|note

で、見るべき公式ドキュメントはこっち↓
Cloud Storage の画像処理のチュートリアル  |  Cloud Run のドキュメント  |  Google Cloud

この記事ではその後、受け取ったメッセージをひもといて処理へ繋げる部分について。

結論

import base64
import os
import process
import json
from flask import Flask, request

@app.route('/', methods=['POST'])
def index():
# そもそもpub/subメッセージが届いているか確認
   envelope = request.get_json()
   if not envelope:
       msg = 'Pub/Subメッセージが届いていません'
       print(f'error: {msg}')
       return f'Bad Request: {msg}', 400
   if not isinstance(envelope, dict) or 'message' not in envelope:
       msg = 'Pub/Subメッセージの形式が正しくありません'
       print(f'error: {msg}')
       return f'Bad Request: {msg}', 400
# Pub/Subメッセージ内のメッセージ部分だけ取り出してもうちょっとチェック
   pubsub_message = envelope['message']
   if not isinstance(pubsub_message, dict):
       msg = 'Pub/Subメッセージ内の[message]の値が辞書形式ではありません'
       print(f'error: {msg}')
       return f'Bad Request: {msg}', 400       
   if 'data' not in pubsub_message:
       msg = 'Pub/Subメッセージ内に[data]がありません'
       print(f'error: {msg}')
       return f'Bad Request: {msg}', 400       
   if 'attributes' not in pubsub_message:
       msg = 'Pub/Subメッセージ内に[attribute]がありません'
       print(f'error: {msg}')
       return f'Bad Request: {msg}', 400


# メッセージ内の[attributes]の中、[eventType]をチェックして、変更の内容が新規/削除/メタデータ更新のどれかを確認

# 属性情報を取得
attributes = pubsub_message['attributes']
# メッセージ本文を取得
data = json.loads(
               base64.b64decode(pubsub_message['data']).decode())
   process.main(data)

   if 'eventType' not in attributes:
       msg = 'eventTypeが登録されていません'
       print(f'error: {msg}')
       return f'Bad Request: {msg}', 400
if attributes['eventType'] == '	OBJECT_DELETE':
   msg = '削除'

if attributes['eventType'] == '	OBJECT_METADATA_UPDATE':
msg = 'メタデータ更新'


   if attributes['eventType'] == 'OBJECT_FINALIZE':
       msg = '新規作成、もしくは更新'

       print(f'{msg}')


# Storageにアクセスする
# 実際はこの辺りを切り出しておいてごにょごにょすると良い

from google.cloud import storage
storage_client = storage.Client()
file_data = data
       file_name = file_data['name']
       bucket_name = file_data['bucket']
       blob = storage_client.bucket(bucket_name).get_blob(file_name)
       contents = blob.download_as_string().decode()
print(contents)
return ('process successed', 200)

ポイント

request.get_json()を使って、json形式でメッセージを取得する

・メッセージは、{'message':{'attributes':{eventType:'OBJECT_FINALIZE',xxx:yyy,...},'data':(base64エンコードされたメッセージ本文)},あとメタデータ} 形式で入ってくるので、キーにmessageがあるか、その中にattributesとdataがあるかを確認する。

・dataはbase64エンコードされているので、jsonとして取得する場合

data = json.loads(base64.b64decode(pubsub_message['data']).decode())

を使う

・dataをデコードしたあと、バケット名はbucket、ファイル名はnameキーで取れる。

ここまで押さえておけばとりあえず次に進めるんじゃないでしょうかね……

この記事が気に入ったらサポートをしてみませんか?