見出し画像

Apache Airflow初心者向けに箇条書きで簡単に説明する

特徴

  • Pythonで書かれているのでforループも自由だしpandasなどの外部モジュールも使えるしAWSなどパブリッククラウドとの連携も容易

  • 定期実行のスケジューリングも単発実行も可能だからバッチジョブでもML用途でもレポーティングにもデータバックアップにも使える

  • DAGを使ってタスク間の順序を自在に構築できる

from airflow.models import DAG  
from airflow.utils.dates import days_ago  
from airflow.operators.dummy_operator import DummyOperatorwith DAG(  
  "etl_sales_daily",  
  start_date=days_ago(1),  
  schedule_interval=None,  
) as dag:  
   task_a = DummyOperator(task_id="task_a")  
   task_b = DummyOperator(task_id="task_b")  
   task_c = DummyOperator(task_id="task_c")  
   task_d = DummyOperator(task_id="task_d")   task_a >> [task_b, task_c]  
   task_c >> task_d
上記DAGの結果

主要コンポーネント

  • オペレーターはロジックを関数で包んだもの

  • センサーはなにかの完了を待つ役割

  • フックは外部との連携

from airflow.models.baseoperator import BaseOperatorclass GCSCreateBucketOperator(BaseOperator):  
   def __init__(self, *, bucket_name: str, **kwargs):  
       super().__init__(**kwargs)  
       self.bucket_name = bucket_name   def execute(self, context):  
       hook = GCSHook()  
       hook.create_bucket(self.bucket_name)
  • XComはDAG内のタスク間での通信に使われるキーバリュー形式のテーブルだが、容量は48KBしかいないのでデータ自体を格納するのではなくデータを保存した場所を格納するイメージ

デプロイについて

  • 3つのコンポーネントがあってすべてairflowコマンドで操作できる

  • スケジューラーは司令塔でDAGファイルを解析してタスクのスケジューリングなどを行う

  • Webサーバーはパイプラインの状況をweb上で確認するためのインターフェースを提供すする

  • ワーカーは実際にタスクを実行するもの

  • 別にエグゼキューターという概念があり、ワーカーを単体で動かすのか、分散して動かすのか、Kubernetesを使うのか、デバッグ用に動かうのか、といったことを決定する

  • Airflowの始めるには既に用意されたDockerイメージを使うとよい

  • 特にKubernetes上で動かすならHelmチャートを利用するとよい

DAG配布をどうするか

  • スケジューラーとワーカーは両方ともDAGにアクセスできる必要があるので次の3つの方法のいずれかでAirflowデプロイ環境にDAGを配布しなければならない

  • 共有ファイルサーバーを使うと楽だが読み取りに時間がかかる恐れがある

  • git-syncなどを使うとローカルリポジトリとリモートリポジトリを同期でいるので良い

  • 他にもDockerイメージにDAGを焼き込む方法があるがDAG更新のたびにビルドし直しになる

この記事が参加している募集

仕事について話そう

よろしければサポートお願いします! いただいたサポートはクリエイターとしての活動費に使わせていただきます!