見出し画像

BreadがDatabricksとDelta Lakeでレイクハウスを標準化した方法 -- How Bread Standardized on the Lakehouse With Databricks & Delta Lake

元記事です。

この記事は、Bread FinanceとDatabricksの共同投稿です。共著者のChris Taylor(Senior Data Engineer-Bread Finance)の貢献に感謝します。

ブレッド社(Bread)はAlliance Data Systemsの一部門(筆者注:2020年12月にBreadを買収)であり、テクノロジー主導の決済企業として、加盟店やパートナーと統合し、顧客のための決済オプションをパーソナライズしています。Breadのプラットフォームにより、加盟店はより多くの支払い方法を提供できるようになり、適切なタイミングで適切なオプションを提供することで、加盟店はコンバージョン率を向上させ、平均注文額を増やすことができます。Breadは現在、GameStop(カナダ)、SoulCycleEquinox(米国)をはじめとする400以上の加盟店にサービスを提供しており、今後も拡大が見込まれています。このプラットフォームは、財務報告、詐欺検出、信用リスク、損失見積もり、フルファネル・レコメンデーションエンジンなどのビッグデータのユースケースによって駆動されています。

アマゾン ウェブ サービス(AWS)クラウド上で稼働するBreadプラットフォームは、数十のマイクロサービスから構成されています。各マイクロサービスは、例えば、ユーザーやマーチャントの旅の一部を表現しています。例えば、顧客は支払い方法を選択したり、ローンを申し込んだりし、加盟店は取引のライフサイクルを管理したり、決済の詳細を追跡したりする。各マイクロサービスは、独自のPostgresデータベースに書き込み、データベースは設計上、互いに分離されています。このため、内部のビジネス分析と外部パートナーシップの報告のために、異なるサービスからのすべてのデータが初めて集まる一元化されたリポジトリが必要です。

これまでの実装方法

最初の取り込みはデータシンクのPythonモジュールで、毎晩すべてのデータベースとテーブルをCSVファイルとしてダンプし、そのファイルをSnowflakeウェアハウスのロースキーマにコピーして、既存のテーブルを毎晩上書きしていました。その後、dbt(Data Build Tool)とカスタム復号化モジュール(これもPythonコンテナとして実行)を使用してデータを変換し、レポート作成に対応できるようにしました。下図をご覧ください。

これまでの実装方法

課題(チャレンジ)

上記のインジェストワークフローでレポーティングが可能になったものの、いくつかの大きな課題がありました。最も緊急性の高いものはスケーラビリティ(拡張性)でした。私たちのPythonモジュールは、AWSクラウド上のAirflowクラスタ上でKubernetesPodOperatorによって実行されていました。それは、ポッドに割り当てられた計算リソース(〜1GB CPU、〜500MBメモリ、デフォルトの3倍)と、Airflowクラスタによってプロビジョニングされた全体的な追加能力で支えられていました。データ量がギガバイトからテラバイトに増えるにつれ、1つのデプロイメントでデータ同期ジョブを実行する時間も数分から数時間に増え、ポッドのリソースに負担をかけ、下流のデータ変換に遅延を生じさせていました。トランザクションやパートナーの数が増えても、ビジネスに合わせて拡張できる、より優れたソリューションが必要でした。

2つ目の課題は、スキーマの進化でした。マイクロサービスは進化を続けており、スキーマの変更は毎週のように発生する可能性があります。Snowflakeでテーブルを削除して再作成することで「自動的に」対応することはできますが、変更に関する知識も、下流のデータモデルを更新する時間もありませんでした。その結果、スキーマ変更時に変換ジョブがエラーになることがよくありました。そこで、スキーマの変更を警告し、より耐障害性の高いソリューションが必要になったのです。

最後の課題は、ベロシティ(速さ、俊敏性)です。チームと利用者の両方が成長するにつれ、タイムリーな取り込みがますます必要になってきました。レポート作成には1日目の更新で十分かもしれませんが、社内のBI機能(特にリスクと不正の分析)には、アプリケーションからの新鮮なデータが必要でした。私たちは、ほぼリアルタイムのデータを提供するソリューションを必要としていました。

提案(プロポーザル)

要約すると、私たちは以下を提供するプラットフォームを必要としていました。

  • kubernetes podsのメモリ制限に依存しないスケーラブルなコンピューティング

  • シンプルかつ安全にスキーマを進化させることができるストレージオプション

  • バッチ処理からストリーミング処理への移行を1行のコード変更で実現する能力

幸いなことに、Databricks上で動作するDelta Lakeは、上記のすべてを解決するソリューションとなりました。私たちは、1)素朴なデータダンプではなく正式な変更データ取得プロセス、2)取り込み用のPythonモジュールではなくApache SparkTM、3)計算用のSnowflakeではなくDatabricksを構築することを目標としました。また、Databricksに完全に移行するまでは、Snowflakeでデータモデルとユーザーのサポートを継続したいと考えました。

トランザクションエンリッチメントパイプラインのためのLakehouse

レイクハウスのビジョンは、この記事の最初の段落で説明したビジネスユースケースを実現することです。重要なのは、Delta Lakeに基盤を置き、データサイエンスと分析エンジニアリングに権限を与え、egress/ingressにコストをかけずに、データが存在する場所でジョブを実行し、データを分析できるようにすることです。さらに、ブレッドは常にデータの鮮度を最適化することを目指しているため、中核となる機能には、信頼性が高くスピーディーな取り込みを可能にする堅牢なエンジンが含まれていなければなりませんでした。

レイクハウスプラットフォーム

変更データ取り込みのためのDMSとAuto Loader

このブログに触発され、データベースのスナップショットと変更データの取得にAWS DMS (Database Migration Services)を選択しました。ソースはPostgresデータベース(RDS)でバックアップされたマイクロサービス、ターゲットはAmazon S3バケットのコレクションでした。そして、Auto LoaderでDMSのデータを取り込み、変更セットをDelta Lakeに継続的にアップサート(Upsert)しています。また、新たに利用できるようになったDatabricks SQL Connectorを使って外部ジョブのリファクタリングも行いました。以下のセクションでは、私たちの論理的根拠と実装について、より技術的に詳しく説明します。

DMSの構成

私たちのセットアップでは、各マイクロサービスに対して、対応するDMSタスクとS3バケットが存在します。移行は大きく3つのフェーズで構成されています。

  1. 既存データのスナップショット(フルロード)

  2. キャッシュされた変更点の適用

  3. 継続的なレプリケーション(CDC)

余分な接続属性はこのように設定しました。

上記の設定の場合、フルロードファイルは <microservice_bucket>/<schema_name>/<table_name>/LOAD*.parquet に書き込まれます。
CDCファイルは<microservice_bucket>/<schema_name>/<table_name>/yymmdd/*.parquetに書き込まれます。
追加の接続属性は、変更データを日付で分割し、変更が挿入、更新、または削除操作であるかを示す "I", "U", "D" のいずれかの値を持つ "Op" カラムを追加しています。

私たちにとって重要なカスタマイズは、DMSのターゲットとしてS3を使用する際の制限です。私たちのソーステーブルのカラムのいくつかは、ラージバイナリオブジェクト(LoB)を保存しています。S3をターゲットとして使用する場合、フルLoBモードはサポートされない。DMSタスクの設定でLob MaxSizeを指定する必要があり、DMS LoBカラムはSpark StringTypeとして表示されます。MaxLobSizeパラメータは、デフォルトでは32(kb)です。我々の計算によると、文字列の切り詰めを防ぐために値を増やす必要があります。

DMSレプリケーションでは、各文字をダブルバイト文字(DBCS)として扱います。したがって、カラム内で最も長い文字テキストの長さ(max_num_chars_text)を求め、2を乗じてLOBサイズの制限値を指定する。この場合、LOBサイズの上限はmax_num_chars_textに2を掛けたものになります。このデータには4バイト文字が含まれているので、再度2を掛けると17268 * 4 ~ 70 kbとなります。

スパークジョブ (Spark Job)

各マイクロサービスには、S3 DMSディレクトリを横断的に見て、すべてのテーブルを見つけ、Databricks Delta Lakeにデータをロードし、初期テーブルを作成するSnapshot Sparkジョブがあります。これに続いて、CDC ingestionスパークジョブが、すべてのテーブルを探し出し、各レコードの最新の状態を見つけ、変更されたデータを対応するDeltaテーブルにマージします。CDC ingestionを実行するたびに、スキーマの追跡も行い、現在のバージョンをS3に保存しています。

DMSの変更データを取り込む場合、ソーステーブルの主キーを特定することが重要です。私たちのほとんどのマイクロサービスでは、主キーは「ID」です。この命名規則を守っていないテーブルもありますし、複合プライマリキーを使っているテーブルもあります。したがって、マージするキーカラムは明示的に宣言するか、作成する必要がある。私たちは、複合主キーのカラムを連結しています。

注:マイクロサービスがどのように更新を行うかによって、例えば、レコードがその場で置き換えられる場合、挿入と更新が同時に行われることがあります。この場合、キーによる最新の変更を見つけるには、カスタムオーダーが必要になる場合があります。さらに、変更データが順番通りに到着しないこともあります。挿入や更新のあるファイルの前に、最終的な削除操作を含む DMS ファイルを受け取ることがあります。実際に削除されたデータの挿入を防ぐために、CDC タイムスタンプマーキングや「早期削除フラグ」の使用など、特別な処理が必要になる場合があります。

なぜAuto Loaderを使用するのか?

Databricks Auto Loaderは、クラウドストレージ上のファイルをDelta Lakeに自動的に取り込むことができます。Structured Streamingに組み込まれたブックキーピングとフォールトトレラントな動作を活用しながら、バッチ処理に近いコスト削減が可能になります。

コスト削減

なぜ従来の構造化ストリーミングジョブではないのか?ストリーミング・ジョブ・クラスターは24時間365日稼動しています。クラスタはスケールアップできるが、スケールダウンできない。テストでは、クラスタAPIを呼び出して、2時間ごとにクラスタを強制的にスケールダウンさせました。これに対して、run onceトリガーを使って任意の間隔(2時間ごと)でファイルを処理すると、ナイーブスケーラーを導入していても、計算コストは90%以上減少しました。

ストリーミングとバッチの比較

Auto Loaderの使用は、単にバッチジョブを実行するのとどう違うのでしょうか?S3から日次で分割されたファイルを読み込むバッチジョブはあります。バッチ処理のシナリオでは、S3センサーとreplace whereロジックを設定して、必要な時に再処理を行うようにしています。一方、Structured Streamingは、トリガーが成功するたびに、ジョブが作成したすべてのファイルをログにコミットします。失敗した場合、不正確なデータや重複したデータを削除するために別のプロセスを持つことなく、単純に中断した場所をピックアップすることができます。

通知モードとディレクトリ一覧モードの比較

DMSでは、変更データパーティションに多くの小さなファイル(通常、1日のパーティションに数百個)が出力されるのを見たことがあります。Auto Loaderの通知モードは、取り込み前に各Sparkジョブがファイルをリストアップする時間を短縮することができます。しかし、AWSの制限により、ファイル通知には明確なSLAがありません。S3に着地したファイルが、翌日まで発見されないことが確認されています。各営業日のトランザクションは、締切時間前にパートナーに報告する必要があるため、通知モードは私たちにとって信頼できるオプションではありません。

幸いなことに、Databricks 9.0以降では、ファイルのリストアップが大幅に最適化されました。この改善の詳細については、こちらをご覧ください。私たちのシナリオでは、DBR 8.4で実行した場合とは対照的に、各ジョブの実行は⅔の時間しかかかりません。また、8.4の通知モードを使用した場合と比較しても、その差はごくわずかです。データの鮮度を保証するために、パフォーマンスを犠牲にする必要はもうありません。

Databricks SQLコネクタを使用して、データサイエンティストのPIIを復号化する

Lakehouseに完全に移行するには、Snowflakeに接続されている外部システムで実行されているいくつかのジョブ、特にAmazon ECS上のPII復号化をリファクタリングする必要があります。変換のサブセットは復号化されたデータに依存しており、BI作業には欠かせません。移行リスクを最小限に抑え、ビジネス機能に支障をきたさないようにしなければなりません。

ECSクラスタは、復号化のための秘密鍵にアクセスできるように構成されています。鍵はマイクロサービスと共有され、Vaultに保管されます。ジョブはpandasのデータフレームをSnowflakeに書き込み、毎晩、既存のデータを置き換えます。それでも、以下の課題を解決する必要があります。

  1. 既存の ECS セットアップと秘密管理戦略をどのように維持するか?

  2. Apache Sparkを依存関係としてインストールすることなく、Delta Lakeに書き込むことは可能か?

Databricks SQL connectorのおかげで、ECSにdatabricks-sql-connector Python libraryを追加することができ、それによってpyodbc connection under the hoodを使用して、pandas dataframeをdelta lakeに書くシンプルなデータフローを可能にします。このコネクタの詳細については、こちらを参照してください。

Databricks SQL Connectorは新しくリリースされ、Databricks SQLまたはClustersへのリモート接続に適しています。

このコネクタは十分な柔軟性を備えているため、チャンク単位で復号してDelta Lakeにデータをアップサートすることができ、すべてのレコードを復号してSnowflakeのテーブル全体を置き換えるよりもパフォーマンスの向上につながりました。

Sparkコネクタと外部テーブルの比較

移行中のSnowflakeのレポート作業やユーザークエリーをサポートするために、Snowflakeの外部テーブルを使ったDelta Lake Integrationの利用をテストしました。最終的には、注目度の高い、時間的制約のあるレポート作業の前に、Sparkコネクタを使用してDeltaテーブルをSnowflakeにコピーすることを選択しました。以下は、外部テーブルから移行した主な理由です。

  • 頻繁なスキーマの変更:S3通知とキューシステムを使用して自動リフレッシュを構成しましたが、SnowflakeはDelta Lakeのように自動スキーママージやアップデートをサポートすることができません。CREATE OR REPLACEと外部テーブルの自動リフレッシュは互換性がなくなりました。

  • パフォーマンスへの懸念:外部テーブルは、Spark Connectorでデータをコピーするのに比べて、およそ20%遅くなることが判明しています。

  • パーティショニングされたテーブル、バキュームされたテーブル、最適化されたテーブルの一貫性がないビュー:外部テーブルを維持することは、Delta Lakeの最適化の妨げになりました。

  • ドキュメントとリファレンスの欠如:外部テーブルの構成は、複雑で実験的であるため、包括的で正確なドキュメントを見つけることが困難でした。

  • Snowflake内での機能性の損傷:外部テーブルの鮮度と有効性の問題を監査およびデバッグする機能が非常に制限される。

今後のデータサイエンスの方向性

DMS/CDCの取り込みとDatabricksのSQLコネクタを本番稼動させながら、我々はすべての生データをDelta Lakeに集中させ、企業情報のシングルソースを形成しています。そして、計算とクエリーをDatabricks SQLに移行し、ほぼリアルタイムのデータサイエンスと分析作業への道を開く、Lakehouseのビジョンを構築する準備が整いました。以下は、プラットフォームパイプラインの図です(実線が現在の状態、点線が将来の状態)。

Delta Live Tables + ラピッドプロトタイピングへの期待

現在のBI分析フローでは、データエンジニアがsparkジョブを書き、dbtモデルをデプロイする必要があります。ML開発を加速させるため、次世代クエリエンジンであるPhoton上で動作するDelta Live Tablesを検討しました。データエンジニアとアナリストが緊密に連携し、PythonとSQLを効果的に組み合わせて使用しました。特に、S3に保存されたローンデータの取り込み、外部データセット(消費者センチメントなど)の結合、データ品質の検証、ノートブック環境でのMLモデルの実験、そしてBIツールでの結果の可視化が迅速に行えたことに興奮しています。

以下は、S3ファイルからSlackbotが配信するLooker DashboardsまでのPipelineの図です。以下は、私たちが今後のデータサイエンス業務にDLTを使いたいと考えている主な理由です。

スピード
わずか数時間で、生データから実用的な洞察や予測に移行することができるのです。S3から継続的にデータをストリーミングし、検証のために期待値を組み込むこともできます。
民主化
アナリストやデータサイエンティストは、エンジニアリングの大規模なサポートなしで、エンドツーエンドのパイプラインで直接作業することができます。また、1つのパイプラインで共同作業や言語の混在も可能です。
ユニフィケーション
データロード、オーケストレーションから機械学習まで、デプロイの全段階が一カ所に存在します。パイプラインはその実行エンジンと同居しています。

まとめ

このブログでは、ブレッド社がDatabricks Delta Lakeを使用して、弾力性と拡張性のあるデータプラットフォームを構築していることを紹介しました。AWS DMSとDatabricks Auto Loaderジョブを使って、RDSデータソースの変更をインクリメンタルに取り込み、CDCデータをDelta Lakeに継続的にマージしています。また、ネイティブのDatabricks SQLコネクタを使用して、Databricksの外部のジョブを移行する方法も紹介しました。集中型データレイクの構築が完了したら、次のステップとしてPhoton SQL AnalyticsエンドポイントとDLT(Delta Live Table)パイプラインを活用し、よりシンプルな構成と少ないエンジニアリング依存で、ほぼリアルタイムのBIとML作業を可能にすることを目指します。


この記事が気に入ったらサポートをしてみませんか?