見出し画像

AirflowのDAGの実行状況をSlackに通知して監視する

電通デジタルでデータサイエンティストを務めている吉田です。 普段はウェブログデータの分析や予測モデルの構築、ワークフローの実装などを担当しています。

本記事ではAirflowのDAGの実行状況をSlackに通知してモニタリングする際の実装例とTipsについて紹介させていただきます。

ワークフローモニタリングの課題

弊社では主にワークフローエンジンのAirflowを用いてデータのインポート/エクスポートやデータ加工、モデルによる推論などの定型的な連続したタスクをワークフロー(DAG)に乗せて運用しています。

画像1

上の管理画面の画像のように日次や数時間おきに実行するDAGを常時複数運用をしていると、成否など実行状況を毎回管理画面に確認しに行くのは少々手間になります。

画像2

そこで、外部からSlackチャンネルに投稿ができるIncoming Webhookを用いてDAGの実行状況を以下の画像のようなメッセージとしてSlackに通知するような実装をしてみました。

実装

環境: Cloud Composer Airflow-1.10.2

Slackのメニューの App 管理 > カスタムインテグレーション > Incoming WebhookでIncoming Webhookアプリを作成し、webhook URLを生成しておきます。

AirflowのOperatorのパラメータに on_success_callback または on_failure_callback を設定しておくとタスクの成功/失敗時に指定した関数を実行することができます。 ここにタスクの成功/失敗時にそれをSlackに通知する関数を指定しておきます。callbackされる関数にはタスクインスタンスのステータスが引数として渡されます。

以下はタスクが失敗した際にSlackに通知する関数の例です。 引数statusから正規表現でDAG IDとタスクIDを取得しています。Incoming Webhookの仕様に従って、通知するメッセージの内容を指定します。

def failured(status):
    dag_name = re.findall(r'.*\:\s(.*)\>', str(status['dag']))[0]
    task_name = re.findall(r'.*\:\s(.*)\>', str(status['task']))[0]
    data = {
            'username': webhook_name,
            'channel': channel_name,
            'attachments': [{
                'fallback': f'{dag_name}:{task_name}',
                'color': '#e53935',
                'title': f'{dag_name}:{task_name}',
                'text': f'{task_name} was failed...'
                }]
            }
    requests.post(webhook_url, json.dumps(data))

全体のコード例として以下のように3つのタスク(op1, op2, op3)を順番に実行するDAGを用意しました。 webhook URLや通知するSlackチャンネル名などはAirflowのVariablesに文字列として格納し、 Variable.get でキーを指定して引き出すことで、変更や他の環境への転用がしやすいようにしています。 いずれかのタスクが失敗した時点で"失敗"を通知、全タスクが成功した時点で"成功"を通知としたいので、 on_failure_callbackdefault_args に指定、 on_success_callback は最後のタスクのみに指定しています。また、成功の際は青系統、失敗の際は赤系統の色のメッセージにすることで状況を視覚的に分かりやすくしています。

# coding: utf-8
import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
import re
import json
import requests

# webhook URLやチャンネル名などを指定
webhook_url = Variable.get('slack_webhook_url')
webhook_name = Variable.get('slack_webhook_name')
channel_name = Variable.get('slack_channel_name')

# タスク失敗を通知する関数
def failured(status):
    dag_name = re.findall(r'.*\:\s(.*)\>', str(status['dag']))[0]
    task_name = re.findall(r'.*\:\s(.*)\>', str(status['task']))[0]
    data = {
            'username': webhook_name,
            'channel': channel_name,
            'attachments': [{
                'fallback': f'{dag_name}:{task_name}',
                'color': '#e53935',
                'title': f'{dag_name}:{task_name}',
                'text': f'{task_name} was failed...'
                }]
            }
    requests.post(webhook_url, json.dumps(data))

# タスク成功を通知する関数
def successed(status):
    dag_name = re.findall(r'.*\:\s(.*)\>', str(status['dag']))[0]
    data = {
            'username': webhook_name,
            'channel': channel_name,
            'attachments': [{
                'fallback': dag_name,
                'color': '#1e88e5',
                'title': dag_name,
                'text': f'{dag_name} was successed!'
                }]
            }
    requests.post(webhook_url, json.dumps(data))

default_args = {
        'owner': 'airflow',
        'retries': 0,
        'start_date': airflow.utils.dates.days_ago(1),
        'on_failure_callback': failured # すべてのタスクについて失敗時に通知する
        }

with DAG(
        dag_id='DAG_ID',
        default_args=default_args,
        ) as dag:

    def function1():
        ...
    def function2():
        ...
    def function3():
        ...

    op1 = PythonOperator(
            task_id='op1',
            python_callable=function1,
            dag=dag
            )
    op2 = PythonOperator(
            task_id='op2',
            python_callable=function2,
            dag=dag
            )
    op3 = PythonOperator(
            task_id='op3',
            python_callable=function3,
            on_success_callback=successed, # 最後のタスクの成功を通知する
            dag=dag
            )

    op1 >> op2 >> op3

if __name__ == '__main__':
    dag.cli()

DAGの成否に応じて冒頭の画像のようなメッセージが指定したチャンネルに投稿されます。

上記のようなDAGの実行状況の通知以外にもSlack通知が役に立つ場面があります。タスクの中の処理内容の成否がDAGの実行の成否に依らない場合です。

例えば、DAG内のあるタスクでHTTPリクエストを実行する場合、HTTPリクエストの成否を問わず処理自体はエラーなく通るのでタスク自体は成功になります。実際の処理の実行状況を細かく確認したい場合、ログを確認しなければならないのでこれもかなり手間です。このような場合に、タスク内の処理自体の結果をSlackに通知するといったことも可能です。

Slack Incoming Webhookは作成も実装も容易なので、社内のコミュニケーションツールとしてSlackを利用している場合、ワークフローモニタリングを行うツールとしてベストだと感じています。

参考資料

[1] https://airflow.apache.org/docs/stable/
[2] https://api.slack.com/messaging/composing/layouts

3