【GCP+python3】Pub/Subにメッセージをパプリッシュする

意外とpythonからパブリッシュするコード書いたことなかったので。

準備

$ pip install --upgrade google-cloud-pubsub

結論

from google.cloud import pubsub_v1
import time
import json
class pubsub_publisher():
   def __init__(self):
       self.project_id = "PROJECT-ID"
       self.topic_id = "TOPIC-NAME"
       self.publisher = pubsub_v1.PublisherClient()
       self.topic_path = self.publisher.topic_path(self.project_id, self.topic_id)
       self.futures = dict()
       return
       
   def send(self,messages):
        for message_id,message in enumerate(messages):
           data = message
           self.futures.update({message_id:None})
           future = self.publisher.publish(
               self.topic_path, data=data.encode("utf-8")
           )
           self.futures[message_id] = future
           future.add_done_callback(self.get_callback(future,message_id))
       while self.futures:
           time.sleep(5)
       
       return(True)
   def get_callback(self,f,id):
       def callback(f):
           try:
               print(f.result)
               self.futures.pop(id)
           except:
               print(f"Please handle {f.exception()} for {id}.")
       return callback

if __name__ == '__main__':
   pub = pubsub_publisher()
   data = {"test":"hoge"}
   data_j = json.dumps(data)
   pub.send(1,data_j)

公式ドキュメントよりちょっと実用的に修正済み。

公式ドキュメントはここ

トピックへのメッセージのパブリッシュ  |  Cloud Pub/Sub  |  Google Cloud

説明

初期化関数

   def __init__(self):
       self.project_id = "PROJECT-ID"
       self.topic_id = "TOPIC-NAME"
       self.publisher = pubsub_v1.PublisherClient()
       self.topic_path = self.publisher.topic_path(self.project_id, self.topic_id)
       self.futures = dict()
       return

プロジェクトIDと送信先のトピック名を設定。

クライアントインスタンスを立ち上げて、topic_pathを生成。

全てのメッセージの送信が終わったかをチェックするためのfutures()を用意しておく。

メッセージ送信部分

      def send(self,messages):
        for message_id,message in enumerate(messages):
           data = message
           self.futures.update({message_id:None})
           future = self.publisher.publish(
               self.topic_path, data=data.encode("utf-8")
           )
           self.futures[message_id] = future
           future.add_done_callback(self.get_callback(future,message_id))
       while self.futures:
           time.sleep(5)
       

messagesの中身は、strのリストとかdictのリストとか渡せば良いです。publisher使ってメッセージをパブリッシュするとfutureオブジェクトが返ってくるので、コールバックを取得して(後述)、futuresに入れておく。

futuresに何か入っている間中ちょっと待つ。

コールバック取得部分

   def get_callback(self,f,id):
       def callback(f):
           try:
               print(f.result)
               self.futures.pop(id)
           except:
               print(f"Please handle {f.exception()} for {id}.")
       return callback

言われるがまま……正直何をどうしているのかうすぼんやりとしか解っていない……うすぼんやり……

これでインスタンス作って、send関数にメッセージのリストを渡せば、メッセージを1つずつPub/Sub送信してくれる。

この記事が気に入ったらサポートをしてみませんか?