【GCP+python3】Pub/Subにメッセージをパプリッシュする
意外とpythonからパブリッシュするコード書いたことなかったので。
準備
$ pip install --upgrade google-cloud-pubsub
結論
from google.cloud import pubsub_v1
import time
import json
class pubsub_publisher():
def __init__(self):
self.project_id = "PROJECT-ID"
self.topic_id = "TOPIC-NAME"
self.publisher = pubsub_v1.PublisherClient()
self.topic_path = self.publisher.topic_path(self.project_id, self.topic_id)
self.futures = dict()
return
def send(self,messages):
for message_id,message in enumerate(messages):
data = message
self.futures.update({message_id:None})
future = self.publisher.publish(
self.topic_path, data=data.encode("utf-8")
)
self.futures[message_id] = future
future.add_done_callback(self.get_callback(future,message_id))
while self.futures:
time.sleep(5)
return(True)
def get_callback(self,f,id):
def callback(f):
try:
print(f.result)
self.futures.pop(id)
except:
print(f"Please handle {f.exception()} for {id}.")
return callback
if __name__ == '__main__':
pub = pubsub_publisher()
data = {"test":"hoge"}
data_j = json.dumps(data)
pub.send(1,data_j)
公式ドキュメントよりちょっと実用的に修正済み。
公式ドキュメントはここ
トピックへのメッセージのパブリッシュ | Cloud Pub/Sub | Google Cloud
説明
初期化関数
def __init__(self):
self.project_id = "PROJECT-ID"
self.topic_id = "TOPIC-NAME"
self.publisher = pubsub_v1.PublisherClient()
self.topic_path = self.publisher.topic_path(self.project_id, self.topic_id)
self.futures = dict()
return
プロジェクトIDと送信先のトピック名を設定。
クライアントインスタンスを立ち上げて、topic_pathを生成。
全てのメッセージの送信が終わったかをチェックするためのfutures()を用意しておく。
メッセージ送信部分
def send(self,messages):
for message_id,message in enumerate(messages):
data = message
self.futures.update({message_id:None})
future = self.publisher.publish(
self.topic_path, data=data.encode("utf-8")
)
self.futures[message_id] = future
future.add_done_callback(self.get_callback(future,message_id))
while self.futures:
time.sleep(5)
messagesの中身は、strのリストとかdictのリストとか渡せば良いです。publisher使ってメッセージをパブリッシュするとfutureオブジェクトが返ってくるので、コールバックを取得して(後述)、futuresに入れておく。
futuresに何か入っている間中ちょっと待つ。
コールバック取得部分
def get_callback(self,f,id):
def callback(f):
try:
print(f.result)
self.futures.pop(id)
except:
print(f"Please handle {f.exception()} for {id}.")
return callback
言われるがまま……正直何をどうしているのかうすぼんやりとしか解っていない……うすぼんやり……
これでインスタンス作って、send関数にメッセージのリストを渡せば、メッセージを1つずつPub/Sub送信してくれる。
この記事が気に入ったらサポートをしてみませんか?