UniFormを利用してSnowflakeからDatabricksのDeltaTableを読み込む

本記事は、 Databricks Advent Calendar 2024 シリーズ2の10日目の記事です。UniFormを利用してSnowflakeからクエリするまでの簡単な流れを書いています。

はじめに

noteで機械学習エンジニアをしている漆山です。
noteでは、データ基盤としてSnowflakeを利用しており、Snowflakeを利用してデータ分析をしています。
一方で、データ利活用の一つである機械学習に関しては、プラットフォームとしての利用しやすさやからSnowflakeではなくDatabricksを新たに採用しました。


ML で利用するデータの流れ

データ利活用の観点から分析のしやすさやガバナンスを考えると、データを一箇所にまとめておく方が望ましいです。そのため、データはSnowflakeに置いておくことが望ましく、Snowflakeのデータを利用する際にはLakehouse Federationを利用しています。現状ではMLエンジニアはSnowflakeとDatabricksの両方の使い方を知っておく必要があり、学習コストが高くなってしまいます。

MLエンジニアの成果物としては、MLを利用した推論結果の作成だったりMLそのもののEndpointであることが多いです。
DatabricksはSnowflakeよりもML Platformで使いやすく、MLエンジニアはDatabricksを使いこなすことで、高速に試行錯誤ができるようになってきています。

理想的には、MLエンジニアが仕事をしやすい状態を作りつつ、他の利用者がMLの推論結果を利用できることが望ましいです。
そこでUniForm形式の出番となります。UniFormはDatabricksが今年のData+AI summitで発表したDelta Tableの新しい形式です。
この形式で保存されたDelta TableであればIceberg Clientから読み込むことができるため、SnowflakeからSQLでクエリすることができます。


UniFormを活用したときのデータの流れ

準備

UniForm形式で永続化されたテーブルを読み込むためには、いくつかの権限設定などが必要です。ここでは、DatabricksとSnowflakeがAWS上で利用されている想定でどんな準備が必要かを記載します。

  1. SnowflakeのサービスプリンシパルをDatabricksに追加する

  2. Snowflakeにカタログ統合を追加する

  3. Snowflakeで利用するIAMロールを追加する

  4. Snowflake に外部ボリュームを作成する

1. SnowflakeのサービスプリンシパルをDatabricksに追加する

Databricksのアカウント管理画面で、 "ユーザー管理 > サービスプリンシパル"画面に移動して、Service Principalを作成します。
ここではSnowflakeというサービスプリンシパルを作成しています。

サービスプリンシパル画面

サービスプリンシパルを作成したら、Snowflakeから参照したいDelta Tableに参照権限を付与します。 「カタログ > 参照したいカタログ > 権限 > 付与」から権限が付与できます。

権限が付与できたら、次にDatabricks上にOAuthシークレットを作成します。
サービスプリンシパルの管理画面から、「シークレットを生成」をクリックする。
OAuthシークレットは一度しか表示されないので注意。

OAuthの作成

Snowflakeにカタログ統合を作成する

UniFormとして保存しているDelta TableはIceberg RESTとして読み取れるようになります。SnowflakeからUniForm形式のDelta Tableを参照するために、カタログ統合を構成します(Snowflakeのアカウント管理者の権限が必要)。

CREATE OR REPLACE CATALOG INTEGRATION <catalog-integration-name>
  CATALOG_SOURCE = ICEBERG_REST
  TABLE_FORMAT = ICEBERG
  CATALOG_NAMESPACE = 'default'
  REST_CONFIG = (
    CATALOG_URI = '<workspace-url>/api/2.1/unity-catalog/iceberg',
    WAREHOUSE = '<catalog-name>'
  )
  REST_AUTHENTICATION = (
    TYPE = OAUTH
    OAUTH_TOKEN_URI = '<workspace-url>/oidc/v1/token'
    OAUTH_CLIENT_ID = '<client-id>'  -- 先ほど作成したOAuth token
    OAUTH_CLIENT_SECRET = '<secret>' -- 先ほど作成したOAuth secret
    OAUTH_ALLOWED_SCOPES = ('all-apis', 'sql')
  )
  ENABLED = TRUE;

<workspace-url>はDatabricksのワークスペースのURLです。
<catalog-name>はDeltaTableのカタログの名前です。

Snowflakeで利用するIAMロールを作成する

Snowflakeが利用しているIAMロールに、Unity Catalogが利用しているS3バケットのARNを追加します。
S3のバケット名はUnity Catalogの「詳細」に表示されています。
詳細はSnowflakeのセキュアアクセスの構成を参照してください


Snowflakeの外部ボリュームを作成する

Delta Tableの読み込みにはSnowflakeの外部ボリュームが必要です。そのため、外部ボリュームを作成します
(カタログ統合と同様にSnowflakeのアカウント管理権限が必要)

CREATE OR REPLACE EXTERNAL VOLUME <external-volume-name>
STORAGE_LOCATIONS = (
    (
    NAME = '<ストレージロケーションの名前。識別できればなんでも良い>'
    STORAGE_PROVIDER = 'S3'
    STORAGE_AWS_ROLE_ARN = '<Snowflakeの IAM Role のARN>'
    STORAGE_BASE_URL = '<Unity Catalog 詳細にある「ストレージの場所」>'
    STORAGE_AWS_EXTERNAL_ID = '<aws_iam_policy_document.assume_snowflake_storage_integration に書いてある External ID>'
    )
);

UniForm形式のDelta Tableの作成

ここでは、Databricksからサンプルとして読み込めるニューヨーク市のタクシー移動データを利用します。
大まかな流れとしては以下の通りです。
1. SparkでDataFrameを読み込む
2. UniForm形式としてDeltaTableで書き込む

df = spark.read.table("samples.nyctaxi.trips")
df.write \
    .format("delta") \
    .option("delta.enableIcebergCompatV2", "true") \
    .option("delta.universalFormat.enabledFormats", "iceberg") \
    .saveAsTable(table_path)

気を付けるポイントとしては、optionを2つ指定することです。
1. option("delta.enableIcebergCompatV2", "true") 
2. option("delta.universalFormat.enabledFormats", "iceberg") 

2. UniForm形式で保存されたDeltaTableを読み込む

2024/12/04時点で、UniForm形式のDeltaTableの更新をSnowflake側では自動で検知してくれません。そのため、DeltaTableの更新後、テーブルのリフレッシュが必要です。

CREATE OR REPLACE ICEBERG TABLE IF NOT EXISTS nyctaxi_trips
EXTERNAL_VOLUME = '<外部ボリュームの名前>'
CATALOG = '<カタログ統合の名前>'
CATALOG_TABLE_NAME = 'nyctaxi_trips'
CATALOG_NAMESPACE = '<カタログのネームスペース>';

ALTER ICEBERG TABLE nyctaxi_trips REFRESH; -- ICEBERG TABLEの更新

上記のクエリではテーブルがなかった場合にはテーブルを作成して、その後テーブルをリフレッシュしています。
Delta Tableを更新した際には ALTER ICEBERG TABLE <table_name> REFRESHを実行してください。

ここまで実行がうまくいけばSnowflakeからUniForm形式のDelta Tableが読み込めるようになります。

SnowflakeからUniForm形式のDelta Tableをクエリする

終わりに

この記事では、UniForm形式で保存されたDeltaTableをSnowflakeからクエリするまでの手順を書きました。UniFormが発表されたばかりということもあり、この記事が他の人の役に立つことを願っています。

この記事を書くにあたって、さまざまな検証や協力をしてくれたchovさんとデータインフラチームにこの場を借りてお礼申し上げます。


いいなと思ったら応援しよう!