【Treasure Data】Workflowに関するコンポーネント(構成要素)まとめ|ProjectやAttempt など
こんにちは!noteをお読みいただきありがとうございます。
smkt事業部note運用チームです。
Treasure Data には、定期的に一連の処理を自動実行できる「Treasure Workflow」という機能があります
Treasure Workflowは、コンポーネント(構成要素)によって構成されます
今回は、Treasure Workflowに関するコンポーネントの概要や使用方法をご紹介します!
コンポーネントを理解することで、
「これから使い始める方」「より複雑な Workflowを作成しようとしている方」のお役に立てると思います
概要
記述する際には Digdag を使用し、各コンポーネントによって構成されます
※Digdag;定期的なバッチ処理を管理できるワークフローエンジン
ほとんど Digdag の話ですが、Project や Task など、Treasure Data Workflowで関わるコンポーネントをおさらいします
Project
Workflow
Session
Attempt
Task
Job
これらを理解することで Treasure Data の学習効率が上がるかと思います
本記事を書くにあたり、Project や Attempt などを総称して何と呼ぼうか悩みました
良さげな文言は無いかと検索すると「コンポーネント」と表現しているページがあり、そちらを採用しました
概念図
以下に各コンポーネントとその関係をまとめます
1.Project
Workflow の集合です
sql ファイルや py ファイル、yml ファイル等を含めることができます。ディレクトリ分けすることもできます
TD Toolbelt でダウンロード/アップロードする際は Project 毎に行います。その際、ルートディレクトリが Project を表します
バージョン管理(Revision)も Project 単位で行われます。Workflow や関連ファイルはバージョン管理されますが、シークレットは管理されません
シークレットは Project に紐づきます
2.Workflow
Project 直下の dig ファイルです
先の概念図の場合、workflow_1.dig および workflow_2.dig です
Project に 0 個以上紐づきます
Workflow の無い Project も作成できますが、TD Toolbelt では Warning が出ます(下記例)
$ td wf push wf_project
2022-05-23 20:35:38 +0900: Digdag v0.10.4
Creating .digdag\tmp\archive-5227463613052246120.tar.gz...
Archiving queries/query.sql
Workflows:
WARNING: This project doesn't include workflows. Usually, this is a mistake.
Please make sure that all *.dig files are on the top directory.
*.dig files in subdirectories are not recognized as workflows.
Uploaded:
id: (プロジェクトID)
name: wf_project
revision: (リビジョンID)
archive type: s3
project created at: 2022-05-23T09:58:55Z
revision updated at: 2022-05-23T11:35:40Z
Use `td workflow workflows` to show all workflows.
スケジューリングを行えます
紐づく Project のシークレット情報を参照できます
dig ファイル内で call>: オペレータを用いることで、同一の Project の別の dig ファイル(Workflow に限らない)を呼び出せます
dig ファイル内で require>: オペレータを用いることで、任意の Project の Workflow を呼び出せます
3.Session
Workflow の実行(仮想)です
Workflow に 0 個以上紐づきます
Workflow を作ったまま 1 回も実行しなければ 0 です
セッション時刻を指定でき、任意の時刻を設定できます
同一のセッション時刻には 1 回しか実行できません。その時刻に既にセッションがある場合は再実行しかできません
下記ログでは同 Workflow を同セッション時刻に 2 回実行しようとしていますが、2 回目は A session for the requested session_time already exists とエラーになっています
$ td wf start wf_project sample_workflow --session 2022-06-24
2022-05-23 19:17:10 +0900: Digdag v0.10.4
Started a session attempt:
session id: (セッションID)
attempt id: (アテンプトID)
uuid: (セッションUUID)
project: wf_project
workflow: sample_workflow
session time: 2022-06-24 00:00:00 +0000
retry attempt name:
params: {(省略)}
created at: 2022-05-23 19:17:11 +0900
* Use `td workflow session (セッションID)` to show session status.
* Use `td workflow task (アテンプトID)` and `td workflow log (アテンプトID)` to show task status and logs.
$ td wf start wf_project sample_workflow --session 2022-06-24
2022-05-23 19:17:51 +0900: Digdag v0.10.4
error: A session for the requested session_time already exists (session_id=(セッションID), attempt_id=(アテンプトID), session_time=2022-06-24T00:00Z)
hint: use `td workflow retry (アテンプトID) --latest-revision --all` command to run the session again for the same session_time
実行エラーになった際の Rerun は Session 単位で行います。この際、失敗した個所以降のみ実行することもできます
「Skip Previously Successful Tasks」 が選択されている場合、前回の Attempt で成功した Task はスキップされます
「All」が選択されている場合、前回成功した Task も再実行します(Session ID は同じままです)
「Revision」のドロップダウンから、実行する Revision を指定することもできます
hourly, daily, weekly, monthly でスケジューリングされた Session の場合、実際の実行時刻とセッション時刻が異なる場合があります
daily によって 毎日 AM7 時に実行される Workflow の場合、実際の実行時刻は AM7:00 ですが、セッション時刻は同日の 0:00 です
4.Attempt
Workflow の実行(実体)です
Session に 1 個以上紐づきます
ある Session で実行エラーとなり、その後 Rerun 機能を用いて再実行した場合、その Session には失敗した Attempt と再実行した際の Attempt の 2 つが紐づきます
下記例の場合、1 回目の Attempt(下)は失敗しましたが、2 回目の Attempt(上)は成功しています。1 回目と 2 回目の Attempt ID は異なります
Treasure Data コンソール上の「TIMELINE」タブや「WORKFLOW LOGS」タブ、「TASKS」タブに表示されるのは、最後に実行された Attempt のログです。そのため Rerun 機能を用いて再実行をすると、エラーとなった Attempt のログを GUI 上で見られなくなるので注意が必要です(本記事執筆時点のバージョン)
5.Task
Attempt に 0 個以上紐づきます
Task の無い Workflow は作成できますし、実行もできます
dig ファイル内で、+から始まるものが Task です。「+aaa:」のような形式です
子 Task、孫 Task... とネストできます
0 または 1 つのオペレータを持ちます。2 つ以上のオペレータは書けません(下記例)
# td>:オペレータとecho>:オペレータの両方を持つWorkflow
+multi_operator_task:
td>:
query: select * from information_schema.columns
echo>: aaa
> td wf check
2022-05-23 19:01:53 +0900: Digdag v0.10.4
System default timezone: Asia/Tokyo
error: A task can't have more than one operator: {"echo>":"aaa","td>":null,"query":"select * from information_schema.columns"} (config)
6.Job
Treasure Data 独自の概念です
Task が実行されると、0 個以上作成されます
Job の生成されない Task もあります
Result export を ON にした実行などは Job が 2 つ紐づいたりします
Query 画面で SQL を実行した際なども、Job が 1 つ生成されます
7.TD Toolbelt のコマンドから上記を確認する
上記を TD Toolbelt から確認してみたいと思います
TD Toolbelt がインストールされている環境で td wf と入力すると、ワークフローに関するコマンドのヘルプが表示されます
コマンドおよび引数から以下が言えるかと思います(強引かもしれません...)
workflows [project-name] [name]: workflow は project に依存する
sessions <project-name> <name>: session は project, workflow に依存する
attempts <session-id>: attempt は session に依存する
tasks <attempt-id>: task は attempt に依存する
$ td wf
2022-05-23 18:29:49 +0900: Digdag v0.10.4
Usage: td workflow <command> [options...]
Local-mode commands:
init <dir> create a new workflow project
r[un] <workflow.dig> run a workflow
c[heck] show workflow definitions
sched[uler] run a scheduler server
migrate (run|check) migrate database
selfupdate update cli to the latest version
Server-mode commands:
server start server
Client-mode commands:
push <project-name> create and upload a new revision
download <project-name> pull an uploaded revision
start <project-name> <name> start a new session attempt of a workflow
retry <attempt-id> retry a session
kill <attempt-id> kill a running session attempt
backfill <schedule-id> start sessions of a schedule for past times
backfill <project-name> <name> start sessions of a schedule for past times
reschedule <schedule-id> skip sessions of a schedule to a future time
reschedule <project-name> <name> skip sessions of a schedule to a future time
log <attempt-id> show logs of a session attempt
projects [name] show projects
workflows [project-name] [name] show registered workflow definitions
schedules show registered schedules
disable <schedule-id> disable a workflow schedule
disable <project-name> disable all workflow schedules in a project
disable <project-name> <name> disable a workflow schedule
enable <schedule-id> enable a workflow schedule
enable <project-name> enable all workflow schedules in a project
enable <project-name> <name> enable a workflow schedule
sessions show sessions for all workflows
sessions <project-name> show sessions for all workflows in a project
sessions <project-name> <name> show sessions for a workflow
session <session-id> show a single session
attempts show attempts for all sessions
attempts <session-id> show attempts for a session
attempt <attempt-id> show a single attempt
tasks <attempt-id> show tasks of a session attempt
delete <project-name> delete a project
secrets --project <project-name> manage secrets
version show client and server version
Options:
-L, --log PATH output log messages to a file (default: -)
-l, --log-level LEVEL log level (error, warn, info, debug or trace)
-X KEY=VALUE add a performance system config
--logback-config PATH path to logback configuration file (for developers only)
-c, --config PATH.properties Configuration file (default: C:\Users\ty329194\.config\digdag\config)
--version show client version
Client options:
-e, --endpoint URL Server endpoint (default: http://127.0.0.1:65432)
-H, --header KEY=VALUE Additional headers
--disable-version-check Disable server version check
--disable-cert-validation Disable certificate verification
--basic-auth <user:pass> Add an Authorization header with the provided username and password
Use `<command> --help` to see detailed usage of a command.
Digdag のコマンドリファレンスや、API リファレンスを見ても面白いかもしれません
8.最後に
Treasure Data を導入したら、ほぼ間違いなく Workflow がスケジューリングされます
どんなに長く、複雑な Workflow も基本は同じです
この記事が Treasure Data の学習の一助になれば幸いです
●●●
パーソルP&Tでは、CDP導入・活用支援サービスに注力しています。
●●●
●●●
またSMKT事業部では データエンジニアを募集しています。
ご興味をお持ちの方はこちらからご応募ください!
▽その他募集職種こちらから▽
●●●
マガジン「Treasure Data Tips集」では、現場で役立つTipsをまとめています。是非こちらもご覧ください。