見出し画像

MWAA で解析ワークフローを構築する際に ECS Fargate が必要になる場合と動的にタスク分割して実行する方法について

概要

株式会社サイキンソーの翠川優希(Twitter ID: @Sango6mdr)です。
弊社では事業の一つに人や動物の腸内環境をはじめとした検査サービスを提供しています。その中の一部の菌叢解析サービスの自動化を Airflow(MWAA)を使って実装しました。実装の過程で MWAA の仕様の関係で ECS Fargate が必要になったり、解析時にタスクを分割する必要があったのでその方法いくつかを試行錯誤したので Tips としてご紹介します。

Airflow とは

スクリーンショット 2021-10-23 16.40.37

Airflow is a platform to programmatically author, schedule and monitor workflows.
(和訳:Airflowは、ワークフローをプログラムでオーサリング、スケジューリング、モニタリングするためのプラットフォームです。)
引用:https://airflow.apache.org/docs/apache-airflow/stable/index.html

Airflow は DAG(Directed Acyclic Graph; 有向非巡回グラフ)を使ってワークフローを表現できるのが特徴です。
DAG 内で依存関係をコードで記載し、その依存関係に従ってタスクを worker 上で実行することができます。
設定した依存関係と実行結果は Web UI 上で確認することができるため直感的に操作できます。

Airflow のマネージドサービスの MWAA について

Amazon Managed Workflows for Apache Airflow (MWAA) は、Apache Airflow のマネージドオーケストレーションサービスであり、クラウドでエンドツーエンドのデータパイプラインの設定と運用を大規模かつ簡単に行えるようにします。
引用:https://aws.amazon.com/jp/managed-workflows-for-apache-airflow/

AWS のマネージドな環境で Airflow を使用できるため、サーバーの管理などが必要ないのが利点です。

Tips 1: disk size 不足の場合はECS Fargate を使用する

解析時に S3 からデータをダウンロードして MWAA の worker 内で解析を実行しようとしましたが、disk size 不足で失敗しました。worker の disk size は Amazon ECS Fargate 1.3 の仕様により、タスクの総保存容量の上限は10GBとなっており、テクニカルサポートにも問い合わせましたが、現在は設定変更できない箇所でした。

Total task storage is limited to 10 GB, according to Amazon ECS Fargate 1.3. 
引用:https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-faqs.html

最初の構成案: MWAA でデータ処理を完結させる

スクリーンショット 2021-11-02 16.30.24

改善案: 重たい処理は ECS Fargate を使用する

スクリーンショット 2021-11-02 16.30.33

* FASTQ データは DNA などの塩基配列をデータのクオリティ情報とともに保存したテキストデータです。

データの検出とタスク分割までを MWAA で行い、実際の解析タスクは ECS Fargate 上で行うことで対応しました。

Tips 2: MWAA (Airflow v2.0.2) で動的にタスクを分割して実行する際は trigger_dag API がおすすめ

菌叢解析のサンプル数が多くなると大量のメモリが必要となるため、バッチごとのサンプル数に応じて動的に分割する数を制御してワークフローを実行する必要があります。

現在 MWAA の最新バージョン(Airflow v2.0.2)で使用できるタスク分割の方法には以下の 3 種類があります。

・SubDagOperator
・TaskGroup
・trigger_dag API

SubDagOperator

スクリーンショット 2021-10-29 16.37.09

Note: Astronomer highly recommends avoiding SubDAGs if the intended use of the SubDAG is to simply group tasks within a DAG's Graph View. Airflow 2.0 introduces Task Groups which is a UI grouping concept that satisfies this purpose without the performance and functional issues of SubDAGs. While the SubDagOperator will continue to be supported, Task Groups are intended to replace it long-term.
(和訳:DAGのグラフビューでタスクをグループ化することが目的であれば、SubDAGを避けることを強くお勧めします。Airflow 2.0では、タスクグループが導入されました。これは、SubDAGのパフォーマンスや機能的な問題なしに、この目的を満たすUIのグループ化コンセプトです。SubDagOperatorは今後もサポートされますが、タスクグループは長期的にそれを置き換えることを目的としています。)
引用:https://www.astronomer.io/guides/subdags

SubDagOperator はタスクをグループ化させる方法で、この機能を利用して必要な数だけタスクを実行できそうでしたが、複数の記事で使用を推奨されていませんでした(便利そうなのに残念)。

さらにこちらの記事では、「SequentialExecutor以外(CeleryExecutorやLocalExecutor)を使うのは危険」としており、並列実行時に不安定になるのは困ってしまうので使用を断念しました。

その他の SubDagOperator を非推奨とする記事
Cloud Composer
Astonomer

公式のサンプルコードはこちら

SubDagOperator を使った方がいいパターン

・Airflow 2.0 以降を使用できないとき
・タスクの並列実行が必要ないとき

Airflow 2.0 以降ではほぼ上位互換の TaskGroup を使えるため選択肢に上がらないかなと思いました。どうしても Airflow 2.0 以降が使えない場合は前述の注意点があるため、並列実行しない場合のみフィットします。

TaskGroup

スクリーンショット 2021-10-29 16.22.55

Unlike SubDAGs, Task Groups are just a UI grouping concept. Starting in Airflow 2.0, you can use Task Groups to organize tasks within your DAG's graph view in the Airflow UI. This avoids the added complexity and performance issues of SubDAGs, all while using less code!
(和訳:TaskGroupは、SubDAGとは異なり、単なるUIのグループ化の概念です。Airflow 2.0からは、タスクグループを使用して、Airflow UIのDAGのグラフビュー内のタスクを整理することができます。これにより、SubDAG の複雑さやパフォーマンスの問題を回避し、より少ないコードを使用することができます。)
引用:https://www.astronomer.io/guides/task-groups

TaskGroup は Airflow 2.0 から実装された機能で、タスクをグルーピングしたり、分割して並列実行したりできます。
UI 上も一目でタスクの状態が判別できとても見やすいです。しかし、あらかじめ決まった数のタスクをグループにするのは得意ですが、今回のように動的にタスク分割の数を変化させたい場合は不向きなようでした(なんとか使えないか色々試したが上手く行かず。。)。
公式のサンプルコードはこちら

TaskGroup を使った方がいいパターン

・タスクの分割数が一定のとき
・グループ化したいタスクが多いとき

まず、動的にタスク分割数を変化させる場合以外は TaskGroup が非常に使いやすいですね。さらに、タスクごとの塊や依存関係を簡単に書くことができ、UI でひと目で依存関係やグループ化されたタスクを確認できるので複雑な依存関係を書きたい場合はフィットするかな〜と思います。

trigger_dag API

スクリーンショット 2021-10-29 17.54.59

trigger_dag は「DAG A」 を実行する中で外部の 「DAG B」を実行するための API です。
一つの DAG で完結しないため、UI が一画面で完結しない点がいまいちポイントですが、DAG が疎結合な分、必要な回数だけ「DAG B」を実行するだけなのでとてもシンプルになりました。
実装はこちらの記事を参考にさせていただきました。
公式のサンプルコードはこちら(トリガーする側される側

trigger_dag API を使った方がいいパターン

・動的にタスク分割数を変化させたいとき

今回の事例のような、動的にタスク分割数を変化させるタスクを実装したい場合は trigger_dag API がフィット しました! 

一点、注意点としては DAG が分かれる(一つの DAG で完結しない)ため、UI 上で一度に確認できません。

まとめ

MWAA でワークフローを実装してみてつまづいた点を 2 つの Tips にまとめました。実装する中で、色々調査をしながら取り組んできましたが、日本語の情報がまだそこまで多くありませんでした。他に似たような問題に取り組んでいる人はいると思うので参考になれば嬉しいです!

最後に

株式会社サイキンソーでは人材を募集しています。
現在株式会社サイキンソーでは「細菌叢で人々を健康に」というミッションを共に実現する仲間を募集しています。お気軽にご連絡ください。
募集ページはこちら:https://cykinso.co.jp/recruit
いきなり募集ページから連絡するのがハードル高い場合は私の Twitter アカウントに DM でご連絡いただいても大丈夫です!
お気軽にご連絡下さい〜!

参考

https://airflow.apache.org/docs/apache-airflow/stable/index.html
https://aws.amazon.com/jp/managed-workflows-for-apache-airflow/
https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-faqs.html
https://cloud.google.com/composer/docs/how-to/using/writing-dags?hl=ja#creating_dags
https://www.astronomer.io/guides/subdags
https://qiita.com/notrogue/items/6102db88080897f4f606
https://dev.classmethod.jp/articles/airflow-dynamic-workflow/

この記事が気に入ったらサポートをしてみませんか?