見出し画像

ServerからClientへの通知(SSE・Django・Redis・Vue.js)


こんにちは、Lisseテックリードの中嶋です。先日弊社の契約書レビュー支援サービスLeCHECKにServerからClientへの通知手段としてSSE(Server Sent Event)を導入しました。今回はその実装経緯と内容について紹介したいと思います。

開発経緯

LeCHECKはレビュー機能の他に、お客様の会社で管理している契約書のひな型をアップロード・解析・登録し、そのひな型とレビューしたい契約書を比較するできる機能を提供してます。今まで自社ひな型は1通づつしか登録できませんでしたが複数のひな型を登録できるように改修する要望があがりました。

ひな型の登録には早ければ一瞬、長ければ10分程度かかります。この登録処理を一括(最大100枚)で実施するためには、自社ひな型の登録を非同期で並列処理することが必須となります。さて、非同期処理で登録した場合どのようにしてClientに登録の完了を通知するのかという課題がでてきます。具体的には一覧画面に登録が完了した契約書を随時表示させる必要がありました。

ServerからClientへの通知手段

一般的にServerからClientへの通知手段には以下のパターンがあります

一番シンプルなPollingですがShortPollingとLongPollingがあり、後者の場合は一定期間メッセージを待ってから結果を返すことができます。実装コストを考えるとPollingが一番楽なのは明らかですが、今回は登録完了のイベントが不特定数かつ継続的に発生するため「どこでPollingを停止するのか」を測ろうとすると処理が煩雑になります。またある程度リアルタイム性をもった通知をしたかったので今回は見送りました。

もう1つの候補のWebSocketですが、リアルタイム通知・ServerからClientへの通知とやりたいことは全てカバーされますが、要件としては一方通行の通知ができればいいので、HTTP通信でより軽量に実装できそうなSSEを採用することとしました。

SSE接続数の課題

SSEの実装で一番注意しなくてはいけない点はブラウザが構築できる接続数の制限です。

Warning: When not used over HTTP/2, SSE suffers from a limitation to the maximum number of open connections, which can be specially painful when opening various tabs as the limit is per browser and set to a very low number (6). The issue has been marked as "Won't fix" in Chrome and Firefox. This limit is per browser + domain, so that means that you can open 6 SSE connections across all of the tabs to www.example1.com and another 6 SSE connections to www.example2.com. (from Stackoverflow). When using HTTP/2, the maximum number of simultaneous HTTP streams is negotiated between the server and the client (defaults to 100).

https://developer.mozilla.org/en-US/docs/Web/API/EventSource

MozillaのEventSourceに記載がある通り、SSEは同一ブラウザとドメインの接続数の制約があります。HTTP/1通信では極端に少なく、ChromeやEdgeの場合6接続までです。HTTP/2通信の場合は制限が100まで緩和されます。お客さんが複数タブでサービスを利用したり、今後SSEチャネルが増え接続が増えることを考えるとHTTP/2通信のもと実装するべきだと言えます。

ステートフルの課題

SSEを実装することでClientとServerは一対一で接続することになり「ステートフル」な状態になります。LeCHECKはサーバーはAWSのEKS Cluster上で稼働しており、常時複数のコンテナがスケールイン・アウトしているため、Clientが特定のサーバーに依存する「ステートフル」な状態を許容することはできません。この問題を解決するのがRedisです。

RedisにはMessageのSubscribe/PublishができるPub/Sub機能があります。RedisClusterを単一のMessageソースとして複数のサーバーがSubscribeしている状態であればMessageをそれぞれのサーバーで受け取ることができます。仮にサーバーの1つが終了されても、Clientがリトライして別のサーバーに接続できればよく、どのサーバーと接続しても同じ結果を受け取ることができます。

この構成ではすべてのサーバーがMessageをRedisにPublishすることができ、Subscribeしているすべてのサーバーで受信できるので、kubernetesのようなマイクロサービスでの汎用性が高いです。LeCHECKでも読み取り処理をしているサーバーとClientのInterfaceとなるサーバーは別なので以下の図のように複数のサーバー間でRedisを起点にメッセージのやりとりが可能になります。

サーバーの実装

StreamingHttpResponse

ではサーバーの実装を紹介します。LeCHECKでClientのInterfaceとなるサーバーはPythonのDjango Rest Frameworkを利用しています。DjangoやDRFにWebSocketやSSEを組み込む方法を調べるとDjango Channelsというライブラリの実装方法がよく紹介されています。SSE用にもConsumerClassの実装例があり試してみましたがRedisのPubSubと組み合わせるのにうまくいかないことが多くかなり苦戦しました。色々調べた結果、Channelsなど外部ライブラリを使わなくともDjangoのStreamingHttpResponseを利用して解決することができました。

StreamingHttpResponseはStreamEventを返すDjangoのレスポンスクラスでASGIアプリケーションサーバーでSSEやLongPollingを実装できます。実装方法は非常に簡単で以下のようにAsyncGeneratorを渡してあげるだけ。Generatorからyieldされるたびにイベントを送ることができます。

class EventStreamdView(APIView):

    permission_classes = [IsAuthenticated]
    renderer_classes = [EventStreamRenderer]

    def post(self, request):

        return StreamingHttpResponse(
            streaming_content=<AsyncGenerator>
        )

EventSourceなどClient側は基本的にtext/event-streamをAccept属性として設定してリクエストを送るためcontent-typeをtext/event-streamにしないとリクエストが失敗します。DRFではRenderClassをEventStream用に用意してあげ、SSEのエンドポイントではこのClassをRenderするようにさせます。

class EventStreamRenderer(renderers.BaseRenderer):

    media_type = "text/event-stream"
    format = None

    def render(self, data, accepted_media_type=None, renderer_context=None) -> str:
        return data

Redis Pub/Sub

このSSEエンドポイントで行うことはRedisのPub/Subの特定のチャネルに対するSubscriptionとMessageのListenです。以下はaioredisで実装した例です。

async def listen_channel(cls, channel_info: ChannelInfo) -> AsyncGenerator[str, None]:
    try:
        async with cls.async_client.pubsub() as pubsub:
            await pubsub.subscribe(channel_info.pubsub_channel_name)
            while True:
                message = await pubsub.get_message(
                    ignore_subscribe_messages=True, 
                    timeout=20.0)
                if message is None:
                    # messageがない場合heartbeatを送る
                    yield ": heartbeat\n\n"
                else:
                    yield message["data"]
                await asyncio.sleep(0.001)

    except asyncio.CancelledError:
        await pubsub.unsubscribe(channel_info.pubsub_channel_name)

特定のチャネルにSubscribeしたあとwhile TrueでMessageを待ち受けます。MessageがくればMessageを、タイムアウト期間を過ぎればheartbeatをyieldするAsyncGeneratorを作成します。SSEにおけるheartbeatはClientやMiddlewareがタイムアウトによる接続遮断しないように定期的に送る重要なMessageです。get_messageに任意のタイムアウトを設定し、Messageがなくても自動的にheartbeatもyieldされるようにします。

上の実装だけだとSSEのイベントとして送られる値はPub/SubのMessageとイコールになりますが、yieldされた値をキャッチして処理をしてからイベント内容を新たに生成することも可能です。"<string>\n\n"という形式さえ守ればJSON形式にすることもできます。

一番下でCatchしているCancelledErrorはDjango5で実装されたClientの接続遮断を検知するエラーです。接続遮断を検知したらUnsubscribeすることによりSubscribe数を抑えることができます。

チャネル・イベント管理

今回の実装でいうとSSEのチャネル、Pub/Subのチャネルが存在します。SSEのチャネルはエンドポイントURLです。例えばSSEのチャネル、エンドポイントURLがhttps://{domain}/event/{uuid}ならRedisのPub/Subチャネルもevent/{uuid}にして、SSEのチャネルと一対一の関係になるようにします。これにより実装がシンプルに直感的にわかりやすくなります。

またuuidの値がログインユーザーIDならそのユーザー特有のイベント処理ができますし、会社IDならその会社のイベント全体を受信することができます。このように複数のSSEチャネルを利用することがきでますが、1チャネルに1接続のため、ブラウザによる接続制限を考慮して利用は必要最低限にします。

さて、サーバー側の実装は以上です。Djangoの場合ASGIは必須、Django5系にできるとよいです。Channelsからなにやらあらゆるものを試しましたがStreamingHttpResponseでシンプルに実装することがきでました。

Django公式にも記載がありますが、Event-StreamはHeaderにContent-Lengthがないため古いproxyでは処理をうまく捌けない可能性もあるようです。またRedisのPub/Subに関しても基本的にMessageの再送は行われません(Redis Streamではあるよう)。再接続のわずかな合間にMessageが来てしまったら受け取れないClientも発生し得ます。データへの再アクセス経路があり、リアルタイム性をもった継続的な通知イベントを扱いたい場合に利用するなど考慮する必要があります。

Clientの実装

LeCHECKのFrontendはVue.jsやReactを利用したSinglePageApplicationです。今回通知処理を追加したVue.jsの実装方法をご紹介します。

Fetch

JavascriptのSSEの実装はEventSourseFetchなどが利用できます。EventSourceと比較してFetchは

  • POST requestができる

  • Request Headerを設定できる

というメリットがあります。Headerを設定できれば通常エンドポイントのリクエストと同じようにJWT認証等ができるため、商用利用時にあえてEventSourceを利用することはないでしょう。またMicrosoftが提供しているFetch互換のライブラリ、fetch-event-sourceを利用すれば接続が切れた時の再接続などが簡単に構築できるのでSSEを扱う時の第一選択になると思います。

もともとfetch-event-sourceを利用したかったのですが、実装当時そのままではWorker内で利用できなかったため通常のFetchを利用して以下のように実装しました。

return fetch(endpointUrl, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: `JWT ${result.data.Token}`
  },
  signal: abortCtrl.signal
})
  .then(response => {
    const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
    const readStreamData = () => {
      reader
        .read()
        .then(({ done, value }) => {
          if (done) {
            return;
          }
          if (!value.includes('heartbeat')) {
            do_something(value)
          }
          readStreamData();
        })
    };
    readStreamData();
  })

headerに認証トークンをセットして接続、接続ができたらgetReaderでread Objectを取得しStream Dataを読み取り続けます。

SharedWorkerで接続数を抑える

前述したSSEの問題、ブラウザの接続数の制限について考慮した実装を行います。何も対策しない場合SSEの接続数は新規タブ・windowが開かれたタイミングなどで以下の図のようにどんどん増えていきます。HTTP/2で接続したとしても上限の100に到達しかねません。

対策としてSharedWorkerで接続を一元化、BroadCastChannelで複数のWindowやTabにMessageを伝播させる方法をとりました。

上の図の通り、SSEコネクションを構築するのはSharedWorkerのみになります。Messageが必要になるComponentはSharedWorkerで接続しているBroadCastChannelと同じChannelをListenすることによりSharedWorkerがSSEから受け取ったイベントをBroadCastChannelから受け取れることができます。流れとしては以下のようになります

  1. Component: mount時にSharedWorkerに接続

  2. Component: SharedWorkerにpostMessageを送る

  3. Component: BroadCastChannelに接続

  4. SharedWorker: Worker内でSSE接続を開始

  5. SharedWorker: イベントをListenする

  6. SharedWorker: イベントがきたらBroadCastChannelで通知

  7. Component: イベントを受信して処理を行う

今回はイベントをキャッチして一覧画面を更新させる目的のため、一覧画面を提供しているComponentに処理を実装しました。

async mounted() {
  await this.createUserTemplateSSE();
  await this.addSSEEventListener();
},
methods: {
  async createUserTemplateSSE() {
    const sharedWorker = new SharedWorker('shared.worker.js');
    sharedWorker.port.start();

    sharedWorker.port.postMessage({
      action: 'userTemplateSSE',
      url: endpointURL,
      refreshToken: refresh
    });
  },
  async addSSEEventListener() {
    // イベントを受信
    const UserTemplateBC = new BroadcastChannel('UserTemplateEvent');
    UserTemplateBC.addEventListener('message', message => {
      do_something()
    });
  },

さて、このままではmountされるたびにSharedWorerに接続するため、新規Windowやtabが開かれたりReload時にはSSE接続をなんども要求することになります。この問題を考慮してSharedWorker内では「常にSSE接続が1になるように」調整、つまりすでに接続がある場合はそれを遮断した上で再接続するようにします。

SharedWorkerのworker.jsの全容は以下のようになります。

// Global settings
const userTemplateBC = new BroadcastChannel('UserTemplateEvent');
const refreshURL = `${process.env.API_ENDPOINT}refresh/`;

// getIdToken function
async function getIdToken(refreshToken) {
  return fetch(refreshURL, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      refresh: refreshToken
    })
  });
}

class SSEConnectionManager {
  constructor() {
    this.activeConnections = {
      userTemplate: null,
      temporaryContract: null
    };
  }

  async connect(type, endpointUrl, refreshToken, broadCastChannel) {
    // Abort previous connection if exists
    this.abortConnection(type);

    // Create a new AbortController for the new connection
    const abortCtrl = new AbortController();

    // Create connection
    const connectionPromise = this.createConnection(endpointUrl, refreshToken, abortCtrl, broadCastChannel);

    // Store the connection details
    this.activeConnections[type] = {
      abortCtrl: abortCtrl,
      connectionPromise: connectionPromise
    };
  }

  abortConnection(type) {
    const connection = this.activeConnections[type];
    if (connection && connection.abortCtrl) {
      connection.abortCtrl.abort();
      console.log(`${type} connection aborted`);
    }
    this.activeConnections[type] = null;
  }

  async createConnection(endpointUrl, refreshToken, abortCtrl, broadCastChannel, retryDelay = 5000) {
    getIdToken(refreshToken)
      .then(async response => {
        if (response.status == 401) {
          throw new Error('Invalid RefreshToken');
        }
        const result = await response.json();
        return fetch(endpointUrl, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            Authorization: `JWT ${result.data.idToken}`
          },
          signal: abortCtrl.signal
        })
          .then(response => {
            const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
            const readStreamData = () => {
              reader
                .read()
                .then(({ done, value }) => {
                  if (done) {
                    console.log('Stream closed.');
                    return;
                  }
                  if (!value.includes('heartbeat')) {
                    broadCastChannel.postMessage(value);
                  }
                  readStreamData();
                })
                .catch(error => {
                  console.error('Stream reading error. Reconnecting...', error);
                  // if not Aborted, try to reconnect to the server
                  if (error.name !== 'AbortError') {
                    setTimeout(() => this.createConnection(endpointUrl, refreshToken, abortCtrl, broadCastChannel), retryDelay);
                  }
                });
            };
            readStreamData();
          })
          .catch(error => {
            console.error('Fetch error.', error);
            if (error.name !== 'AbortError') {
              setTimeout(() => this.createConnection(endpointUrl, refreshToken, abortCtrl, broadCastChannel), retryDelay);
            }
          });
      })
      .catch(error => {
        // NOTE 再コネクションのAbortとTokenの認証エラーの場合はRetryしない
        if (error.name !== 'AbortError' && error.message !== 'Invalid RefreshToken') {
          setTimeout(() => this.createConnection(endpointUrl, refreshToken, abortCtrl, broadCastChannel), retryDelay);
        }
      });
  }
}

const connectionManager = new SSEConnectionManager();

self.onconnect = e => {
  const port = e.ports[0];

  port.onmessage = event => {
      const url = event.data.url;
      const refreshToken = event.data.refreshToken;
      connectionManager.connect('userTemplate', url, refreshToken, userTemplateBC);
  };
  port.start();
};

上記の実装により「デバイスがSleep状態になり接続が切断されたとき」「サーバーの入れ替え・不具合で接続が切断されたとき」には再接続を試みます。「Reloadや複数タブによるComponent再描画時」にはAbortControllerによる切断を明示的に行い接続を入れ替えます。「Token期限切れによる認証エラーの時」にはエラーのまま再ログインをさせることになります。

この実装により目的であった不特定のタイミングで複数の契約書登録イベントをキャッチして一覧画面を更新するということが可能になりました。接続数の制限をしてSSEの制約に対応すること、サーバーアーキテクチュアの特性を考え再接続の処理をきちんと実装することが重要になります。

さて、基本的なClinetの実装は以上です。今回記載を省略しますが、SharedWorkerの設定に関してはWebpackを利用している場合はworker-loaderを利用できます。またSharedWorkerを使わないイベント処理(使いきりの通知が必要な場合)に関しては通常のjs内でSharedWorkerに記載したfetchによる接続・切断をすることで応用できます。

実装してみての感想

SSEはドキュメントも少なく上記の実装方法に至るまでかなり調査に苦戦しました。特にDjangoに関してはchannelを使うか、どうやってRedisとSSEを疎通させるかなど紆余曲折がありましたが、Dan Sloan氏のPyconの講演を聞きStreamingHttpResponseを利用する方法を採用しました(感謝です)。PollingでもできるWebSocketでもできる、という中であえてSSEを選択したわけですが、SSE特有の制約事項のためリリースするまでの不安は正直とても大きかったです。リリースから1ヶ月が経とうとしていますが、今のところ問題なく安定稼働しており、ひとまず安心しています。

この記事が少しでも役に立てば嬉しいです。お読みいただきありがとうございました!


一緒に働くメンバーを募集中!

株式会社リセでは、一緒に働くメンバーを募集しています。
カジュアル面談では、30分間お時間をいただき、会社・チーム・プロダクトのご紹介や色々なご質問にお答えします💁‍♂️
皆さんとお話しできることを楽しみにしています!

  • 株式会社リセで募集している開発ポジションはこちら↓

  • こちらからもお好きな日程でカジュアル面談を設定できます↓