見出し画像

Snowflakeさわってみた。Snowpipe Streaming試してみた。

分析屋の下滝です。

Snowflakeをさわってみよう、の9回目です。

今回はSnowpipe Streamingを試してみました。Snowpipe Streaming とは以下のような機能です(DeepL翻訳)。

Snowpipe Streaming API(以下「API」)を呼び出すと、Snowflake Ingest SDKと独自のマネージドアプリケーションコードを使用して、ストリーミングデータ行を低レイテンシーでロードします。ストリーミングインジェストAPIは、ステージングされたファイルからデータを書き込むバルクデータロードやSnowpipeとは異なり、データの行をSnowflakeテーブルに書き込むものです。このアーキテクチャにより、ロードレイテンシが低くなり、それに伴って同量のデータをロードする際のコストも低くなるため、リアルタイムのデータストリームを処理するための強力なツールとなります。

https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview

Snowpipe Streamingは、
行単位でテーブルに書き込むよ、ファイル単位じゃないよ
という点が特徴でしょうか。

あまりわかっていませんが、とりあえず試してみました。詳しくは公式ドキュメントを参照してください。

現状では、JavaのAPI実装しかありません。

Java環境を作るのは数年ぶりなので、動かすのに苦労しました。

実装サンプルは、公式ドキュメントにでも紹介されている、ここにあるものを参考にしました。

まずは、Snowflake Ingest SDK が必要です。Mavenを使っていれば、設定できるかと思います(vs codeでここの構築がやったことがないので面倒でした)。

動かしてみた感じは、単にテーブルに順にデータをインサートするような印象でした。Snowpipe Streaming が、単なる Insertと異なる点は、チャネルやオフセットトークンという概念がある点かもしれません。

では、試していきます。

まず、設定ファイルとして profile.json を用意する必要があります。

{
    "url" : "https://kw71477.ap-northeast-1.aws.snowflakecomputing.com",
    "user" : "shimotaki",
    "private_key" : "MII….",
    "role" : "ACCOUNTADMIN"
}

URLは、いくつかの取得方法がありそうですが、Web画面からはここから取れます。

private_keyは、公式資料をそのまま実行すればいけました。

秘密キーを生成します。private_keyはこっちのものになります。

$ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

公開キーを生成します。

$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Snowflakeユーザーに公開キーを割り当てます。

ALTER USER shimotaki SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';

続いて、実装をみていきます。サンプル実装を少し簡略したバージョンです。

処理の基本的な流れは以下となります。
・設定ファイルの読み込み(profile.json)
・チャネルの作成(流し込む先のテーブルを指定)
・チャネルへのインサート

Snowpipe Streamingにおけるチャネルの意味は以下となります。

APIは、1つまたは複数のチャネルを介して行を取り込みます。チャネルは、テーブルにデータをロードするための、Snowflakeへの論理的で名前付きのストリーミング接続を表します。1つのチャネルはSnowflakeの1つのテーブルに対応しますが、複数のチャネルが同じテーブルを指すことも可能です。Client SDKには、複数のテーブルに複数のチャネルを開く機能がありますが、SDKはアカウント間でチャネルを開くことができません。行の順序とそれに対応するオフセットトークンは、チャネル内で保持されますが、同じテーブルをポイントするチャネル間では保持されません。

https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#channels

今回は、以下のTEST テーブルが存在するとして、行をインサートしていきます。count というカラムが一つだけあります。

create or replace TABLE TEST.PUBLIC.TEST (
    COUNT NUMBER(38,0)
);

全体の実装は以下です。

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

public class SnowflakeStreamingIngestExample {

  private static String PROFILE_PATH = "profile.json";
  private static final ObjectMapper mapper = new ObjectMapper();

  public static void main(String[] args) throws Exception {

    Properties props = new Properties();
    Iterator<Map.Entry<String, JsonNode>> propIt =
mapper.readTree(new String(Files.readAllBytes(Paths.get(PROFILE_PATH)))).fields();

    while (propIt.hasNext()) {
      Map.Entry<String, JsonNode> prop = propIt.next();
      props.put(prop.getKey(), prop.getValue().asText());
     }

    try (SnowflakeStreamingIngestClient client =
      SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {

      OpenChannelRequest request1 =
      OpenChannelRequest.builder("MY_CHANNEL")
          .setDBName("TEST")
          .setSchemaName("PUBLIC")
          .setTableName("test")
          .setOnErrorOption(
              OpenChannelRequest.OnErrorOption.CONTINUE)
          .build();

      SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1);

      final int totalRowsInTable = 10;
      for (int val = 0; val < totalRowsInTable; val++) {

        Map<String, Object> row = new HashMap<>();

        row.put("number", val);

        InsertValidationResponse response = channel1.insertRow(row, String.valueOf(val));
        if (response.hasErrors()) {
          throw response.getInsertErrors().get(0).getException();
        }
      }
      channel1.close().get();
    }
}

実行すると、TESTテーブルに10行のデータが格納されます。ループで0~9の値をそのまま入れているだけです。

もう一度実行すると、追記されます。

設定ファイルの読み込み箇所の説明は省略して、他のコードの箇所を少し見てみます。

以下の箇所でチャネルを作成しています。

      OpenChannelRequest request1 =
      OpenChannelRequest.builder("MY_CHANNEL")
          .setDBName("TEST")
          .setSchemaName("PUBLIC")
          .setTableName("test")
          .setOnErrorOption(
              OpenChannelRequest.OnErrorOption.CONTINUE)
          .build();

      SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1);

チャネル名:MY_CHANNEL
DB名:TEST
スキーマ名:PUBLIC
テーブル名:test
が主な設定です。

以下の箇所で、データをインサートしています。row.put("number", val); の箇所でカラムと値を指定しています。

      final int totalRowsInTable = 10;
      for (int val = 0; val < totalRowsInTable; val++) {

        Map<String, Object> row = new HashMap<>();

        row.put("number", val);

        InsertValidationResponse response = channel1.insertRow(row, String.valueOf(val));
        if (response.hasErrors()) {
          throw response.getInsertErrors().get(0).getException();
        }
      }

今回は以上です。

株式会社分析屋について

ホームページはこちら。

noteでの会社紹介記事はこちら。

【データ分析で日本を豊かに】
分析屋はシステム分野・ライフサイエンス分野・マーケティング分野の知見を生かし、多種多様な分野の企業様のデータ分析のご支援をさせていただいております。 「あなたの問題解決をする」をモットーに、お客様の抱える課題にあわせた解析・分析手法を用いて、問題解決へのお手伝いをいたします!
【マーケティング】
マーケティング戦略上の目的に向けて、各種のデータ統合及び加工ならびにPDCAサイクル運用全般を支援や高度なデータ分析技術により複雑な課題解決に向けての分析サービスを提供いたします。
【システム】
アプリケーション開発やデータベース構築、WEBサイト構築、運用保守業務などお客様の問題やご要望に沿ってご支援いたします。
【ライフサイエンス】
機械学習や各種アルゴリズムなどの解析アルゴリズム開発サービスを提供いたします。過去には医療系のバイタルデータを扱った解析が主でしたが、今後はそれらで培った経験・技術を工業など他の分野の企業様の問題解決にも役立てていく方針です。
【SES】
SESサービスも行っております。