DBマイグレーションツールAlembicとBigQueryを連携して、マイグレーションを管理する

AlembicとBigQueryを連携できると聞いたのですが、ドキュメントやチュートリアルを見つけられなかったので、試してみた備忘録です

ChatGPTさんと一緒に作業したらあっちこっちにいろいろなことが発生して思ったよりも時間がかかったので、参考になれば幸いです
(想定読者: マイグレーションツールの経験の浅い初心者)


TL;DR

  • AlembicはPythonで使えるマイグレーションツールだよ

  • AlembicはMySQLやpostgressqlなどの伝統的なDBだけでなく、BigQueryもサポートしているよ

  • BigQueryと接続するときは事前に環境設定が必要だよ

  • Alembic initで必要なファイルを書き出してくれるけど、結構な量の編集をしないといけないよ

  • ChatGPTさんはそれっぽいsuggetionをしてくれたけど、今回についてはいろいろ間違ってくれたよ…(自社サービスじゃないのにある程度コメントが正解していることにはある種の感動をしています。そんなにネガティブな感情は持っていないです)

Alembicでできること

ざっくりいうと、データベースのスキーマ変更の履歴をいい感じに管理してくれるのがマイグレーションツールです
Alembicは特にSQLAlchemyと一緒に使うことが想定されたPythonベースのマイグレーションツールになっています

マイグレーションツールを使ってスキーマ変更の履歴を管理しておくと、次ができます
・スキーマ変更(DBのカラムの変更)の順番に対する履歴の保存
・それらの順番に沿った変更の適用や変更のロールバック

特記事項としてMySQLやpostgressql, oracleなどのRDBだけでなく、BigQueryやGoogle Sprehadsheetsなどもサポートしています
(サポートしているDBのリスト)

作業の流れ

ざっくり、①初期化して必要なファイルを生成し、②BigQuery側で少し作業を行い、③credential情報などを定義し、④実際にテーブル定義&マイグレーションの実効…という感じです

  1. `alembic init alembic`で初期化

  2. BigQueryで環境整備

  3. Credential情報などをPythonスクリプトに反映

  4. テーブル定義・マイグレーション

インストール

ざっくり、次をpyproject.tomlに書いて作った環境を使っています

python = "^3.11"
Flask = "^3.0.1"
google-cloud-bigquery = "^3.16.0"
google = "^3.0.0"
flask-sqlalchemy = "^3.1.1"
flask-migrate = "^4.0.7"
SQLAlchemy = "^2.0.30"
sqlalchemy-bigquery = "^1.11.0"
alembic = "^1.13.1"
psycopg2-binary = "^2.9.9"
google-cloud-bigquery-storage = "^2.25.0"

最終的に次のようなディレクトリ構造になります

(作業ディレクトリ)
├── pyproject.toml
├── alembic.ini
└── (alembic init 出力ディレクトリ)
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
└── db
    └── models
        └── model.py

alembic init

まず、次のコマンドをシェル上で実行して、必要なファイルを生成します
(以下、シェルでの作業は$をつけています):

$ alembic init (出力ディレクトリ名)

出力ディレクトリをalembicにすると、次のような階層でファイル・ディレクトリが出力されます

(作業ディレクトリ)
├── pyproject.toml
├── alembic.ini
└── (出力ディレクトリ)
    ├── README
    ├── env.py
    ├── script.py.mako
    └── versions

alembic.ini

TOMLフォーマットのalembicの初期設定ファイルです
loggerの設定とかもしてくれています

env.py

後でマイグレーションを行っていく際の環境設定などを行います

script.py.mako

後ほどマイグレーションの内容を定義するファイルを生成するのですが、それらファイルのテンプレートがこちらに保存されています

version

マイグレーションの内容を定義するファイル群はこちらのディレクトリに保存されます

BigQuery側の準備

ざっくり必要なのは次の2つです

  • サービスアカウントの準備

  • alembicで管理するデータセットの定義

サービスアカウントの準備

後ほどalembicがテーブルを自動で生成していくため、テーブルの自動生成ができる権限を持つサービスアカウントを付与する必要があります
わかりやすい記事が他にたくさんある(し、公式ドキュメントもわかりやすい)ので、今回は省略します

今回はサービスアカウントに紐づくjsonファイルのCredentialを後で使います
忘れずに発行・保存しておいてください

データセットの定義

alembicではmigrationファイル(upgrade/downgradeを定義したファイル)それぞれにversion IDを割り振っていて、最初のテーブル定義のタイミングでversion IDを管理するテーブルを自動で生成します

BigQueryは(Project).(dataset).(table)の形式でテーブルを定義します。
このとき、tableを定義するためにはdatasetが生成されている必要がありますそのため、この時点でデータセットを定義しておく必要があります

筆者はalembicという名前のデータセットをプロジェクトの下に定義しました
BigQueryはこんな感じになっています

プロジェクト
├── alembic
├── (他のデータセット)

コーディング

alembic initで生成されたファイル・ディレクトリの他にdbというディレクトリをつけました
今はこんな感じにしていきます

(作業ディレクトリ)
├── pyproject.toml
├── alembic.ini
└── (alembic init 出力ディレクトリ)
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
└── db
    └── models
        └── model.py

alembic.ini

alembic.iniをエディタで開いて次の情報をつけ足します
[alembic]の項目は自動生成されているはずなので、すでに定義されているものの下につけ足してください

[alembic]
script_location = (alembic initで指定したディレクトリ)
sqlalchemy.url = bigquery://(bigqueryのプロジェクト名)

[bigquery]
project_id = (bigqueryのプロジェクト名)
credentials = (サービスアカウントのcredential.json)

model.py

先にDBのモデルを定義してしまいましょう
こんな感じに定義しました

補足: crendential_pathをここで呼ぶのはイケてないので後日修正します

from datetime import datetime

from sqlalchemy import create_engine, Column, String, Integer, Unicode, DateTime
from sqlalchemy.ext.declarative import declarative_base

# Engine の作成
Engine = create_engine(
    'bigquery://(project名)/(データセット名)',
    credentials_path='/path/to/Credential.json'
)
 
ModelBase = declarative_base()

env.py

個人的に一番てこづったのがここでした
結果的には次でうまくいきました

from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy_bigquery import BigQueryDialect

from google.cloud import bigquery
from google.oauth2 import service_account

from alembic import context

from db.models.model import ModelBase, Engine

#環境変数の定義: BigQueryとの接続の際、どうも環境変数でも定義しておかないとうまく接続されない
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/path/to/Credential.json"

#from migr.db import base # モデルクラスの読み込み
#target_metadata = base.Base.metadata 

# iniファイルで定義したini情報の読み込み
config = context.config

# models.pyで定義したDBのmetadataを宣言
target_metadata = ModelBase.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""

    #models.pyで定義したEngineをつなぐ
    connectable = Engine

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

変更をコミットする

ここまで書けたら次を実行します
commentsは"initial migration"とでもなんでもしてください

$ alembic revision --autogenerate -m (comments)

そうすると、次の変化が起こります:

  • versionsの下に(revisionId)_(comments).pyというファイルが生成される

  • BigQueryの接続したテーブルの下に`alembic_version`というテーブルが生成され、version_numとしてrevisionIdが記録される

BigQueryのコンソール

生成されたca938f69efdd_initial_migration.pyはこんな感じの内容です

"""Initial migration

Revision ID: ca938f69efdd
Revises: 
Create Date: 2024-05-12 16:45:06.310399

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'ca938f69efdd'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###

revisionを適用するのには次のようなコマンドを使います

一番最初のコミットから今に至るすべてのマイグレーションの適用時:

$ alembic upgrade head

最後のマイグレーションの適用だけをしたいとき:

$ alembic upgrade +1

テーブルの定義

では、実際にマイグレーションの管理機能を使ってみましょう
models.pyを次のように変更します
AcountModelというテーブルを定義しました

from datetime import datetime

from sqlalchemy import create_engine, Column, String, Integer, Unicode, DateTime
from sqlalchemy.ext.declarative import declarative_base

# Engine の作成
Engine = create_engine(
    'bigquery://t-gateway-373112/alembic_version',
    credentials_path='/mnt/c/python/SQLAlchemy/t-gateway-373112-f42035e8c112(power).json'
)
 
ModelBase = declarative_base()


class AcountModel(ModelBase):
    """
    AcountModel
    """
    __tablename__ = 'account'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    description = Column(Unicode(200))
    created_at = Column(DateTime, default=datetime.now, nullable=False)
    updated_at = Column(DateTime, default=datetime.now, nullable=False)

Alembicはこのmodels.pyで定義されたテーブルのメタデータの変更を自動で検知して、マイグレーション定義ファイルを新たに生成します

そしてまた、revisionします

$ alembic revision --autogenerate -m (comments)

そうすると、次のrevisionが生成されます

"""Table define

Revision ID: c344aeaf06fd
Revises: ca938f69efdd
Create Date: 2024-05-12 16:46:23.452817

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'c344aeaf06fd'
down_revision: Union[str, None] = 'ca938f69efdd'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('account',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('name', sa.String(length=50), nullable=False),
    sa.Column('description', sa.Unicode(length=200), nullable=True),
    sa.Column('created_at', sa.DateTime(), nullable=False),
    sa.Column('updated_at', sa.DateTime(), nullable=False),
    sa.PrimaryKeyConstraint('id')
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('account')
    # ### end Alembic commands ###

こうなればあとはalembic upgradeを適用してください

$ alembic upgrade +1

次のような空テーブルが生成されていれば成功です

補足: どこで躓いたか

大きく次の二つだと思います

  • alembicが正常動作するときにどうなるかが良くわかっていなかった

    • とはいうものの、alembic関連のブログを読めばここはわかるのでさして困っていない

  • BigQueryとの接続周辺部

    • 困ったのは完全にここ。できてしまえばそんなものかくらいなのですが、alembicが正常動作すると何をし始めるのかが良くわかっていなかったので、データセットを先に作っておいてあげるなどができていませんでした



この記事が気に入ったらサポートをしてみませんか?