見出し画像

セールスフォース Pub/Sub API: リアルタイムデータ統合の新時代

概要

セールスフォースのPub/Sub APIは、リアルタイムデータのストリーミングとイベント駆動型のアーキテクチャをサポートする新しいAPIです。このAPIは、組織内外でのデータの即時共有を可能にし、リアルタイムのデータ統合と連携を実現します。Pub/Sub APIを利用することで、セールスフォース内のイベントを他のシステムやサービスに通知し、即時に反応させることができます。

アーキテクチャー

Pub/Sub APIのアーキテクチャは、イベントの発行者(Publisher)と購読者(Subscriber)というモデルに基づいています。主要なコンポーネントは以下の通りです:

  • イベントバス:すべてのイベントが流れる中心的なコンポーネントで、イベントのルーティングと配信を担当します。

  • パブリッシャー:イベントを生成し、イベントバスに発行するアプリケーションやサービス。

  • サブスクライバー:イベントバスからイベントを購読し、必要な処理を行うアプリケーションやサービス。

  • イベントオブジェクト:発行されるイベントのデータモデル。各イベントは特定のスキーマに従って構造化されています。

以下は、gRPCを組み込んだPub/Sub APIのアーキテクチャを示すPlantUMLのダイアグラムです:

gRPCプロトコル

gRPCとセールスフォース Pub/Sub APIの関係性

セールスフォースのPub/Sub APIは、イベント駆動型アーキテクチャをサポートし、リアルタイムでデータをやり取りするために設計されています。このAPIの実装において、gRPCは以下のような役割を果たしています:

  1. 高速な通信:gRPCはバイナリ形式のデータ伝送を行うため、従来のREST APIよりも高速で効率的な通信が可能です。これにより、Pub/Sub APIは高スループットなイベント処理を実現できます。

  2. スケーラビリティの向上:gRPCはHTTP/2を利用しており、マルチプレクシング、圧縮、双方向ストリーミングなどの機能を提供します。これにより、Pub/Sub APIは多数のイベントを同時に処理し、スケーラビリティを向上させることができます。

  3. 双方向通信:Pub/Sub APIはイベントの発行者と購読者の間で双方向のデータストリームをサポートしており、gRPCの双方向ストリーミング機能を利用して、リアルタイムでのデータや通知のやり取りを効率化します。

メリット、デメリット

メリット

  1. リアルタイム性:即時性が求められるアプリケーションに最適。

  2. スケーラビリティ:大規模なデータストリームを効率的に処理可能。

  3. 柔軟性:多様なシステムやサービスと容易に連携できる。

  4. 拡張性:必要に応じて新しいイベントタイプやサブスクライバーを追加可能。

デメリット

  1. 複雑さ:従来のAPIに比べて設定や管理が複雑。

  2. コスト:大量のデータを処理するためのインフラコストが高くなる可能性。

  3. 信頼性:ネットワークの問題や障害時にデータの損失リスクがある。

ユースケース

サービスクラウド

  1. リアルタイムのケース更新通知:顧客がサポートケースを更新した際に、エージェントにリアルタイムで通知。

  2. インシデント管理:重大なインシデントが発生した際に、関係者全員に即時通知し、迅速な対応を促す。

セールスクラウド

  1. リードのリアルタイム通知:新しいリードが生成されたときに営業担当者に即時通知。

  2. 商談の進捗通知:商談のステージが変更された際に、関係者にリアルタイムで通知。

マーケティングクラウド

  1. キャンペーンのリアルタイム分析:マーケティングキャンペーンの進行状況や成果をリアルタイムでモニタリングし、即時に調整。

  2. 顧客エンゲージメント通知:顧客が特定のアクションを起こした際にマーケティングチームに通知し、パーソナライズされた対応を促す。

コマースクラウド

  1. 注文処理のリアルタイム通知:新しい注文が入った際に、在庫管理システムや配送部門に即時通知。

  2. 顧客行動のリアルタイム追跡:顧客のウェブサイト上での行動をリアルタイムで追跡し、パーソナライズされたオファーやプロモーションを提供。

CometDと比較

CometDは、長い間セールスフォースでリアルタイムのイベント処理を行うための標準ツールとして使用されてきましたが、Pub/Sub APIと比較するといくつかの違いがあります。

  • スケーラビリティ:Pub/Sub APIは、CometDよりも大規模なデータストリームを効率的に処理できます。

  • 柔軟性:Pub/Sub APIは、より多くのイベントタイプと複数のサブスクライバーをサポートします。

  • 管理の容易さ:CometDは比較的シンプルな設定で始められますが、Pub/Sub APIはその柔軟性とスケーラビリティのために、より高度な管理が必要です。

  • 機能の拡張性:Pub/Sub APIは新しい機能やイベントタイプの追加が容易で、今後の拡張性が高い。

PlatformEventとPub/Sub APIの統合サンプル

以下に、PlatformEventとPub/Sub APIの統合サンプルコードを示します。

Step 1: Generate the Stub Files

必要な準備

pip3 install grpcio grpcio-tools avro requests

Pub/Sub APIのGitHubリポジトリをクローン:

git clone https://github.com/forcedotcom/pub-sub-api

pubsub_api.protoファイルを使用してスタブファイルを生成:

cd pub-sub-api/python
python3 -m grpc_tools.protoc --proto_path=../ pubsub_api.proto --python_out=. --grpc_python_out=.

Step 2: Build the Python Client

新しいPythonファイル(例:PubSubAPIClient.py)を作成し、以下のインポート文を追加します。

import grpc
import requests
import threading
import io
import pubsub_api_pb2 as pb2
import pubsub_api_pb2_grpc as pb2_grpc
import avro.schema
import avro.io
import time
import certifi
import json

semaphore = threading.Semaphore(1)
latest_replay_id = None

with open(certifi.where(), 'rb') as f:
    creds = grpc.ssl_channel_credentials(f.read())
with grpc.secure_channel('api.pubsub.salesforce.com:7443', creds) as channel:
    username = '{your username}'
    password = '{your password}'
    url = '{the appropriate login URL}'
    headers = {'content-type': 'text/xml', 'SOAPAction': 'login'}
    xml = f"""<soapenv:Envelope xmlns:soapenv='http://schemas.xmlsoap.org/soap/envelope/'
    xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
    xmlns:urn='urn:partner.soap.sforce.com'><soapenv:Body>
    <urn:login>
    <urn:username><![CDATA[{username}]]></urn:username>
    <urn:password><![CDATA[{password}]]></urn:password>
    </urn:login></soapenv:Body></soapenv:Envelope>"""
    res = requests.post(url, data=xml, headers=headers, verify=False)
    print(res.content)
    
    sessionid = '{the session ID you just got from the XML}'
    instanceurl = '{the first part of the server URL ending in .com}'
    tenantid = '{organization ID}'
    authmetadata = (('accesstoken', sessionid),
    ('instanceurl', instanceurl),
    ('tenantid', tenantid))
    
    stub = pb2_grpc.PubSubStub(channel)
    
    def fetchReqStream(topic):
        while True:
            semaphore.acquire()
            yield pb2.FetchRequest(
                topic_name = topic,
                replay_preset = pb2.ReplayPreset.LATEST,
                num_requested = 1)

    def decode(schema, payload):
      schema = avro.schema.parse(schema)
      buf = io.BytesIO(payload)
      decoder = avro.io.BinaryDecoder(buf)
      reader = avro.io.DatumReader(schema)
      ret = reader.read(decoder)
      return ret
    
    mysubtopic = "/data/OpportunityChangeEvent"
    print('Subscribing to ' + mysubtopic)
    substream = stub.Subscribe(fetchReqStream(mysubtopic), metadata=authmetadata)
    for event in substream:
      if event.events:
        semaphore.release()
        print("Number of events received: ", len(event.events))
        payloadbytes = event.events[0].event.payload
        schemaid = event.events[0].event.schema_id
        schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schemaid), metadata=authmetadata).schema_json
        decoded = decode(schema, payloadbytes)
        print("Got an event!", json.dumps(decoded))
      else:
        print("[", time.strftime('%b %d, %Y %l:%M%p %Z'), "] The subscription is active.")
      latest_replay_id = event.latest_replay_id

Step 3: Set Up Events

変更データキャプチャ(Change Data Capture)イベントを購読するオブジェクトを設定します。

Step 4: Write Code That Subscribes to an Event Channel

購読するトピック名を設定します。

mysubtopic = "/data/OpportunityChangeEvent"
print('Subscribing to ' + mysubtopic)

Step 5: Write Code That Publishes a Platform Event Message

新しいPythonファイル(例:publish.py)を作成し、必要なインポート文と共通コードを含めます。

from datetime import datetime, timedelta

# 既存のインポート文、チャンネル作成コード、認証コード、スタブのコードを含める

mypubtopic = '{your publish topic}'
schemaid = stub.GetTopic(pb2.TopicRequest(topic_name=mypubtopic), metadata=authmetadata).schema_id
schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schemaid), metadata=authmetadata).schema_json

ペイロードをエンコードする関数を作成します。

def encode(schema, payload):
  schema = avro.schema.parse(schema)
  buf = io.BytesIO()
  encoder = avro.io.BinaryEncoder(buf)
  writer = avro.io.DatumWriter(schema)
  writer.write(payload, encoder)
  return buf.getvalue()

PublishRequestを作成する関数を作成します。

def makePublishRequest(schemaid):
  dt = datetime.now() + timedelta(days=5)
  payload = {
    "CreatedDate": int(datetime.now().timestamp()),
    "CreatedById": '005R...', #Your user ID
    "{replace event field}": "{replace field value}"
  }
  req = {
    "schema_id": schemaid,
    "payload": encode(schema, payload)
  }

  return [req]

パブリッシュコールを行い、受信した応答を処理します。

publishresponse = stub.Publish(pb2.PublishRequest(
  topic_name=mypubtopic, events= makePublishRequest(schemaid)), metadata=authmetadata)

これで、PlatformEventとPub/Sub APIの統合が完了しました。購読クライアントとパブリッシュクライアントをそれぞれ実行することで、イベントの発行と受信を確認できます。

参考リンク

このブログ記事がセールスフォースのPub/Sub APIの理解に役立つことを願っています。リアルタイムデータ処理の可能性を広げるために、ぜひこのAPIを試してみてください。

この記事が気に入ったらサポートをしてみませんか?