見出し画像

データレイクハウス:Apache Hudiを用いた次世代のデータレイクの構築

Data Lakehouse: Building the Next Generation of Data Lakes using Apache Hudi
作者:Ryan D'Souza & Brandon Stanley
日本語訳:Yugo Tahara

https://medium.com/slalom-build/data-lakehouse-building-the-next-generation-of-data-lakes-using-apache-hudi-41550f62f5f

データレイクハウスの基礎、利点、構築方法

概要

本記事では、データレイクハウスと呼ばれる、従来のデータレイクに対しさまざまな利点を持った新しいデータアーキテクチャを紹介します。記事内では、実際の顧客がデータ基盤にデータレイクハウスを組み込むことで、どのように拡張性、データの質、レイテンシの問題を解決できたかを紹介し、最終的には、Apache Hudiを用いてデータレイクハウスを構築するための基本知識を提供することが本記事の目的です。

前置き

ここ10年程のIoT、クラウドアプリケーション、ソーシャルメディア、機械学習の進化に伴い、企業が収集するデータ量は指数関数的に増加しています。同時に、データ収集のタイミングも、以前は日、時間単位だったものが分単位、秒単位へと、より高頻度に変化してきています。

ここ数年ほどは、それらの生データやエンリッチ化されたデータを保管する役割はデータレイクが担ってきました。しかし、それらのデータを高品質、最新、整合性が取れている状態に保つことには大変な労力がかかることに企業は気づき始めました。ただでさえ複雑な増分データの処理に加え、データレイクに入力するためにはそのデータのビジネス上での役割の理解や、相互に依存した数多くのバッチ処理に大きく依存する必要があるからです。
近年のデータレイクには以下のような課題があります:

  1. クエリベース変更データキャプチャ(以下CDC):増分データの抽出には、事前に定義しておいたフィルタ条件に基づいたクエリを使用することが最も一般的です。しかし、増分データに使用すべきフィールドがデータソース側のテーブルに存在しない時には、データソース側のデータベースに異常な負荷がかかるか、クエリ結果にすべてのデータベース変更が捉えられていない、などといった状況が発生します。さらに、データソースへクエリを行う中で、すでに削除されたレコードを検出するのは至難のため、クエリベースCDCでは削除されたレコードは含めることができません。この問題に対処できるのはログベースCDCであり、この方法については後述します。

  2. データレイクでの増分データ処理:データレイクを更新するETL処理では、データレイク内すべてのファイルを読み込み、変更を加え、新規ファイルとして全データセットを再書き込みする必要があります。これは、更新や削除が行われたであろう特定のファイルを更新する簡単な方法がないためです。

  3. ACIDトランザクション非対応:ACID特性に準拠していないため、同時に読み取りと書き込みが行われる状況では結果に不整合が生じます。

さらに、これらの課題は増加するデータ量と高頻度化するデータの更新の両側面から、より複雑になる傾向にあります。データエンジニアが直面しているこれらの課題に対し、Uber、Databricks、Netflixはそれぞれ解決策を打ち出しました。Apache Hudi (Uber)、Delta Lake (Databricks)、Apache Iceberg (Netflix)は、S3やHDFSなどの分散ファイルシステム上のデータレイクに挿入、更新、削除の処理を行うための増分データ処理フレームワークです。
 
Uber、Databricks、Netflixの努力は、拡張性、適応性、信頼性に優れた方法で最新のデータを取り出せる次世代のデータレイク、データレイクハウスとして実を結びました。

データレイクハウスとは?

データウェアハウスからデータレイクハウスへの変遷

端的にまとめると:データレイク + データウェアハウス = データレイクハウス

まずデータウェアハウスとは、BIツールなどでの使用を主眼に置いた、特定のユースケースやデータドメインのために変換・集約された履歴データを保管するためのプラットフォームです。一般的に、データウェアハウスは構造化データのみを保持し、コストパフォーマンスは悪く、バッチETLを用いてデータの入力が行われます。

一方、データレイクは上記の課題に部分的に対処すべく、構造化、半構造化、非構造化データを低価格ストレージに保管し、バッチ処理およびストリーム処理両方のパイプラインに対応しました。データウェアハウスと比較すると、データレイクには様々な形式の生データが保管され、これらは現在および将来のユースケースにおいて活用することが可能です。しかし、データレイクにも課題は残っており、トランザクションへの対応(データレイクが最新の状態を保つのは難しい)やACID特性への準拠(同時読み取り・書き込みができない)は未解決のままです。
 
データレイクハウスはデータレイクの特性である低価格ストレージ(S3、GCS、Azure Blob Storageなど)を取り入れつつ、データウェアハウスのデータ構造やデータ管理の特性を取り入れています。データレイクで課題となっていたACIDトランザクションに対応し、同時読み取り、書き込みにおけるデータの整合性も担保します。さらにデータレイクハウスでは、直接データレイクハウスに対してクエリできるため、従来のデータウェアハウスと比較して、低レイテンシであり、より迅速なデータの利用が可能となっています。

データレイクハウスには以下のような特徴があります:
What is a Data Lakehouse? 記事から引用)

  • ACIDトランザクションへの対応

  • スキーマの強制と統制

  • BIへの対応

  • ストレージとコンピューティングの分離

  • 様々なストレージフォーマットへの対応と、APIを使用できることによるオープン性

  • 非構造化データから構造化データまで、多様なデータの種類への対応

  • データサイエンス、分析など、多様なユースケースへの対応

  • エンドツーエンドでのストリーム処理への対応

そして、データレイクハウスを作成するには、Apache Hudiのような増分データ処理フレームワークが必要になります。

Apache Hudiとは?

Apache HudiはHadoop Upserts Delete Incrementalsの略称で、2016年にUberにより開発されたオープンソースフレームワークです。Apache Hudiはクラウドストレージ、HDFS、その他Hadoop互換ファイルシステムのストレージ等の分散ファイルシステム上にある、大規模なデータセットのストレージの管理を行います。ACIDトランザクション (Atomicity:原始性、Consistency:一貫性、Isolation:独立性、Durability:耐久性)がデータレイク上で可能になった形です。

Hudiのトランザクションモデルは、異なるインスタンス上のテーブルに対してとられたすべてのアクションを含むタイムラインをベースとしており、以下の特徴があります。(Apache Hudiから引用)

  • 高速でプラグイン可能なインデックスによる更新・挿入処理の実現

  • ロールバック、セーブポイントに対応したアトミック(不可分)な書き出し

  • 書き込みとクエリによる読み取り間でのスナップショットの独立

  • 統計に基づくファイルサイズとレイアウトの自動管理

  • 行、列データの非同期でのコンパクション

  • データ系統追跡のためのタイムラインのメタデータ

ユースケース

1. 変更データキャプチャ(CDC)を用いたターゲットデータの更新・挿入・削除

変更データキャプチャ(CDC)はソースのデータベースでの変更を特定し、キャプチャする処理のことを指します。その変更をソースデータベースから伝送先(この場合はデータレイクハウス)に複製します。これは変更のターゲットとなるテーブルに対する更新・挿入・削除の処理を捉える際に特に重要になります。
 
以下の表は最も主流な3つのCDCの手法です:

CDCの手法:利点と欠点

私たちの経験上、ソースデータベースからデータレイクハウスへのデータの複製にはログベースCDCを用いることが最善です。一番の理由としては、クエリベースCDCでは削除フラグを使用しない限り捉えられない削除されたレコードを、ログベースCDCであれば捉えることができるからです。
 
HudiはCDCを行うにあたり、既存のデータセットから該当するファイルを探し出し、変更を加えたうえで再書き込みを行うことができます。さらに、過去に遡って必要な日時のデータセットをクエリする機能もあり、これを用いて以前のバージョンに戻すことも可能です。
 
Oracle GoldenGate、Qlik Replicate(旧:Attunity Replicate)、DMSなどのCDCツールによる準リアルタイムでのデータの取り込みが普及する中、既存のデータセットに前述のような変更を加えられる機能は不可欠です。
 
2. プライバシー規制

GDPRに代表される昨今のプライバシー規制において、個人情報の削除は個人の正当な権利となりました。これに伴い、企業側はレコード単位での更新・削除を行う必要がありますが、Hudiのデータセットは削除に対応しているため、特定の個人情報や、特定の期間内のデータの更新や削除を格段に簡潔に行うことができます。

テーブルの種類:コピー・オン・ライト(書き込み時コピー) vs. マージ・オン・リード(読み取り時マージ)

コピー・オン・ライト:データは列指向ストレージの一種であるParquetファイルフォーマットとして保存されており、更新があるごとに新規バージョンのファイルが作成され、書き込まれます。この方法では、常にデータセットの最新バージョンが利用できるため、バッチ処理のように読み取り量が多いユースケースに適しています。
 
マージ・オン・リード:データは列指向ストレージのParquet、行指向ストレージのAvroの組み合わせとして保存されています。更新はコンパクションが行われるまで行指向の差分ファイルにログとして残り、列指向ファイルの新規バージョンが作成されます。この方法では、処理は差分ファイルとして書き込まれ、読み取り時にはParquetとAvroファイルのマージが必要なため、書き込み優先のユースケースに適しています。
 
ポイント:バッチETLジョブでのみ更新されるテーブルにおいては、コピー・オン・ライトを使用しましょう。更新がストリーム処理で行われるETLジョブにおいては、マージ・オン・リードを使用しましょう。詳しくはHudiドキュメント内の「How do I choose a storage type for my workload」をご覧ください。

クエリの種類:スナップショット vs. 増分 vs. 読み取り最適化

スナップショット:特定のコミット・コンパクション処理が行われた時点でのテーブルのスナップショットを返します。マージ・オン・リードを使用したテーブルの場合、スナップショットクエリでは元ファイルと差分ファイルのマージが行われるため、遅延は発生します。
 
増分:特定のコミット・コンパクションが行われた時点からのテーブルへの変更点を返します。
 
読み取り最適化:特定のコミット・コンパクション処理が行われた時点でのテーブルのスナップショットを返します。マージ・オン・リードを使用したテーブルの場合、読み取り最適化クエリの場合は元ファイル内のデータを、差分ファイルとマージせずに返します。

Hudiを使用した場合と、内製した場合を比較した時のHudiの利点:

  • 重複レコードや更新の漏れなど、従来の増分バッチ処理ETLパイプラインで見られるようなデータの質の問題に対処

  • リアルタイムパイプラインに対応

  • 異常検知、機械学習、リアルタイム予測提案などのユースケースに対応

  • 変更を追跡するためのタイムラインの仕組み

  • HiveとPrestoを用いたクエリにネイティブ対応

ここまではデータレイクハウスを実装するための増分データ処理フレームワークについて紹介してきましたが、ここからは実際の顧客がデータ基盤を強化する際に直面した課題に対する解決策について紹介します。

実際の課題

私たちは、データサイエンティスト達が最新のデータから、最小のレイテンシで、ほぼリアルタイムな予測を作り上げることでより良い洞察を得られるような、拡張性があり、コストパフォーマンスが良く、より良いデータ基盤を求めている顧客と働かせていただく機会がありました 。彼らのデータレイクを埋めるバッチETL処理は、データセットすべてを再処理する必要があったため、計算コストが大きく、遅延が発生している、という状況にありました。さらに、クエリベースCDCを使用していたため、ソース内でのすべての変更を捉えきれていない、という問題も発生していました。古く、不正確なデータを使用していたため、データ基盤に対する信頼も失われ、データから得られるはずだった直接的な価値も失われてしまっていました。

解決策

データの質の向上、ACIDトランザクションへの対応、低レイテンシ化を目指し、ログベースCDCツールのOracle GoldenGate、Apache Kafka、増分データ処理フレームワークのApache HudiをAWS上で動作させることで、データレイクハウスをAWS S3上に構築しました。

アーキテクチャ

目標のデータレイクハウスアーキテクチャ

使用した環境

Oracle GoldenGateはログベースCDCツールとして、ソースのシステムログからトランザクションなどのデータを抽出するために使用しました。この選定は、以前から顧客がOracle GoldenGate系のプロダクトを使用していたからでもあります。ログはKafkaにほぼリアルタイムで複製され、その後Kafkaからメッセージが読み取られ、Hudiフォーマットでデータレイクハウスにマージされました。
 
Apache Hudiを選定した理由は、AWS EMRとAthenaとの統合の利便性が良く、本解決策には理想的な候補だったからです。

実装ステップ

ステップ1:Oracle GoldenGateを用いたソースデータの複製
 
前述の通り、バッチ処理とストリーム処理両方に対応しているため、ログベースCDCは最良の選択肢となります。これにより、バッチ処理、ストリーム処理それぞれに別々のデータ取り込みパターンを用意する必要がなくなりました。従来は、バッチ処理ではSQL文を使い、任意の頻度で実行されるものでした。一方、ログベースCDCの場合は、変更があったタイミングでキャプチャし、伝送先(この場合はKafka)に送信されます。抽出と取り込みを切り離すことによって、データレイクハウスに求められる更新頻度に基づいた増分データの取り込みタイミングを決定できるようになります。これにより、定義した保存期間内にKafkaからデータを取得すれば良いので 、コストを抑えることが可能となりました。

 Oracle GoldenGateはソースシステム側でのトランザクションをキャプチャし、Kafkaトピックや他のデータベースなどのターゲットに複製する、データ複製ツールです。Oracle GoldenGateはデータベースのトランザクションログ、つまりデータベースで発生したすべての事象のログを使用します。Oracle GoldenGateはトランザクションを読み、指定されたターゲットに対しトランザクションを書き込みます。複数のリレーショナルデータベースに対応しており、Oracle、MySQL、DB2、SQL Server、Teradataなどに対応しています。
 
本解決策では、データ変更はソースのデータベースからKafkaへ、Oracle GoldenGateを用いて転送されます。この処理には3ステップあります:

  1. ソースデータベースのトレイルログ(証跡ログ)からOracle GoldenGate 12c(クラシック版)を用いてデータを抽出します。ソースデータベースにおけるトランザクションはリアルタイムに抽出され、中間ログ(トレイルログ)として保存されます。

  2. トレイルログをリモートの二次的トレイルログに書き込む:抽出されたトレイルログは、別のトレイルログ(Oracle GoldenGate for Big Data 12cインスタンスによって管理されているログ)に再度書き込まれます。

  3. Kafka Connectハンドラを使い、Oracle GoldenGate for Big Data 12c経由でKafkaへトレイルログを複製する:トランザクションはKafkaメッセージとして受信、複製されます。この処理において、トランザクションログから生成されたKafkaメッセージはスキーマレジストリの使用有無にかかわらずシリアライズされ、必要に応じて型変換を行ってからKafkaにパブリッシュされます。

Oracle GoldenGateでの複製、引用元:https://dzone.com/articles/creates-a-cdc-stream-from-oracle-database-to-kafka

注意点:デフォルトでは、Oracle GoldenGateで複製された際に、更新されたレコードには更新された列のみが含まれます。増分のレコードがデータレイクハウスへ最小限の変換(すべての列も複製した状態)でマージできるようにするには、サプリメンタル・ロギングを有効化しておく必要があります。こうすることで、各レコードの「ビフォア」と「アフター」、前後のイメージを含めることができます。
 
Oracle GoldenGateでの複製には、ソースのトレイルログ上でデータベースへの操作を示す”op_type”というフィールドが含まれており、Iは挿入(insert)、Uは更新(update)、Dは削除(delete)を指します。このフィールドを使い、データレイクハウス上でレコードの更新・挿入・削除を行うことができます。
 
以下のJSONは挿入レコードの例です:

ポイント:このOracle GoldenGateレコードはビフォアイメージにnull、アフターイメージにnullでない値が入っています。

更新レコードの例
ポイント:このOracle GoldenGateレコードはビフォアイメージにnullでない値、アフターイメージにもnullでない値が入っています。

削除レコードの例
ポイント:このOracle GoldenGateレコードはビフォアイメージにnullでない値、アフターイメージにはnullが入っています。

ステップ2:Kafka上で複製したデータをキャプチャする

Oracle GoldenGateの複製時のターゲットとなるのはKafkaです。Oracle GoldenGate for BigDataはKafka Connectハンドラ経由でKafkaへとレコードを複製するので、スキーマ進化や、スキーマレジストリで扱える他の機能にも対応できます。
 
ところで、なぜKafkaを使用するのか?
CDCツールとデータレイクハウスの中間層としてKafkaを使用することには2つの理由があります。
 
1つ目の理由は、Oracle GoldenGateはソースデータベースからApache Hudiフォーマットのデータレイクハウスへと、直接CDCデータを複製することはできないからです。これはApache HudiがSparkを基とした処理エンジンを使用していることと関係しています。KafkaとSpark構造化ストリーミングは接続できるので、最終的にHudiフォーマットへの変換と書き込みを行う前に、増分レコードを一時的に保持する層としてKafkaは適しています。
 
2つ目の理由はデータのユーザー向けにほぼリアルタイムでデータを提供するためです。これは一連のトランザクションに基づくサービスへのサブスクライバーの喪失を検知、回避することなどで対応しています。

ステップ3:Kafkaからデータを読み取り、S3にHudiフォーマットで書き込む

Spark構造化ストリーミングのジョブでは以下の処理を行います:

1. Kafkaからレコードの読み取り
Kafkaトピックからレコードを読み取るコードの例:

2. スキーマレジストリを用い、レコードをデシリアライズする
 
注意点:Confluent AvroフォーマットでシリアライズされたKafkaトピック内のデータは、Spark APIを使用して直接はデシリアライズできないため、データレイクハウスに書き込むためのデータのダウンストリーム処理ができなくなります。これはOracle GoldenGateを用いてレコードを複製した場合に発生していた問題でした。ABRiSは、スキーマレジストリのスキーマを使い、Confluent AvroフォーマットのKafkaレコードのデシリアライズを可能にするSpark用ライブラリです。本解決策では使用したABRiS のバージョンは3.2です。以下の動画の12:34でより詳しく解説されています:

Confluent AvroでシリアライズされたKafkaレコードをデシリアライズする例:

3. Oracle GoldenGateの”op_type”に従って必要なビフォア・アフターイメージを抽出し、データレイクハウスにHudiフォーマットで書き込む
 
SparkコードはOracle GoldenGateレコードの”op_type”フィールドを使用して、レコードを2つのグループに分離します:1つは挿入・更新が含まれるもの、もう1つは削除が含まれるものです。これらのグループに応じてHudiの書き込み処理は設定されます。次の変換では、目当てのビフォアイメージ、あるいはアフターイメージを抽出します。最終ステップでは、後述のHudiプロパティを設定した上、foreachBatch Spark構造化ストリーミングAPIをストリームモードあるいはバッチモードで使用し、S3内の任意の場所にHudiフォーマットでの更新・挿入・削除の書き込み処理を行います。

重要なHudiプロパティ

hoodie.datasource.write.precombine.field: precombineフィールドは必須設定であり、テーブル内でnullにする(レコードとして存在しない)設定にすることはできません。これはデータソース側に重複解決用のフィールドがない場合に問題となる場合があります。もしデータソース側がこの要件を満たさない場合、これらのテーブルに対して重複削除の処理を実装することが好ましいと思われます。
 
hoodie.datasource.write.keygenerator.class: 複合キーを持つ、あるいは複数列でパーティション化されているテーブルに関してはorg.apache.hudi.keygen.ComplexKeyGeneratorと設定してください。

非パーティションテーブルに関してはorg.apache.hudi.keygen.NonpartitionedKeyGeneratorと設定してください。
 
hoodie.datasource.hive_sync.partition_extractor_class: 複数列でパーティション化されたHiveテーブルを作成するにはorg.apache.hudi.hive.MultiPartKeysValueExtractorを設定してください。
 
非パーティションHiveテーブルを作成する場合はorg.apache.hudi.hive.NonPartitionedExtractorを設定してください。
 
hoodie.index.type: デフォルトではBLOOMと設定されており、その場合は一つのパーティション内での一意性のみが強制されます。すべてのパーティションに対して一意性を求める場合はGLOBAL_BLOOMに設定してください。Hudiは入ってくるすべてのレコードとデータセット内すべてのファイルを比較し、一つのパーティションにのみrecordKeyが存在していることをチェックします。大きいデータセットの場合は時間がかかります。
 
hoodie.bloom.index.update.partition.path: (GLOBAL_BLOOMインデックスを使用している場合)削除を行う場合はFalseに設定されていることを確認してください。
 
hoodie.datasource.hive_sync.use_jdbc: (もし必要であれば)Glue Data Catalogにテーブルを同期する場合はFalseに設定してください。
 
すべての設定に関してはApache Hudi configurations pageを、その他の情報についてはApache Hudi FAQ Pageをご覧ください。

Apache HudiをAWS Glueで使用する場合の注意点
Mavenに掲載されているhudi-spark-bundle_2.11-0.5.3.jarはそのままではAWS Glueでは動作しません。pom.xmlに変更を加えたカスタムのjarファイルを作成する必要があります。
 
1. pom.xmlをダウンロードして変更を加える
a) <includes>タグ内の以下一文を消す

b) <relocations>タグ内に以下のようにコードを記載する 

2. jarファイルをビルドする

mvn clean package -DskipTests -DskipITs

上記コマンドでビルドされたjar(コマンドを実行した”target/hudi-spark-bundle_2.11-0.5.3.jar”にあります)は、Glueジョブパラメータとして使用可能です。
 
この3つのステップが終了したところで、データレイクハウスは使用可能な状態になりました。前述のクエリ手法を用いて、Apache Hudi APIを介してS3バケットからデータを取得することができます。

おわりに

本解決策では従来のデータレイクで直面していた課題を解決することができました:

  1. データベース上のトランザクション、イベントを捉えるためにはログベースCDCの方が信頼性が高い

  2. Apache Hudiは、データレイクハウスに大規模に書き込みを行う際に必要となるインデックスと関連メタデータを管理することで、データレイクハウス内のターゲットデータを更新することができる

  3. ACIDトランザクションへの対応により、複数の読み取り・書き込みが同時に起こった場合もApache Hudi APIからの結果に不整合が生じることはない 

より多数の企業がデータ分析・機械学習の能力を伸ばしている昨今、一見裏方であるCDCツールとデータパイプラインも様々な課題に対処すべく、進化を続けていく必要があります。データレイクハウスは、拡張性、レイテンシ、データ消費者へ届けられるデータの品質、すべてにおいて改善された、次世代のデータ基盤です。このデータ基盤を土台とし、企業はデータからさらなる価値を生み出すことが可能になるでしょう。
 
この記事が、みなさんがApache Hudiを用いたデータレイクハウスを構築する際の礎になれば幸いです。みなさんの成功体験をお待ちしております!


この記事が気に入ったらサポートをしてみませんか?