見出し画像

Cloud Firestore トリガーのすすめElasticSearch インデックス同期編

こんにちは!エンジニアのオギーです。最近はパートナー様向けの新アプリの開発に没頭していました。こちらのアプリは今週無事リリースされました🎉 こちらについてはまた次回以降に触れようと思っています。

今回はCloud Firestoreトリガーのすすめということで、Elasticsearchのインデックスの同期システムに関することを紹介していきます。

Cloud Firestore トリガーとは

Cloud Firestore トリガーとは文字通り、Cloud Firestoreの変更をトリガーに独自のタスク(Cloud Function)を実行できる仕組みです。


こちらの仕組みによって例えば
・料理の各種データ(値段や画像等)が店舗側で更新されたら、Slack等に通知して内容を社内でチェックする
・料理が新規登録されたら、その店舗をお気に入りしているユーザーに対して新メニューに関する通知を送る
・FirestoreのデータをElasticsearchのインデックスに同期する

などの処理を非常に簡単に構築することができます。

今回構築したもの

弊社ではアプリホームのヘッドラインの生成や検索などにてElasticsearchを使用しており、Firestoreのデータを随時Elasticsearchの同期させる必要がありました。そのため今回は以下のような構成でシステムを構築しました。

画像1

CloudFirestoreトリガーを用いてドキュメントの変更を検知し、変更された「コレクション名」「ドキュメントID」「オペレーション(Create/Update or Delete)」をPubsubトピックにパブリッシュします。このトピックによってTaskを処理するサーバーに情報が渡され、対象ドキュメントのインデックスを更新をします。こうしてFirestoreのドキュメントの変更をリアルタイムでElasticsearchに同期することができました。これから各処理について説明していきます。

Firestoreの変更を検知しPub/Subでトピックにパブリッシュする

まずドキュメントの変更を検知した時にPub/Subトピックに対してその情報をパブリッシュするCloudFunctionを準備していきます。
CloudFirestoreトリガーのイベントの構造は、公式ドキュメントによると以下のような構造になっているようです。

{
   "oldValue": { // Update and Delete operations only
       A Document object containing a pre-operation document snapshot
   },
   "updateMask": { // Update operations only
       A DocumentMask object that lists changed fields.
   },
   "value": {
       // A Document object containing a post-operation document snapshot
   }
}

「oldValue」には変更前のドキュメントに関するデータ, 「value」には変更後のドキュメントに関するデータが格納されており、こちらのデータから変更されたコレクションやドキュメントIDの抽出を行うことができます。そのためCloudFunctionのコードは以下のように定義ができます。

type FirestoreEvent struct {
	OldValue FirestoreValue `json:"oldValue"`
	Value    FirestoreValue `json:"value"`
}

type FirestoreValue struct {
	CreateTime time.Time `json:"createTime"`
	Name       string    `json:"name"`
    // Fields     Fields    `json:"fields"` 対象ドキュメントのデータ
	UpdateTime time.Time `json:"updateTime"`
}

func PublishESIndexUpdated(ctx context.Context, e FirestoreEvent) error {
  // write your code
}

Valueには対象ドキュメントの更新時刻やパス、データが格納されておりこちらからデータの抽出等を行うことができます。前章で述べたとおりこちらの関数では「コレクション名」「ドキュメントID」「オペレーション(Create/Update or Delete)」の三つをトピックに通知させたいため、このOldValue/Valueの情報からそれぞれを抽出していきます。このValueには以下のようなパターンで値が設定されています。

・FirestoreValue.Nameには「projects/<Project名>/databases/(default)/documents/Shop/11Qz8Zl9gCatzo6jUgID」のようなパスが格納されている
・Create時は「OldValue」が空
・Update時は「OldValue」に変更前、「Value」に変更後のデータが格納されている
・Delete時は「Value」が空

これらのパターンからそれぞれの要素を抽出するには以下のようなコードで行うことができます。

type FirestoreEvent struct {
	OldValue FirestoreValue `json:"oldValue"`
	Value    FirestoreValue `json:"value"`
}

type FirestoreValue struct {
	CreateTime time.Time `json:"createTime"`
	Name       string    `json:"name"`
	UpdateTime time.Time `json:"updateTime"`
}

func (f *FirestoreEvent) ID() string {
	if f.Value.Name != "" {
		return extractIDFromPath(f.Value.Name)
	}
	return extractIDFromPath(f.OldValue.Name)
}

func (f *FirestoreEvent) Collection() string {
	if f.Value.Name != "" {
		return extractCollectionName(f.Value.Name, f.ID())
	}
	return extractCollectionName(f.OldValue.Name, f.ID())
}

func (f *FirestoreEvent) OperationType() OperationType {
	if extractIDFromPath(f.Value.Name) == "" {
		return OperationTypeDeleted
	}
	return OperationTypeCreatedOrUpdated
}

func extractIDFromPath(path string) string {
	if path == "" {
		return ""
	}
	fullPath := strings.Split(path, "/documents/")[1]
	pathParts := strings.Split(fullPath, "/")
	// sub collectionの場合(1階層の場合のみ有効)
	if len(pathParts) > 2 {
		return strings.Join(pathParts[3:4], "/")
	}
	return strings.Join(pathParts[1:2], "/")
}

func extractCollectionName(s, id string) string {
	ss := strings.Replace(s, id, "", 1)
	re := regexp.MustCompile(`/([A-Za-z]+?)/$`)
	return strings.Replace(re.FindStringSubmatch(ss)[0], "/", "", 2)
}

あとはPub/Subトピック用にjsonに整形しpublishすることで、簡単にFirestoreのドキュメントの変更をPub/Subを通してtaskのサーバーに伝達することができます。(Pub/Subの処理の説明については省きますが、サンプルコードをGithub Gistに貼っておきました。サンプルコード)

Cloud Functionの準備できたら以下のようなコマンドでデプロイすることで、対象コレクションの変更をトリガーに関数を実行されます。

$ gcloud functions deploy PublishESIndexUpdated --runtime go113 \
 --set-env-vars GCP_PROJECT=<GCPプロジェクト名> \
 --entry-point PublishESIndexUpdated \
 --region asia-northeast1 \
 --trigger-event providers/cloud.firestore/eventTypes/document.write \
 --trigger-resource "projects/<プロジェクト名>/databases/(default)/documents/<ドキュメント名>/{ID}" \
 --project <GCPプロジェクト名>

--trigger-resource にトリガーを設定したいコレクションのリファレンスを設定し、--trigger-eventにトリガーするタイミングを設定します。trigger-eventには三つのオプションがあるため用途に応じて使い分けていくと良さそうです。(公式ドキュメント)

Pub/Subによってプッシュされた内容からESインデックスを更新する

Firestore Trigger -> Cloud Function -> Pub/Sub -> AppEngine(task)という流れで更新すべきコレクションとIDを取得することができるので、あとは以下のような形でESインデックスを更新/削除するだけです。

type ESIndexUpdatedData struct {
	ID         string `json:"id"`
	Collection string `json:"collection"`
	Operator   string `json:"operator"`
}

func ServeHTTP(w http.ResponseWriter, r *http.Request) {
    ctx := context.Background()

    // Pub/Subからプッシュされた情報を取得する
    var res pubsub.PushRequest
    if err := json.NewDecoder(r.Body).Decode(&res); err != nil {
	    h.logger.Error("Error reading request body", zap.Error(err))
	    w.WriteHeader(http.StatusServiceUnavailable)
	    return
    }
    // ESインデックス更新/削除に必要な情報を取得する
    var d ESIndexUpdatedData
    if err := json.Unmarshal(res.Message.Data, &d); err != nil {
        h.logger.Error("Error reading request body", zap.Error(err))
	    w.WriteHeader(http.StatusServiceUnavailable)
	    return
    }
    switch d.Collection {
        // Orderコレクションの場合
        case "Order":
            switch d.Operator {
                case "created_or_updated":
                    // d.IDのドキュメントをFirestoreからgetしてES Indexをupdateする
                case "deleted":
                    // d.IDのドキュメントをESIndexから削除する
            }
        // Shopコレクションの場合
        // Case "Shop":
          // Orderと同様...
    }
}

まとめ

今回はCloud Firestoreトリガーのすすめシリーズ第一弾として、ESのIndex同期に関するシステムの紹介をしました。CloudFirestore Trigger x Cloud Function x Pub/Sub x AppEngine(task)を連結させることで、インデックスの構造が変わった時などもTaskのデプロイのみで対応することができます。

Cloud FunctionにESの更新/削除処理を書く方法もありますが、その場合はインデックスの構造が変わった時にTaskとCloudFunctionそれぞれデプロイする必要があり、忘れてしまうことがありそうです。そのため今回紹介したシステムを採用しました。

Cloud Firestoreトリガーはまだベータ版であるため、ドキュメントが更新されたときに必ず実行される保証はありません(お気をつけください..!)。なのでその前提でシステムを構築する必要があります。弊社ではESのデータが更新されていない場合を想定して、基本Firestoreからデータを取得し直し追加でバリデーション処理を適用する等の対策を行っています。

Chompyではらくとく便やストーリー(料理動画)の機能など、日々食に関する様々な新機能の開発、改善を行っています。ぜひご興味がある方は以下のリンクをチェックしてみてください!


この記事が気に入ったらサポートをしてみませんか?