見出し画像

Embulk と Argo でデータ転送する

こんにちは、株式会社アトラエで wevox のエンジニアをしている小倉といいます。この記事では、Embulk と、Argo を含む Kubernetes の周辺ツールを使った弊社のデータ転送環境を紹介していきます。

RDS から BigQuery へのデータ転送

画像4

wevox ではデータベースとして AWS Aurora(MySQL)を利用しています。データ分析環境としては GCP を利用しており、Aurora から BigQuery にいくつかのテーブルを転送するために Embulk の利用をはじめました。

Embulk 自体は ETL ツールとしては王道であり検索すれば情報もたくさん出てくるので多くは記述しません。インプットプラグインには embulk-input-mysql を、アウトプットプラグインには embulk-output-bigquery を使っています。

Embulk と一緒に使われるワークフローツールとして Digdag がよく挙げられます。wevox チームではもともと Argo CD を含めた Kubernetes エコシステムツール を利用していたこと、Argo Workflow をもともと利用していたこと、Embulk での転送完了をトリガーに別のジョブを実行させたいこと、その他の理由が重なり Argo Workflow で Embulk を実行することにしました。

Embulk をシェルスクリプトで処理していた暗黒時代

現在約30個のテーブルの転送を行っていますが、元々は Embulk の実行にコンテナとシェルスクリプトを使っていました。例えば以下のような Embulk の定義ファイルがあるとします。

in:
 type: mysql
 host: {{ env.AURORA_ENDPOINT }}
 user: wevox
 password: {{ env.AURORA_PASSWORD }}
 select: "*"
 port: {{ env.AURORA_PORT }}
 options: {useLegacyDatetimeCode: false, serverTimezone: Asia/Tokyo}
 database: HOGE
 table: FUGA

out:
 type: bigquery
 auth_method: json_key
 json_keyfile: /app/credential_gcp_prod.json
 path_prefix: /tmp/ # 一時ファイル作成場所
 file_ext: .csv.gz
 source_format: CSV
 project: {{ env.GCP_PROJECT }}
 auto_create_table: true
 location: asia-northeast1
 formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
 encoders: {type: gzip}
 dataset: PIYO
 table: FUGA
 mode: replace

テーブルの数だけ上記の yml.liquid ファイルを作成し、コンテナが立ち上がった時にシェルスクリプトで順番に実行する、失敗したら slack に通知する、という実装をしていました。

しかし上記の実行をすると3つの問題が浮上します。

• 可読性が失われる。シェルがすごいことになる()。
• 複雑なエラーハンドリングをシェルでやるのが辛い。
• 1コンテナの中で Embulk をシーケンシャルに実行するので、適切なリソース配分が難しい。1コンテナ1テーブルの処理をしたい。

特に3つ目については、1つのコンテナで実行するという構造上、サイズの小さなテーブルにも大きなテーブルにも同じリソース(CPU/メモリ)を配分することになるので省エネが叫ばれる昨今には優しくない設計です(?)。Embulk は CPU の数に応じて並列数が変わるため、これは生産性の視点では死活問題です。

まさに「Digdag使えよ!」という状況ですが、ワークフローエンジンを選択する上で多くの選択肢から改めて検討した結果、選ばれたのは Argo Workflow でした。

Argo Workflow を使った実装

画像3

Argo Workflow は Kubernetes をベースにしたワークフローエンジンです。まず課題だったシェルの置き換えですが、 Argo Workflow の WorkflowTemplate を使うことで解消しました。WorkflowTemplate は v2.4以降から利用できるようになった比較的新しい機能です。

Kubernetes あるあるですが、1つの YAML ファイルがかなり長くなってしまったり同じ表現が何度も出てきたりするといったことがあります。それらを回避するために、オブジェクト指向でいうクラスを作成します。これがテンプレートと呼ばれるものです。そして他のファイルで、クラスからインスタンスを生成するようにテンプレートを呼び出します。呼び出す時は templateRef を用います。

まず Embulk の基になるテンプレートを作成します。

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
 name: embulk-template
spec:
 serviceAccountName: argo-wf-sa
 templates:
 - name: embulk-large-template
   retryStrategy:
     limit: 2
     retryPolicy: "Always"
     backoff:
       duration: "5s"
       factor: 2
   inputs:
     parameters:
     - name: AURORA_DB_NAME
     - name: GCP_DATASET_NAME
     - name: TABLE_NAME
   container:
     image: [IMAGE YOU CREATED]
     command: [embulk]
     args: ["run", "{{inputs.parameters.TABLE_NAME}}.yml.liquid"]
     env:
     - name: AURORA_DB_NAME
       value: "{{inputs.parameters.AURORA_DB_NAME}}"
     - name: GCP_DATASET_NAME
       value: "{{inputs.parameters.GCP_DATASET_NAME}}"
     - name: TABLE_NAME
       value: "{{inputs.parameters.TABLE_NAME}}"
     envFrom:
     - secretRef:
         name: string-secret
     - configMapRef:
         name: config
     volumeMounts:
     - name: credentials-volume
       mountPath: "/app/credentials/" 
       readOnly: true
     resources:
       requests:
         memory: "1024Mi"
         cpu: "4"
       limits:
         memory: "1024Mi"
         cpu: "4"
 volumes:
 - name: credentials-volume
   secret:
     secretName: volume-secret

いくつかピックアップします。まず retryStrategy では Pod (コンテナ)が失敗した時のリトライ設定を定義しています。limit はリトライ数です。 retryPolicy: "Always" とするとエラーとフェイルの両方でリトライしてくれます。backoff は exponential backoff の設定です。

   retryStrategy:
     limit: 2
     retryPolicy: "Always"
     backoff:
       duration: "5s"
       factor: 2

1テーブル1コンテナとして実行する時に、Docker イメージをテーブルの数だけ作るのは非現実的です。そのためデータセット名やテーブル名を変数化して渡せるようなイメージを1つ作って、それをそれぞれの実行時に呼び出します

inputs.parameters ではその変数を定義しておき、実行時に変数を個別に渡していきます。

   inputs:
     parameters:
     - name: AURORA_DB_NAME
     - name: GCP_DATASET_NAME
     - name: TABLE_NAME
   container:
     image: [IMAGE YOU CREATED]
     command: [embulk]
     args: ["run", "{{inputs.parameters.TABLE_NAME}}.yml.liquid"]
     env:
     - name: AURORA_DB_NAME
       value: "{{inputs.parameters.AURORA_DB_NAME}}"
     - name: GCP_DATASET_NAME
       value: "{{inputs.parameters.GCP_DATASET_NAME}}"
     - name: TABLE_NAME
       value: "{{inputs.parameters.TABLE_NAME}}"

DB のパスワードは Kubernetes の Secret が保持しています。加えて Embulk が BigQuery にアクセスするための Credentials JSON キーをボリュームマウントして渡しています。

     envFrom:
     - secretRef:
         name: string-secret
     - configMapRef:
         name: config
     volumeMounts:
     - name: credentials-volume
       mountPath: "/app/credentials/" 
       readOnly: true
 volumes:
 - name: credentials-volume
   secret:
     secretName: volume-secret

しかし、そのまま GitHub にコミットすると平文のパスワードが保存されてしまうため、ここでは Sealed Secret を使って暗号化しています。

$ kubeseal < secret_string.yaml -o yaml > sealed_secret_string.yaml

$ kubeseal < secret_volume.yaml -o yaml > sealed_secret_volume.yaml

平文が記載されている Secret はあくまで自分のローカルにのみ保持しておき gitignore します。 Sealed Secret を使って暗号化された  Sealed を apply し、envFrom でまとめて呼び出しています。

これまで1コンテナで数テーブルをシーケンシャルに実行してましたが、WorkflowTemplate によって1コンテナ1テーブルの実行が簡単に実行できるようになり、その結果として各テーブルに対してリソースを指定できるため、テーブルごとにリソースの配分が出来るようになりました。 

上記の例では 4CPU と 1GB を request 値として受け取っていますが、小さいサイズのテーブルのテンプレートを作って対応することも可能です。

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
 name: embulk-samll-template
spec:
 serviceAccountName: argo-wf-sa
 templates:
     resources:
       requests:
         memory: "512Mi"
         cpu: "1"
       limits:
         memory: "512Mi"
         cpu: "1"


次に Embulk を並列で実行するための WorkflowTemplate を作成します。つまり二重構造のテンプレートを作成しています。

スクリーンショット 2020-07-10 16.15.15

Embulk の実行だけならこんなことをしなくてもいいですが、我々の場合はデータ転送完了をトリガーに次のジョブを実行する必要があるためこの形式をとっています。

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
 name: embulk-wf-template
spec:
 serviceAccountName: argo-wf-sa
 entrypoint: embulk
 templates:
 - name: embulk
   dag:
     tasks:
     # checks tables below
     - name: table-a
       templateRef:
         name: embulk-large-template
         template: embulk-large-template 
       arguments:
         parameters:
         - name: AURORA_DB_NAME
           value: AURORA_DB_NAME
         - name: GCP_DATASET_NAME
           value: GCP_DATASET_NAME
         - name: TABLE_NAME
           value: TABLE_NAME
     - name: table-b
       dependencies: [table-a]
       templateRef:
         name: embulk-samll-template
         template: embulk-samll-template
       arguments:
         parameters:
         - name: TABLE_NAME
           value: table-a
         - name: TABLE_NAME
           value: table-a
         - name: TABLE_NAME
           value: table-a
.
.
.

事前に定義したテンプレートを呼び出す時は templateRef を使います。 parameters で変数を受け渡しています。

今回は単純なワークフローではなく DAG を採用しました。 dependencies を使うことで依存関係を定義できます。

テンプレートを一気に呼び出す親ファイルを作成します。こんな実行になるイメージです。

スクリーンショット 2020-07-10 15.11.44


CronWorkflow とう Argo Workflow の機能を利用しています。v2.5以降の機能ですが、これによってワークフローを Kubernetes の CronJob のように利用できます。

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
 name: e2e-wf
spec:
 schedule: "0 0 * * *" #日本時間0時に実行
 timezone: Asia/Tokyo
 suspend: False
 concurrencyPolicy: Forbid
 startingDeadlineSeconds: 20
 successfulJobsHistoryLimit: 3
 failedJobsHistoryLimit: 3
 workflowSpec:
   serviceAccountName: argo-wf-sa
   entrypoint: end2end
   onExit: exit-handler
   templates:
   - name: end2end
     dag:
       tasks:
       - name: embulk
         templateRef:
           name: embulk-wf-template
           template: embulk
       - name: job-2
         templateRef:
           name: wf-job-2
           template: wf-job-2
         dependencies: [embulk]
       - name: job-3
         templateRef:
           name: wf-job-3
           template: wf-job-3
         dependencies: [job-2]
  
   - name: exit-handler
     steps:
     - - name: notify-failure
         template: notify-failure
         when: "{{workflow.status}} != Succeeded"

   - name: notify-failure
     container:
       image: [IMAGE TO NOTIFY YOU CREATED]
       command: [sh, -c]
       args: ["python notification.py {{workflow.name}} {{workflow.status}}"]

Argo Workflow には Exit handler という機能が備わっており、ワークフローが成功した時と失敗した時の条件分岐実行が簡単に実装できます。

 workflowSpec:
   onExit: exit-handler
 
 .
 .
 .
 
   - name: exit-handler
     steps:
     - - name: notify-failure
         template: notify-failure
         when: "{{workflow.status}} != Succeeded"

   - name: notify-failure
     container:
       image: [IMAGE TO NOTIFY YOU CREATED]
       command: [sh, -c]
       args: ["python notification.py {{workflow.name}} {{workflow.status}}"]

上記では embulk の実行が失敗した時は以下の挙動を取ります。

1. retryStrategy に従ってコンテナのリトライを実行。
2. retryStrategy.limit の数だけリトライをして、それでも失敗するようなら notify-failure を実行。

notify-failure では slack に失敗した旨の通知が届くような Python コードがコンテナで実行されます。シェルでやってたときよりもシンプルにエラーハンドリングができるようになりました。

Argo UI で可視化する

Argo Workflow には備え付けの UI ツールがあります。以下のコマンドで UI ツールを立ち上げられます。

# v2.5 以降
# locahost:2746 へ
$ argo server

# v2.5未満
# locahost:8001へ
$ kubectl port-forward deployment/argo-ui 8001:8001

ブラウザを立ち上げてみます。上の塊が Embulk の各テーブルの転送です。1つの丸が1コンテナ1テーブルになっています。

スクリーンショット 2020-07-10 1.14.33

それぞれのコンテナ実行にどれくらいの時間が掛かったかもわかります(コンテナ名は消していますが)。これを見ながら CPU の数を調整すると良さそうです。

スクリーンショット 2020-07-10 9.25.08

Kustomize で環境構成管理

本番、ステージ、開発と環境があった時にそれぞれで YAML ファイルを愚直に管理するのは大変だと思います。全く同じコードをそのまま使い回すならいいですが、本番と開発では往々にして用いる DB が異なります。また本番は潤沢なリソースを使い、開発は最低限のリソースでいい、などの事情もあるはずです。処理自体は変わらないけど、環境による微妙な差分を管理する時に Kustomize は便利です。

詳しくは公式ドキュメントを参照してほしいですが、wevox では本番反映するコードを bases ディレクトリにて管理し、stg と dev にて発生する差分を overlays/stg ディレクトリと overlays/dev ディレクトリにて管理しています。

bases/kustomization.yaml を以下のように定義します。

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
commonLabels:
 env: prd
namespace: analysis-prd
resources:
- config/sealed_secret_volume.yaml
- config/sealed_secret_string.yaml
- config/configmap.yaml
- role-binding.yaml
- embulk-template.yaml
- embulk-wf-template.yaml
- job-1-template.yaml
- job-2-template.yaml
- wf-execute.yaml

本番に反映する時は以下のコマンドを叩けばOKです。

$ kusotmize build bases/ | kubectl apply -f -

dev 環境で DB の設定値を変更している時は overlays/dev を以下のように kusotmize build overlays/dev/ を編集します。見ての通り、差分があるファイルだけ patchesStrategicMerge で指定してます。そのため重複するファイルに関してはそもそも overlays/ 以下に管理しなくていいのです。

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
commonLabels:
 env: dev
namespace: analysis-dev
bases:
- ../../bases
patchesStrategicMerge:
- config/sealed_secret_volume.yaml
- config/sealed_secret_string.yaml
- config/configmap.yaml

dev 環境で試したい時は以下のコマンドを叩きます。

$ kusotmize build overlays/dev/ | kubectl apply -f -


Argo CD を用いた GitOps

画像5

長くなってきましたが最後です。卒論くらいの量になってきました。

wevox では CD ツールとして Argo CD を使っています。Argo CD はコンテナとして Kubernetes 上に起動し続け、常にクラスタの状況と指定した GitHub レポジトリの特定のブランチとを同期しようとします。つまり、常に GitHub のコードの状態をクラスタに反映し続けます。

例えば prd 環境は master ブランチと同期するようにし、 dev 環境は develop ブランチと同期するように設定してやれば、誤ったマージが無い限り適切な CD が実現されます。

Kustomize と組み合わせることでより環境ごとの管理が楽になります。

apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
 name: analysis
 namespace: argocd
spec:
 syncPolicy:
   automated: {}
 project: analysis
 source:
   repoURL: [YOUR REPOGITRY]
   targetRevision: master
   path: analysis/bases/
 destination:
   server: https://kubernetes.default.svc
   namespace: analysis-prd

また Argo CD も Argo Workflow と同様に UI ツールを備えています。デプロイの状況などを UI 上から確認することができますし、クラスタ毎に別のドメインで管理することが可能です。

課題と今後の展望

現時点では暫くこの構成で走らせ続けたいと思っています。ただし設計・設定していて検討すべきところも幾つかあったので今後の展望を含めて書いていきます。

差分転送に対応できていない

wevox で扱うデータはBtoCサービスのような規模のデータではないため、各テーブルがめちゃくちゃ大きいわけでは有りません。そのため今は必要なテーブルをまるっと転送しています。今後テーブルサイズが大きくなってきたら差分のみ抽出して転送することが必要そうです。

サービスの性質上、過去のデータに UPDATE 文がかかることが頻繁にあるため、単純に前日からの増分新規レコードだけをとってくるのではなく、変更レコードをとってきて転送することになりそうです。

YAML が複雑化している

元々はシェルスクリプトが複雑化していたという課題を Kubernetes で解消しに行ったのでこれは当然の帰結かもしれません。幸い弊社には Kubernetes に造詣のあるメンバーが複数人いるので大丈夫ですが、ここまで解説してきたように Argo をはじめ WorkflowTemplate や Argo CD など、 Kubernetes に留まらない技術セットや比較的新しい知識が必要になってきます。

(それを少しでも解消するために、ドキュメント強化版としてこのブログを書いているということでもあるのですが・・・)

Python や JavaScript で Kubernetes のリソース定義をする技術として cdk8sjk があります。が、結局は現在のチームと今後のチームの状況を鑑みて導入するのが良さそうというのが今の私の結論なので、必要が出てきた時に検討するとします。

同期の頻度が1日1回

Embulk のバッチを深夜に1回行っているため、RDS と BigQuery の同期は1日に1回です。現在はまだ困っていませんが、データサイエンティスト含め他メンバーから「リアルタイムで同期して欲しい!」と言われる未来が来そうです。

シンプルにストリームでやる方法もありますが、Dataflow CDC (Change-data Capture)という変化したデータを検知して転送するしくみがあるのでこれも検討の1つです。


最後に

長くなりましたが以上です。株式会社アトラエ、また wevox では一緒に働く仲間を募集しています。この記事をきっかけに弊社のことが気になったらこちらからご応募ください!



この記事が気に入ったらサポートをしてみませんか?