見出し画像

データマートをセルフサービス化したら1石4鳥だった話

こんにちは、noteで数少ないデータエンジニアの久保田です。
この記事はnote株式会社 Advent Calendar 2022の24日目の記事です。

さて、早速ですが皆さん、データマート作っていますか?
データマートの作成は結構大変なことが多いので、ここ最近の弊社の取り込みを紹介させてください。


データマート

データマートとは、使用用途に合わせた集計済みのテーブル群とここでは定義します。

弊社では、note内でのユーザーの行動ログやユーザーが書いた記事など、多様なデータが色々なデータソースにあります。
それらをデータウェアハウスである Snowflake に入れて、Snowflakeのみで分析やアプリケーション、機械学習に使えるように整備しています。
しかし、このままでは欲しいデータがどこにあるのかわからない、という状態になってしまっています。

データウェアハウスは倉庫で、データマートは商店、というイメージだとわかりやすいかと思います。

現実世界でも、お客さんが直接倉庫に商品を買い付けに行くのではなく、商店がそれぞれ自分のお客さんが求めるものを倉庫から買い付け、お客さんが見つけやすいように陳列しますね。

それは、大量に色々なものを保管している倉庫より、欲しいものが探しやすいように陳列されている商店の方がお客さんにとってほしいものを手に入れるのに便利だからです。

データを使ってもらう際、「直感的に、すぐに欲しい情報にアクセスできること」が非エンジニアや普段データ基盤のデータを使っていない方にとっては特に重要と考えています。よって、わかりやすくデータを陳列したデータマートの提供は必要だなと考え、以前から運用をしていました。

課題

弊社の分析のフローは、会社の様々なデータ分析を、データサイエンティストが在籍するデータチームが依頼を受けて消化していっています。
その際に、必要な中間データをエンジニアが在籍するデータ基盤チームが作りデータマートを構築しています。

※イメージです

しかし、弊社のデータマート環境は、いくつかの課題を抱えていました。

  • 依頼されてから中間テーブルを提供するまでに時間がかかる

  • 時間がかかるので要望が消化しきれず、数が少ない

  • ほしいデータがどこのテーブルにあるのかわからない

  • 認識のずれ、バグなどで集計にズレが生じる

まずは

  • 依頼されてから中間テーブルを提供するまでに時間がかかる

  • 時間がかかるので要望が消化しきれない

この課題を解決しようと考え、データマートの作成を誰でもできるようにするデータマートのセルフサービス化を検討しました。
すると、少しの工夫で他の課題も同時にいい感じに解決することができました。


データマートの作成をセルフサービス化する

データマートの生成方法には、大きく分けて

  •  GlueなどでSparkを使い、プログラミングで新しいデータを生成する

  • SQLの結果を新しいテーブルとして保存し生成する

の2パターンがあります。

SQLだけでは解決できないような課題に対しては、1つ目の手法を使うしかないので非エンジニアであるデータサイエンティストにやらせることは難しいですが、2つ目はそうではありません。
SQLとテーブル定義だけ書いてもらい、その結果をテーブルに入れる環境さえ提供すれば、誰でもデータマートが作成できるということになります。

そこで目をつけたのは、Snowflakeが提供しているSnowparkです。

Snowparkは、Snowflake上でpythonなどのプログラムを実行することを可能にするAPIを提供してくれます。
つまり、Snowflakeにあるデータをサーバーなどに移動しなくても、データを扱うプログラムをSnowflake上で実行することができるようになります。

プログラム経由でSnowflakeを扱うことができるので、柔軟なワークフローの作成が可能になると考え、以下のような構造にすればデータ基盤チームで開発をせずとも中間テーブルを簡単に量産できると考えました。

依頼者に定義ファイルとSQLファイルを作ってもらい、データ基盤チームで実行をする

一つずつ処理を見ています。
テーブルを作る処理ですが、pythonのスクリプトからテーブル定義が書かれているjsonファイルを読み、実行する形です。


definition.json

こちらは定義ファイルです。テーブルやカラム定義を書いてあります。

[
  { 
        "name": "user_pv",
    "schema": "schema",
    "mode": "overwrite",
    "columns": [
       {
         "name": "user_id",
         "type": "integer"
       },
       {
         "name": "pv",
         "type": "integer"
       }
    ]
  },
  { 
        "name": "daily_pv",
    "schema": "schema",
    "mode": "append",
    "columns": [
       {
         "name": "date",
         "type": "date"
       },
       {
         "name": "pv",
         "type": "integer"
       },
       {
         "name": "uu_pv",
         "type": "integer"
       }
    ]
  }
]


create_table.py
そして、こちらのpythonスクリプトで上記の定義ファイルをみて、create table文を発行しています。

import datetime
import json
import os
import os.path
import sys

import snowflake
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc

connection_parameters = {
    "account": "[YOUR ACCOUNT]",
    "region": "[YOUR REGION]",
    "user": "[USER]",
    "password": "[PASSWORD]",
    "role": "[ROLE]",
    "warehouse": "[WAREHOUSE]",
    "database": "[DATABASE]",
    "schema": "[SCHEMA]"
}

session = Session.builder.configs(connection_parameters).create()

def create_tables(session: snowflake.snowpark.Session) -> str:

    r = ''
    with open('definitions.json', 'r', encoding='utf-8') as file:
        r = file.read()

    targets = json.loads(r)
    for target in targets:
        columns = []
        for c in target["columns"]:
            column_name = c["name"]
            t = c["type"]
            columns.append("{0} {1}".format(column_name, t))

        session.sql("""
        create table {0}.{1} if not exists (
          {2}
        );
        """.format(target["schema"], target["name"], ",".join(columns))).collect()

    return ""


create_tables(session)
session.close()

こんな感じでテーブルが存在していなかった場合、テーブルの作成をしています。

次に、データを作る処理です。


queries以下に、定義ファイルのtable名と同じ名前でSQLファイルを作ります。

user_pv.sql

select
user_id, count(1) as pv
from logs join users.id = logs.user_id
group by user_id

daily_pv.sql

select
date, count(1) as pv, count(distinct user_id) as uu_pv
from logs
group by logs.created_date

create_data_from_sql.py
こちらのスクリプトで、定義ファイルとSQLファイルを読み、SQLを実行してデータを作ります。

import datetime
import json
import os
import sys

import snowflake
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc

connection_parameters = {
    "account": "[YOUR ACCOUNT]",
    "region": "[YOUR REGION]",
    "user": "[USER]",
    "password": "[PASSWORD]",
    "role": "[ROLE]",
    "warehouse": "[WAREHOUSE]",
    "database": "[DATABASE]",
    "schema": "[SCHEMA]"
}

session = Session.builder.configs(connection_parameters).create()

session.add_import("definitions.json")

r = ''
with open('definitions.json', 'r') as file:
    r = file.read()

targets = json.loads(r)

for target in targets:
    session.add_import("queries/{}.sql".format(target["name"].lower()))


@sproc(name="create_datamart_from_sql", is_permanent=True, stage_location="[STAGE]", replace=True, packages=["snowflake-snowpark-python"])
def create_datamart_from_sql(session: snowflake.snowpark.Session) -> str:
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    r = ''
    with open(import_dir + 'definitions.json', 'r', encoding='utf-8') as file:
        r = file.read()

    targets = json.loads(r)

    for target in targets:
        if target["mode"] == "overwrite":
            session.table(target["name"]).delete()

        sql = ''
        with open(query_dir + '{}.sql'.format(target["name"].lower()), 'r') as file:
            sql = file.read()

        exec_sql = ""
        if target["mode"] == "overwrite":
            exec_sql = """
            insert into {0}.{1} (
                {2}
            );
            """.format(target["schema"], target["name"], sql)
        else:
            column_names = []
            for column in target["columns"]:
                column_names.append(column["name"])

            updates = []
            inserts = []
            for column_name in column_names:
                updates.append("{1}.{2} = b.{2}".format(
                    target["schema"], target["name"], column_name)
                )
                inserts.append("b.{0}".format(column_name))

            # merge intoを作る
            exec_sql = """
                merge into {0}.{1} USING (
                    {2}
                ) as b on {0}.{1}.{3} = b.{3}
                when matched then update set 
                {4}
                when not matched then insert values ({5});
            """.format(target["schema"], target["name"], sql, target["key"], ",".join(updates), ",".join(inserts))

        
          session.sql(exec_sql).collect()

    return ""


session.close()

これを実行することで、create_datamart_from_sqlというprocedureが作られ、Snowflakeまたはワークフローエンジンから定期実行させることができます。
そうすることで、毎日決まった時間にデータを集計することなどが可能です。

定義ファイルのmodeというフィールドで、overwriteとappendを受け入れるようにしてあり、overwriteの場合はテーブルの洗い替えを行い、appendの場合は merge into でデータのupdate or insertを行なっています。

上記2つの処理を、mainブランチにマージされた際にgithub actionsで自動で動くようにしています。
こうすることで、勝手にテーブルが作られることなく、エンジニアサイドでレビューをすることも可能になりました。

データチームにgithubでプルリクを作ってもらう必要がある

そしてこの定義ファイルを更新しSQLファイルを作る部分ですが、理想はブランチを切ってもらいプルリクエストを出してもらうことでした。しかし、利用者は主に非エンジニアなので、こちらは難しいと考えました。
なので以下のような申請フローを作りました。(弊社SREのvaru3が作ったものをほぼそのままパクって使っています)

slack経由でlambdaを実行してgithub actionを実行

slackの申請フロー経由でテーブル定義とSQLを書いてもらうことで、テーブル作成用の定義ファイルが更新・テーブルにデータを追加するSQLファイルが作成されたプルリクエストが発行され、そのプルリクエストが受理してmainブランチにマージするとテーブルが作成・データ投入用のprocedureも更新される、というわけです。

これで中間テーブルの作成がセルフサービス化され、中間テーブルを迅速に作成できるようになり、また中間テーブルの作成にデータ基盤チームのリソースを使わなくて済むようになりました。


データカタログを整える

中間テーブルの作成をセルフサービス化できたことで、中間テーブルを迅速に作れるようになりました。
しかし、「直感的に、すぐに欲しい情報にアクセスできること」という状態のためには、まだ不十分です。
テーブルがあるだけで探せなければ意味がありません。
家電量販店なんかでも、どこに何があるか、わかりやすい地図がありますよね。
なので、データカタログの作成を検討しました。

以前からドキュメントやスプレッドシートなどでテーブルとカラムそれぞれ何を意図しているものかを利用者に提供しようとしていましたが、どうしてもメンテナンスが疎かになったり、この更新が足を引っ張りテーブルの作成が遅れるなどが発生しました。
まぁ普通にめんどくさいですよね。

なので、テーブルやカラムの状態や型、説明をなんとか自動で取得し、データカタログを作れないかと以前から検討していました。

そしてこの中間テーブルのセルフサービス化を行うことで、一気に解決することができました。

具体的には先ほどのテーブル定義のファイルに、コメントを追加することで解決しました。

[
  { 
        "name": "user_pv",
    "schema": "schema",
    "mode": "overwrite",
        "desc": "ユーザー毎のPVを集計したテーブル",
    "columns": [
       {
         "name": "user_id",
         "type": "integer",
         "desc": "ユーザーID" 
       },
       {
         "name": "pv",
         "type": "integer",
         "desc": "pv数" 
       }
    ]
  },
  { 
        "name": "daily_pv",
    "schema": "schema",
    "mode": "append",
        "desc": "日別でPVを集計したテーブル",
    "columns": [
       {
         "name": "date",
         "type": "date"
       },
       {
         "name": "pv",
         "type": "integer",
         "desc": "pv数"
       },
       {
         "name": "uu_pv",
         "type": "integer",
         "desc": "PVのユーザー毎のユニーク数"
       }
    ]
  }
]

定義ファイルは必ずテーブル追加の際に更新されるので、このファイルをデータカタログに使えば、自動で現在の状態を追従し、しかもコメントで説明まで可能となりました。

現在は、Reactでアプリケーションを作り、このファイルを参照してデータカタログの描画を行なっています。


監視体制も整える

もう一点、非常に重要な課題として、データの信頼性ということがありました。
データマートの作成は複雑になってしまう部分が多く、データの集計時のミスやデータの前処理に失敗して期待された集計ができないなどが発生し、信頼性が落ちてしまっていました。

このままでは、「直感的に扱えるけど信じて良いのかわからない」という状態になってしまいます。

しかし、データの監視は非常に難しい部分だと感じています。
それぞれカラムに期待する結果は違いますし、監視項目も非常に多いです。
また、他のシステム同様、テストをしっかりメンテナンスしていく難しさもあります。
どうしても速度をあげると疎かになってしまう部分でありますが、なんとか業務スピードを担保したまま信頼性を向上できないか、と考えました。

こちらも定義ファイルにいくつか追加し、Snowparkでスクリプトを実行することで可能にしました。

[
  { 
        "name": "user_pv",
    "schema": "schema",
    "mode": "overwrite",
        "desc": "ユーザー毎のPVを集計したテーブル",
       "key": "user_id",
    "columns": [
       {
         "name": "user_id",
         "type": "integer",
         "desc": "ユーザーID" 
       },
       {
         "name": "pv",
         "type": "integer",
         "desc": "pv数" ,
         "rule": "is not null"
       }
    ]
  },
  { 
        "name": "daily_pv",
    "schema": "schema",
    "mode": "append",
        "desc": "日別でPVを集計したテーブル",
       "key": "date",
    "columns": [
       {
         "name": "date",
         "type": "date"
       },
       {
         "name": "pv",
         "type": "integer",
         "desc": "pv数",
         "rule": "is not null"
       },
       {
         "name": "uu_pv",
         "type": "integer",
         "desc": "PVのユーザー毎のユニーク数",
         "rule": "is not null"
       }
    ]
  }
]

monitor.py

import datetime
import json
import os
import os.path
import sys

import snowflake
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc

connection_parameters = {
    "account": "[YOUR ACCOUNT]",
    "region": "[YOUR REGION]",
    "user": "[USER]",
    "password": "[PASSWORD]",
    "role": "[ROLE]",
    "warehouse": "[WAREHOUSE]",
    "database": "[DATABASE]",
    "schema": "[SCHEMA]"
}
session = Session.builder.configs(connection_parameters).create()

session.add_import("definitions.json")


@sproc(name="monitor_datamart", is_permanent=True, stage_location="[STAGE]", replace=True, packages=["snowflake-snowpark-python"])
def monitor_datamart(session: snowflake.snowpark.Session) -> str:

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    r = ''
    with open(import_dir + 'definitions.json', 'r', encoding='utf-8') as file:
        r = file.read()

    targets = json.loads(r)
    insert_values = []
    for target in targets:
        table_r = session.sql("""
          select count(1)
          from {0}.{1}
          group by {2}
          having count(1) > 1;
        """.format(target["schema"], target["name"], target["key"])).collect()

        if len(table_r) == 0:
            insert_values.append(
                [target["schema"], target["name"], target["key"], "duplicate"])

        for c in target["columns"]:
            if "rule" not in c:
                continue

            rule = c["rule"]
            cond = ""
            if rule == "is not null":
                cond = "is null"

            if cond == "":
                continue

            sql = """
                    select count(1) as cnt
                    from {0}.{1}
                    where {2} {3};
                """.format(target["schema"], target["name"], c["name"], cond)

            column_r = session.sql(sql).collect()
            if column_r[0]["CNT"] == 0:
                insert_values.append(
                    [target["schema"], target["name"], c["name"], cond])

    // alert

    return ""


session.close()

definitions.jsonにkeyを設定し、ユニークであることをチェックしています。
また、columnごとにruleを設定することで、そのルールに沿った状態でデータが存在しているかのチェックもしています。
この結果を何らかの方法でalertを発するようにすれば、簡易的ですがデータの監視ができます。
加えて、ruleのパターンを増やすことで色々なパターンにも対応できるようになります。

まとめ

このようなアーキテクチャーを作成することで、中間データの作成と同時にデータカタログの整備、データの監視の自動化も実現しました。
テーブル定義を一元化したことによって実現できているわけですが、snowparkのおかげでデータを使った柔軟な処理が可能になり、上記のようなことが実現できています。

すごく長くなってしまいましたが、データマートの運用・データカタログの運用などに悩まれている方の力になれれば幸いです。

この記事が気に入ったらサポートをしてみませんか?