Dataproc サーバレスでBigQueryのデータを加工するクイックスタート

概要

PySparkを使用してBigQueryのデータを加工するバッチ処理をDataproc サーバレスで動かしたので、その時の体験記を残す。
以下の記事を実行するうえで同記事内では直接書かれていないことも含めて記載する。

事前準備

VPCサブネットの作成

私はDataproc用のVPCネットワークを作成して、そこにサブネットを作成する方法を取りました。(defaultをいじりたくなかったため)
ネットワーク設定の具体的な方法は以下の記事をまねしました。

Cloud Storage バケットの作成

pysparkのコード内で使用するGCSバケットを準備する。
pysparkのコードは上記の「BigQuery コネクタを Dataproc Serverless for Spark とともに使用する」記事内のコードを適宜書き換えてそのまま使用する。

BigQueryデータセットの作成

Cloud Storageのバケットと同様にコード内で使用するBigQueryのデータセットを準備する。

sparkジョブをsubmitする

基本コマンド

ここはいくつかはまりポイントがあったので解決策含め記載する。
gcloudコマンドでジョブをsubmitする。基本コマンドは上記の記事内の通り以下。

gcloud dataproc batches submit pyspark wordcount.py\
    --region=region \
    --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-version.jar \
    ... other args

はまりポイント1:windows環境でローカルのソースを使用してsubmitする場合

まずローカルのpythonファイルを使用する場合windowsだと私の環境ではうまくできなかった。エラー原因は以下の通りで、GCSオブジェクトに\が入ってしまうのが問題のよう。windows環境だとダメのようなので、linux環境を使用する(docker, WSL, Cloud Shellなど)を使用するとよい。
試してないけど、GCS上にソースコードをアップロードしてそれを指定すればたぶんwindows環境でもこの問題を回避できる気がする。

java.lang.IllegalArgumentException: Illegal character in path at index 55: gs://[bucket-name]/dependencies\sample_spark.py

はまりポイント2:BigQueryコネクタを使用する場合のClassNotFoundException

ジョブのsubmit時にsparkランタイムのバージョンを指定しないと、デフォルトのバージョンを使用する仕様だが、そのバージョンとBQコネクタのjarファイルのバージョンを合わせる必要があるらしい。
参考にしたのは以下の記事。

submit時にランタイムを指定して、jarファイルのバージョンをそれに合わせたものにするのが安全そう。この問題はsparkランタイムのversion2がリリースされた2023年1月以降に発生するようになったみたいなので、少し古い記事だとコマンドでランタイム指定を明示的にしているものはあまりなさそう。

完成コマンド

上記をふまえて以下のコマンドを実行してsparkジョブをsubmitした。

gcloud --project [project-name] dataproc batches submit pyspark [python-file-name] \
--version=2.0 --batch=sample-spark --region=[region] \
--deps-bucket=[bucket-name] \
--subnet [subnet-name] \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.29.0.jar


この記事が気に入ったらサポートをしてみませんか?