ローカルKubeflowのPipelineでGCPを操作する

MiniKFでローカルにたてたKubeflowのPipelineでGCPを操作してみた
とりあえず公式ドキュメントを参考に作成してみた感じ
今回はETL的なイメージで「csvファイルをGCSにアップロード」、「GCSからBQにデータをロード」と言う2stepをパイプラインで実現

スクリーンショット 2020-04-23 22.03.41

csvをGCSにアップロードComponent作成

基本ディレクトリ構成はベスプラ参考にしてこんな感じ
(100MB以上あるcsvとかはgitで管理できないんのでgit-lfsあたりを導入

components
└── forecasting
   └── upload_from_local_to_gcs
       ├── Dockerfile
       ├── README.md
       ├── build_image.sh
       ├── requirements.txt
       └── src
           ├── data
           │   ├── calendar.csv
           │   ├── sales_train_validation.csv
           │   ├── sample_submission.csv
           │   └── sell_prices.csv
           └── main.py

GCSにアップロードするmain.pyを作る

from google.cloud import storage
import argparse
import os
from os import listdir


def main():
   parser = argparse.ArgumentParser(description='ML Trainer')
   parser.add_argument('--bucket-name', type=str, help='Google Cloud Storage bucket name', required=True)
   parser.add_argument('--bucket-dir', type=str, help='Google Cloud Storage bucket dir', required=True)
   args = parser.parse_args()

   storage_client = storage.Client()
   bucket = storage_client.bucket(args.bucket_name)
   destination_blob_dir = args.bucket_dir
   source_dir = os.path.join(os.path.dirname(__file__), 'data')
   for file in listdir(source_dir):
       destination_blob_name = f"{destination_blob_dir}/{file}"
       blob = bucket.blob(destination_blob_name)
       blob.upload_from_filename(os.path.join(source_dir, file))

   with open('/tmp/output_bucket_name.txt', 'w') as f:
       f.write(args.bucket_name)

   with open('/tmp/output_bucket_dir.txt', 'w') as f:
       f.write(destination_blob_dir)


if __name__ == '__main__':
   main()

ポイントとしては以下
1. アップロードするバケットとディレクトリをパイプラインの引数で指定
2. 次のComponentのInputとして渡したいものをOutputとして出力

requirements.txtに必要なパッケージ記載

google-cloud-storage==1.27.0

pythonをローカルで実行して動作すること確認したら、コンテナ化のためのDockerfile作成

FROM python:3.8.2
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY ./src /pipelines/component/src
ENTRYPOINT ["python", "/pipelines/component/src/main.py"]

ビルドしてGoogle Container Registryにpushするためのbuild_image.shも作っとく。[project_name]は自分のGCP Project nameを指定してください。

#!/bin/sh

if [ $# -lt 1 ]; then
 cat << EOS
 image_tag argument is required.
 ex) sh build_image.sh 0.1.0
EOS
 exit
fi

IMAGE_TAG="$1"

image_name=asia.gcr.io/[project_name]/forecasting/upload_from_local_to_gcs
full_image_name=${image_name}:${IMAGE_TAG}

cd "$(dirname "$0")"

docker build -t "$full_image_name" .
docker push "$full_image_name"

実際にビルドしてプッシュしてみる

$ sh build_image.sh 0.1.0
Sending build context to Docker daemon  328.7MB
...
0.1.1: digest: sha256:xxx size: 2847

GCSのcsvからBQテーブル生成component作成

構成は基本的に同じで、違うディレクトリに作成した

components
└── forecasting
   ├── load_from_gcs_to_bq
   │   ├── Dockerfile
   │   ├── README.md
   │   ├── build_image.sh
   │   ├── requirements.txt
   │   └── src
   │       └── main.py
   └── upload_from_local_to_gcs

GCSのCSVをBQにロードするmain.py作成

import argparse
from google.cloud import storage
from google.cloud import bigquery


def main():
   parser = argparse.ArgumentParser(description='ML Trainer')
   parser.add_argument('--bucket-name', type=str, help='Google Cloud Storage bucket name', required=True)
   parser.add_argument('--bucket-dir', type=str, help='Google Cloud Storage bucket dir', required=True)
   args = parser.parse_args()
   print('args.bucket_name: ', args.bucket_name)
   print('args.bucket_dir: ', args.bucket_dir)

   storage_client = storage.Client()
   blobs = storage_client.list_blobs(
       args.bucket_name, prefix=args.bucket_dir
   )

   client = bigquery.Client()
   dataset_id = 'raw'

   dataset_ref = client. dataset(dataset_id)
   job_config = bigquery.LoadJobConfig()
   job_config.autodetect = True
   job_config.skip_leading_rows = 1
   job_config.source_format = bigquery.SourceFormat.CSV
   base_uri = f"gs://{args.bucket_name}"
   for blob in blobs:
       num = blob.name.count('/')
       string = blob.name.split('/')[num]
       string_list = string.split('.')
       load_job = client.load_table_from_uri(
           f"{base_uri}/{blob.name}", dataset_ref.table(string_list[0]), job_config=job_config
       )
       load_job.result()  # Waits for table load to complete.

   with open('/tmp/output_dataset.txt', 'w') as f:
       f.write(dataset_id)


if __name__ == '__main__':
   main()

requirements.txtはこんな感じ

google-cloud-bigquery==1.24.0
google-cloud-storage==1.27.0

同様にしてDockerfileを作成(さっきのと全く同じ内容や)

FROM python:3.8.2
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY ./src /pipelines/component/src
ENTRYPOINT ["python", "/pipelines/component/src/main.py"]

さっきとはimage_nameを変えたbuild_image.shを作成
[project_name]は例の如く自分のやつを設定してくだされ

#!/bin/sh

if [ $# -lt 1 ]; then
 cat << EOS
 image_tag argument is required.
 ex) sh build_image.sh 0.1.0
EOS
 exit
fi

IMAGE_TAG="$1"

image_name=asia.gcr.io/[project_name]/forecasting/load_from_gcs_to_bq
full_image_name=${image_name}:${IMAGE_TAG}

cd "$(dirname "$0")"

docker build -t "$full_image_name" .
docker push "$full_image_name"

ビルド&プッシュしておく

$ sh build_image.sh 0.1.0
Sending build context to Docker daemon  328.7MB
...
0.1.1: digest: sha256:xxx size: 2847

GCP認証するための事前準備

ここが一番つまずいたポイント。基本的な流れとしては以下
1. サービスアカウント の作成
2. サービスアカウントキー(json)のダウンロード
3. GCRのprivate imageからpullするためのサービスアカウントsecret作成
4. GCS, BQに接続するためのサービスアカウントsecret作成

1. サービスアカウント の作成
以下役割を付与したサービスアカウントを作成
- BigQuery 管理者
- ストレージ管理者
- ストレージ オブジェクト閲覧者(GCR pull用)

2. サービスアカウントキー(json)のダウンロード
コンソールから作成したサービスアカウント のキーダウンロード

3. GCRのprivate imageからpullするためのサービスアカウントsecret作成
vagrantにダウンロードしたサービスアカウント キーをアップロード
そんで、Secret作成。これでPrivateなGCRからpullできる
適宜以下の通り修正して実行
・gcp_credential.jsonはダウンロードしたキーの名前
・--docker-emailはサービスアカウントのメールアドレス

$ vagrant upload gcp_credential.json /tmp/gcp_credential.json
$ vagrant ssh
vagrant@minikf:~$ kubectl create secret docker-registry regcred \
 --docker-server=asia.gcr.io \
 --docker-username=_json_key \
 --docker-password="$(cat /tmp/gcp_credential.json)" \
 --docker-email=for-local-test@[project_name].iam.gserviceaccount.com \
 -n kubeflow

4. GCS, BQに接続するためのサービスアカウントsecret作成
3. と同様にvagrant上でもうひとつsecret作成

vagrant@minikf:~$ kubectl create secret generic user-gcp-sa -n kubeflow --from-file=user-gcp-sa.json=/tmp/gcp_credential.json

Componentを繋ぐPipelineを作成

2つのComponentディレクトリと並列したとこにパイプラインを置いてみる

components
└── forecasting
   ├── load_from_gcs_to_bq
   ├── pipeline.py
   ├── pipeline.tar.gz
   └── upload_from_local_to_gcs

pipeline.pyはこんな感じにした
[project_name]は適宜変更ください

import kfp
from kfp import dsl
from kfp import gcp
import kubernetes


def load_from_gcs_to_bq(bucket_name, bucket_dir):
   return dsl.ContainerOp(
       name='load csv files from gcs to bq',
       image='asia.gcr.io/[project_name]/forecasting/load_from_gcs_to_bq:0.1.0',
       arguments=[
           '--bucket-name', bucket_name,
           '--bucket-dir', bucket_dir,
       ],
       file_outputs={
           'dataset': '/tmp/output_dataset.txt',
       }
   ).apply(gcp.use_gcp_secret())


def upload_from_local_to_gcs_op(bucket_name, bucket_dir):
   return dsl.ContainerOp(
       name='upload csv files from local to gcs',
       image='asia.gcr.io/[project_name]/forecasting/upload_from_local_to_gcs:0.1.0',
       arguments=[
           '--bucket-name', bucket_name,
           '--bucket-dir', bucket_dir,
           ],
       file_outputs={
           'bucket_name': '/tmp/output_bucket_name.txt',
           'bucket_dir': '/tmp/output_bucket_dir.txt',
       }
   ).apply(gcp.use_gcp_secret())


@dsl.pipeline(
   name='m5 forecasting pipeline',
   description='A pipeline fro m5 forecasting.'
)
def m5_forecasting_pipeline(bucket_name, bucket_dir):
   dsl.get_pipeline_conf().set_image_pull_secrets([kubernetes.client.V1LocalObjectReference(name="regcred")])
   _upload_from_local_to_gcs_op = upload_from_local_to_gcs_op(bucket_name, bucket_dir)

   load_from_gcs_to_bq(
       _upload_from_local_to_gcs_op.outputs['bucket_name'],
       _upload_from_local_to_gcs_op.outputs['bucket_dir'],
   ).after(_upload_from_local_to_gcs_op)


if __name__ == '__main__':
   kfp.compiler.Compiler().compile(m5_forecasting_pipeline, __file__ + '.yaml')

ポイントとしては以下
1. set_image_pull_secretsメソッドで先ほど作ったsecretを指定。これでGCRのprivate repositoryからpullできる
2. apply(gcp.use_gcp_secret())で先ほど作ったもう一つのsecretを指定。これでpodからGCS, BQへアクセスできる
3. def m5_forecasting_pipeline(bucket_name, bucket_dir):でUIから指定できる引数を指定
4. file_outputsでプログラム内で出力したパスを指定。_upload_from_local_to_gcs_op.outputs['bucket_name']で前のcomponentの出力を取得して次のcomponentの入力として渡す

Pipelineをコンパイル

以下コマンドを実行

$ dsl-compile --py pipeline.py --output pipeline.tar.gz

Pipelineをデプロイ

UIでUpload pipelineをクリックして生成したpipeline.tar.gzをアップロード

スクリーンショット 2020-04-23 22.47.39

Pipeline実行

Experiment作成
pipelineで「Create experiment」ボタンをクリックして作成

画像3

Pipeline実行
「Create run」ボタンをクリックして、pipeline, experimentを指定
アップロードしたいGCSのバケットとディレクトリを指定して実行!

スクリーンショット 2020-04-23 22.54.46

こんな感じでグリーンOKなアイコンがついてGCS, BQにデータ格納される

スクリーンショット 2020-04-23 22.03.41

所感

なんとかたどり着いた
GCSとの認証周りでつまづいたが、それ以外は割とスムーズに進んだ。
componentの再利用性向上、UIでの結果可視化など、まだまだトライしたい部分あるので引き続き時間見つけて勉強しよう

この記事が気に入ったらサポートをしてみませんか?