見出し画像

BigQueryのテーブルの値でAirflowの実行タスクを分岐する

電通デジタルでBIエンジニアをしている三瓶です。
普段は社内向け広告運用改善ダッシュボードのデータエンジニアリング周りを担当しています。

本記事では、AirflowのOperatorを使ってBigQueryのテーブルの値によって実行するタスクを分岐する方法についてご紹介します。

 Airflowの実行タスクを分岐するに至った理由

弊社では主にワークフローエンジンのAirflow[1]を用いて取得したデータのETL関連タスクをワークフロー(DAG)に乗せて実行しています。

画像①

しかし、上の管理画面の画像のように多種多様なDAGを常時複数運用をしていると、出力結果の品質チェックを毎回行うのはかなりのコストがかかり日々コストの効率化に勤しんでいます。

そこで考案したのがBigQuery上の出力結果を取得し、品質的に異常があった場合はその内容をSlackへ通知し、そうでない場合は通常通りDAGを完了するような分岐処理を行うDAGです。

実行タスクを分岐させる方法はいくつか考えられますが、今回はその中でも事前に用意されているAirflow Operatorの中のBigQueryGetDataOperator、BranchPythonOperatorを使った比較的実装が容易な手法をご紹介します。

BigQueryGetDataOperatorを使ってテーブルの値を取得する

BigQueryGetDataOperatorはBigQueryのテーブルからデータを取得し、Pythonのリスト形式で返すOperatorです。今回のように分岐の条件にBigQueryのテーブルの値を使う場合はこのOperatorを使用することが考えられます。

以下のサンプルコードは、指定したデータセット(test_dataset)のテーブル(test_table)からmediaとdataカラムの値を取得しています。

get_data_from_bq = BigQueryGetDataOperator(
 task_id = 'get_data_from_bq',
 dataset_id = 'test_dataset',
 table_id = 'test_table',
 max_result = 100,
 selected_fields = 'media,date',
 provide_context = True,
 dag = dag,
 )

BranchPythonOperatorで実行タスクを分岐する

BranchPythonOperatorはPythonにより後続に実行されるOperatorを戻り値として定義し、その分岐処理をAirflow上で実行するためのOperatorです。実際の分岐させるための詳細な条件は関数内で定義することが可能です。

以下のサンプルコードは先ほどのBigQueryGetDataOperatorで取得したmediaとdataの値で、 2021-3-23より古い日付を持つ媒体が1つもない場合は通常タスク(task_for_normal)を実行し、それ以外は異常タスク(task_for_abnormal)を実行するよう分岐させています。

#BigQueryGetDataOperatorで取得したリストから, 2021-3-23より古い日付を持つ媒体リストを取得する関数
def extract(**kwargs):
 ti = kwargs['ti']
 bq_data = ti.xcom_pull(task_ids = 'get_data_from_bq')
 df = pd.DataFrame(bq_data)
 df = df.rename(columns={0: 'media', 1: 'date'})
 df['date'] = pd.to_datetime(df['date'])
 df = df[df['date'] < dt.datetime(2021,3,23)]
 media = df['media'].values.tolist()
 return media

#上記で取得した媒体リストが空の場合は正常、それ以外は異常タスク名を返す関数
def branch(**kwargs):
 ti = kwargs['ti']
 media = ti.xcom_pull(key='return_value' , task_ids='extract')
 if len(media) == 0:
   return 'task_for_normal'
 else:
   return 'task_for_abnormal'

実装結果

では、実際に上記のサンプルコードを使って実行タスクの分岐を行ってみます。
全体のコード例として以下のように、条件は先ほどと同様に2021-3-23より古い日付(date)を持つ媒体(media)が1つもない場合は通常タスク(task_for_normal)、それ以外は異常タスク(task_for_abnormal)を実行するようbranch関数で定義しています。


import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.bigquery_get_data import BigQueryGetDataOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import timedelta
import datetime as dt
from airflow.models import Variable
import pandas as pd
import json
import requests

# Slackへ通知する関数に必要な変数
webhook_url = Variable.get('webhook_url')
webhook_name = Variable.get('webhook_name')
channel_name = Variable.get('channel_name')

# Slackへ通知する関数
def slack_alert(kwargs):
   dag_name = 'test'
   ti = kwargs['ti']
   media = ti.xcom_pull(key='return_value' , task_ids='extract')
   data = {
           'username': webhook_name,
           'channel': channel_name,
           'attachments': [{
               'fallback': f'test',
               'color': '#e53935',
               'title': f'test',
               'text': f' Updates are failing on {media}...'
               }]
           }
   requests.post(webhook_url, json.dumps(data))

default_args = {
               'owner': 'test',
               'start_date':  dt.datetime(2021,1,1),
               'depends_on_past':True,
               'retries': 1,
               'provide_context':True,
               'retry_delay': timedelta(minutes=1),
               }
dag = DAG(
   dag_id = 'test',
   default_args = default_args,
   description = 'test',
   schedule_interval = '@once',
   catchup = False,
   dagrun_timeout = timedelta(minutes=60)
   )

#BigQueryGetDataOperatorで取得したリストから, 2021-3-23より古い日付を持つ媒体リストを取得する関数
def extract(**kwargs):
 ti = kwargs['ti']
 bq_data = ti.xcom_pull(task_ids = 'get_data_from_bq')
 df = pd.DataFrame(bq_data)
 df = df.rename(columns = {0: 'media', 1: 'date'})
 df['date'] = pd.to_datetime(df['date'])
 df = df[df['date'] < dt.datetime(2021,3,23)]
 media = df['media'].values.tolist()
 return media

#extractで取得した媒体リストが空の場合は正常、それ以外は異常タスク名を返す関数
def branch(**kwargs):
 ti = kwargs['ti']
 media = ti.xcom_pull(key='return_value' , task_ids='extract')
 if len(media) == 0:
   return 'task_for_normal'
 else:
   return 'task_for_abnormal'

#始点となるタスク
start = DummyOperator(task_id='start',dag = dag,)

#BigQueryGetDataOperatorでBigQueryのテーブルからデータを取得するタスク
get_data_from_bq = BigQueryGetDataOperator(
 task_id = 'get_data_from_bq',
 dataset_id = 'test_dataset',
 table_id = 'test_table',
 max_results = 100,
 selected_fields = 'media,date',
 dag = dag,
 )

#extract関数を実行するタスク
extract = PythonOperator(
 task_id = 'extract',
 python_callable = extract,
 dag = dag,
 )

#branch関数で返した後続に実行されるOperatorを実行するタスク
branch_operator = BranchPythonOperator(
 task_id = 'branch_operator',
 python_callable = branch,
 dag = dag,
 )

#正常用タスク
task_for_normal = DummyOperator(
 task_id ='task_for_normal',
 dag = dag,
 )

#異常用タスク
task_for_abnormal = DummyOperator(
 task_id ='task_for_abnormal',
 on_success_callback = slack_alert, #Slackへ通知する関数を呼び出す
 dag = dag,
 )

start >> get_data_from_bq >> extract >> branch_operator >> [task_for_normal ,task_for_abnormal]

また、今回は正常時と異常時両方の結果を比較するため、意図的に分岐の条件となる日付の値を変えてそれぞれ実行してみます。

画像③

画像②

上の画像は異常タスクへと分岐させたいため、test_tableの任意の媒体の日付を意図的に2020-12-31に書き換えた上で実行した結果です。BranchPythonOperatorで正しく異常を検知し分岐が行われ、Slack通知タスクが実行されていることがわかります。

画像④

上の画像は今度は正常タスクへと分岐させたいため、test_tableの全ての媒体の日付を正常な2021-03-23に修正し実行した結果です。こちらも同様にBranchPythonOperatorで正しく分岐が行われ、正常にワークフローが完了されていることがわかります。

おわりに

本記事ではAirflowにおける実行タスクの分岐方法について簡単に紹介しました。今回紹介したBigQueryGetDataOperatorとBranchPythonOperatorの組み合わせは使い方自体がシンプルにで、かつユーザー側で分岐条件を柔軟に変更できるため様々なケースで利用することができるためとても使い勝手が良い手法だと感じています。

様々なデータソースを参照して可視化するBI領域の業務では、データの品質管理は重要ながらもとてもコストがかかる業務です。本記事が同じ様な悩みを抱えておられる読者の開発を助けてくれる一手法となれば幸いです。

参考資料
[1] https://airflow.apache.org/

みんなにも読んでほしいですか?

オススメした記事はフォロワーのタイムラインに表示されます!