見出し画像

Snowflakeに手軽にSQLを打てるJupyterマジックコマンドを作ってみた

はじめに

こんにちは、ライフイズテックサービス開発部データ基盤グループのホンディーです。今日はSnowflakeを便利に使う小技を紹介します。

データ分析の仕事をしていると、SQLでデータを取得しその後の分析や機械学習モデルの構築などをPythonで行うって場面は多いと思います。

DWHとしてSnowflakeを導入している場合、Python用のコネクタが用意されているのでこれを使うことになりますが、そのコードの記述量は結構多くて面倒に感じることがありませんか?connectメソッドにユーザーID、パスワード、アカウントIDを渡して接続し、カーソルを取得してexecuteメソッドでクエリを発行し、fetchallで結果を取得して、pandasのDataFrameに変換し、不要になった接続は閉じる、という手順を踏む必要があります。

これを、「JupyterのセルにSQLを書いたら動く」くらい手軽にしたいというのが今回の記事のモチベーションです。

Jupyterのマジックコマンドは自作できる

正確にはIPythonの機能なのですが、Jupyterにはマジックコマンドという仕組みがあり、%{コマンド名}や、%%{コマンド名}という記法で便利な機能を利用できます。そして、このマジックコマンドは自作できます。
これを使って、SnowflakeにSQLを発行しデータを取得するマジックコマンドを作っちゃいましょう。

作成したマジックコマンド

コードをそのまま書いてしまいますが、次のテキストをコピーして、例えば、lit_snowflake.py というファイル名で保存し、環境変数PYTHONPATHで指定したディレクトリに配置してください。途中コメントアウトしてある、デフォルトのDatabase名は一番良く使うDB名を入れておくのがおすすめです。

環境変数のPYTHONPATHを設定してない場合は、このファイルを使いたいnotebookファイルと同じディレクトリに置いて使うこともできます。

import argparse
import os
import pandas as pd
from snowflake import connector
from IPython import get_ipython
from IPython.core.magic import Magics, magics_class, cell_magic


@magics_class
class SnowflakeMagic(Magics):

    def __init__(self, shell):

        super().__init__(shell)

        # 接続情報を環境変数から読み込み。
        self.con_args = {
            "user": os.environ.get("SNOWFLAKE_USER"),
            "password": os.environ.get("SNOWFLAKE_PASSWORD"),
            "account": os.environ.get("SNOWFLAKE_ACCOUNT"),
        }

    @cell_magic
    def snowflake(self, line, cell, local_ns=None):

        parser = argparse.ArgumentParser()
        parser.add_argument(
            "database",
            nargs="?",
            # default="{DB名省略時に指定されるDB名}"
        )
        parser.add_argument("-o", "--out")
        args = parser.parse_args(line.split())

        ip = get_ipython()
        # クエリ内の変数を値に変換し、実際に発行するSQLを構築。
        query = cell.format(**ip.user_ns)

        # Snowflakeに接続
        self.con_args["database"] = args.database
        self.connection = connector.connect(**self.con_args)

        try:
            # SQLの発行と結果取得
            with self.connection.cursor(connector.DictCursor) as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()
                if args.out:
                    # 結果をデータフレームに変換
                    df = pd.DataFrame(rows)
                    # -o 引数で指定した変数に格納
                    ip.push({args.out: df})
                # クエリIDと結果行数を表示しておくと便利なのでオススメ。
                print(f"結果行数: {cursor.rowcount}, クエリID: {cursor.sfqid}")

        except Exception as e:
            # エラーの表示。
            print(e)
        finally:
            # 接続を閉じる
            if hasattr(self, 'connection'):
                self.connection.close()


def load_ipython_extension(ipython):
    ipython.register_magics(SnowflakeMagic)


接続に必要な認証情報は環境変数から取得していますので、以下の3変数に各自の認証情報を設定してください。

  • SNOWFLAKE_USER

  • SNOWFLAKE_PASSWORD

  • SNOWFLAKE_ACCOUNT

PYTHONPATHもあるので設定する環境変数は全部で4つですね。Macを使われている場合は、~/.zshrc ファイルで指定しておきましょう。

使い方

準備が整ったら使ってみましょう。Jupyterで新しいnotebookを開いたら、次のコマンドを打って拡張機能を読み込みます。

%load_ext lit_snowflake

すると、先程作成したマジックコマンドが読み込まれるので、次のようにしてSQLを実行できます。(DB名、テーブル名、カラム名はすべて架空のものです。)

%%snowflake dummy_database -o df
SELECT
    id,
    name,
    dummy_column1,
    dummy_column2,
    dummy_column3,
    created_at,
    updated_at
FROM
    dummy_table

これで、 -o 引数で指定した名前の変数(上の例ではdf)にSQLで抽出してきた結果のDataFrameが格納されます。

SQLを実行する前に、Python文字列のformatメソッドで変数の置換を行っているので、すでに宣言されている変数であればSQL中に埋め込むことも可能です。たとえば、todayとyesterday って変数に次のように文字列が入っていたとしましょう。

today = "2023-10-24"
yesterday = "2023-10-23"

この場合、次のようなSQLを実行できます。

%%snowflake dummy_database -o df
SELECT
    *
FROM
    dummy_table
WHERE
    created_at >= '{yesterday}'
AND
    created_at < '{today}'

このセルを実行すると、Snowflakeには以下のクエリが発行されます。

SELECT
    *
FROM
    dummy_table
WHERE
    created_at >= '2023-10-23'
AND
    created_at < '2023-10-24'

マジックコマンドなので、最初の1行のおまじないは必要ですが、JupyterのセルにそのままSQLを書けるのは非常に便利です。

カスタマイズはご自由に

マジックコマンドを自作するメリットとしては、カスタマイズが自由にできる点が挙げられます。例えば、以下のような処理を入れておくと便利です。
上のコードは記事用にかなりシンプルにして紹介しましたが、僕が実際に使っているモジュールにも入れています。

  • SQL発行前にdatabase名に誤りがないか確認する。

  • SQLの発行履歴やレコード数などを別ファイルにロギングする。

  • Snowflakeの仕様で大文字で戻ってくる列名を小文字に変換する。

  • クエリの実行時間も計測する。

  • 自動で読み込むように設定して最初の%load_extを不要にする。

また、テーブル名の一覧や、指定したテーブルの列名の一覧取得など、頻繁に発行するクエリは同じ要領で専用のマジックコマンドを作っておくと便利です。同じクラスにメソッドを追加すれば作れます。

このテクニックの元ネタの紹介

DBやDWHをマジックコマンド操作できると便利だというのは、僕のオリジナルのアイデアではなく、過去に使っていたトレジャーデータというDWHのライブラリに実装されていた機能です。
参考: pandas-td Compatibility - Product Documentation - Treasure Data Product Documentation

SQL中の{}で囲った文字列がPythonコード中の変数に置換されることや、-o 引数で指定した変数に結果を格納するといった挙動はそのまま真似させていただきました。

まとめ

このマジックコマンドを使うと、一つのnotebookの中でデータの取得から分析までをコンパクトに行う事ができます。
SnowflakeでCSVダウンロードしてPythonで読み込み直すとか、Snowflake接続のためのコードを書くといった作業から解放されると本当に快適なのでぜひ試してみてください。


おしらせ

ライフイズテック サービス開発部では、気軽にご参加いただけるカジュアルなイベントを実施しています。開催予定のイベントは、 connpass のグループからご確認ください。興味のあるイベントがあったらぜひ参加登録をお願いいたします。皆さんのご参加をお待ちしています!

いいなと思ったら応援しよう!