見出し画像

【Treasure Data】Workflowに関するコンポーネント(構成要素)まとめ|ProjectやAttempt など

こんにちは!noteをお読みいただきありがとうございます。
smkt事業部note運用チームです。

Treasure Data には、定期的に一連の処理を自動実行できる「Treasure Workflow」という機能があります
Treasure Workflowは、コンポーネント(構成要素)によって構成されます

今回は、Treasure Workflowに関するコンポーネントの概要や使用方法をご紹介します!

コンポーネントを理解することで、
「これから使い始める方」「より複雑な Workflowを作成しようとしている方」のお役に立てると思います

概要

  • 記述する際には Digdag を使用し、各コンポーネントによって構成されます
    ※Digdag;定期的なバッチ処理を管理できるワークフローエンジン

  • ほとんど Digdag の話ですが、Project や Task など、Treasure Data Workflowで関わるコンポーネントをおさらいします

    • Project

    • Workflow

    • Session

    • Attempt

    • Task

    • Job

  • これらを理解することで Treasure Data の学習効率が上がるかと思います

  • 本記事を書くにあたり、Project や Attempt などを総称して何と呼ぼうか悩みました

  • 良さげな文言は無いかと検索すると「コンポーネント」と表現しているページがあり、そちらを採用しました

概念図

  • 以下に各コンポーネントとその関係をまとめます

1.Project

  • Workflow の集合です

  • sql ファイルや py ファイル、yml ファイル等を含めることができます。ディレクトリ分けすることもできます

  • TD Toolbelt でダウンロード/アップロードする際は Project 毎に行います。その際、ルートディレクトリが Project を表します

  • バージョン管理(Revision)も Project 単位で行われます。Workflow や関連ファイルはバージョン管理されますが、シークレットは管理されません

  • シークレットは Project に紐づきます

2.Workflow

  • Project 直下の dig ファイルです

    • 先の概念図の場合、workflow_1.dig および workflow_2.dig です

  • Project に 0 個以上紐づきます

    • Workflow の無い Project も作成できますが、TD Toolbelt では Warning が出ます(下記例)

$ td wf push wf_project
2022-05-23 20:35:38 +0900: Digdag v0.10.4
Creating .digdag\tmp\archive-5227463613052246120.tar.gz...
  Archiving queries/query.sql
Workflows:
  WARNING: This project doesn't include workflows. Usually, this is a mistake.
           Please make sure that all *.dig files are on the top directory.
           *.dig files in subdirectories are not recognized as workflows.

Uploaded:
  id: (プロジェクトID)
  name: wf_project
  revision: (リビジョンID)
  archive type: s3
  project created at: 2022-05-23T09:58:55Z
  revision updated at: 2022-05-23T11:35:40Z

Use `td workflow workflows` to show all workflows.
  • スケジューリングを行えます

  • 紐づく Project のシークレット情報を参照できます

  • dig ファイル内で call>: オペレータを用いることで、同一の Project の別の dig ファイル(Workflow に限らない)を呼び出せます

  • dig ファイル内で require>: オペレータを用いることで、任意の Project の Workflow を呼び出せます

3.Session

  • Workflow の実行(仮想)です

  • Workflow に 0 個以上紐づきます

    • Workflow を作ったまま 1 回も実行しなければ 0 です

  • セッション時刻を指定でき、任意の時刻を設定できます

  • 同一のセッション時刻には 1 回しか実行できません。その時刻に既にセッションがある場合は再実行しかできません

    • 下記ログでは同 Workflow を同セッション時刻に 2 回実行しようとしていますが、2 回目は A session for the requested session_time already exists とエラーになっています

$ td wf start wf_project sample_workflow --session 2022-06-24
2022-05-23 19:17:10 +0900: Digdag v0.10.4
Started a session attempt:
  session id: (セッションID)
  attempt id: (アテンプトID)
  uuid: (セッションUUID)
  project: wf_project
  workflow: sample_workflow
  session time: 2022-06-24 00:00:00 +0000
  retry attempt name:
  params: {(省略)}
  created at: 2022-05-23 19:17:11 +0900

* Use `td workflow session (セッションID)` to show session status.
* Use `td workflow task (アテンプトID)` and `td workflow log (アテンプトID)` to show task status and logs.

$ td wf start wf_project sample_workflow --session 2022-06-24
2022-05-23 19:17:51 +0900: Digdag v0.10.4
error: A session for the requested session_time already exists (session_id=(セッションID), attempt_id=(アテンプトID), session_time=2022-06-24T00:00Z)
hint: use `td workflow retry (アテンプトID) --latest-revision --all` command to run the session again for the same session_time
  • 実行エラーになった際の Rerun は Session 単位で行います。この際、失敗した個所以降のみ実行することもできます

    • 「Skip Previously Successful Tasks」 が選択されている場合、前回の Attempt で成功した Task はスキップされます

    • 「All」が選択されている場合、前回成功した Task も再実行します(Session ID は同じままです)

    • 「Revision」のドロップダウンから、実行する Revision を指定することもできます

  • hourly, daily, weekly, monthly でスケジューリングされた Session の場合、実際の実行時刻とセッション時刻が異なる場合があります

4.Attempt

  • Workflow の実行(実体)です

  • Session に 1 個以上紐づきます

  • ある Session で実行エラーとなり、その後 Rerun 機能を用いて再実行した場合、その Session には失敗した Attempt と再実行した際の Attempt の 2 つが紐づきます

    • 下記例の場合、1 回目の Attempt(下)は失敗しましたが、2 回目の Attempt(上)は成功しています。1 回目と 2 回目の Attempt ID は異なります

  • Treasure Data コンソール上の「TIMELINE」タブや「WORKFLOW LOGS」タブ、「TASKS」タブに表示されるのは、最後に実行された Attempt のログです。そのため Rerun 機能を用いて再実行をすると、エラーとなった Attempt のログを GUI 上で見られなくなるので注意が必要です(本記事執筆時点のバージョン)

5.Task

  • Attempt に 0 個以上紐づきます

    • Task の無い Workflow は作成できますし、実行もできます

  • dig ファイル内で、+から始まるものが Task です。「+aaa:」のような形式です

  • 子 Task、孫 Task... とネストできます

  • 0 または 1 つのオペレータを持ちます。2 つ以上のオペレータは書けません(下記例)

# td>:オペレータとecho>:オペレータの両方を持つWorkflow
+multi_operator_task:
  td>:
  query: select * from information_schema.columns
  echo>: aaa
> td wf check
2022-05-23 19:01:53 +0900: Digdag v0.10.4
  System default timezone: Asia/Tokyo

error: A task can't have more than one operator: {"echo>":"aaa","td>":null,"query":"select * from information_schema.columns"} (config)

6.Job

  • Treasure Data 独自の概念です

  • Task が実行されると、0 個以上作成されます

    • Job の生成されない Task もあります

    • Result export を ON にした実行などは Job が 2 つ紐づいたりします

  • Query 画面で SQL を実行した際なども、Job が 1 つ生成されます

7.TD Toolbelt のコマンドから上記を確認する

  • 上記を TD Toolbelt から確認してみたいと思います

  • TD Toolbelt がインストールされている環境で td wf と入力すると、ワークフローに関するコマンドのヘルプが表示されます

  • コマンドおよび引数から以下が言えるかと思います(強引かもしれません...)

    • workflows [project-name] [name]: workflow は project に依存する

    • sessions <project-name> <name>: session は project, workflow に依存する

    • attempts <session-id>: attempt は session に依存する

    • tasks <attempt-id>: task は attempt に依存する

$ td wf
2022-05-23 18:29:49 +0900: Digdag v0.10.4
Usage: td workflow <command> [options...]
  Local-mode commands:
    init <dir>                         create a new workflow project
    r[un] <workflow.dig>               run a workflow
    c[heck]                            show workflow definitions
    sched[uler]                        run a scheduler server
    migrate (run|check)                migrate database
    selfupdate                         update cli to the latest version

  Server-mode commands:
    server                             start server

  Client-mode commands:
    push <project-name>                create and upload a new revision
    download <project-name>            pull an uploaded revision
    start <project-name> <name>        start a new session attempt of a workflow
    retry <attempt-id>                 retry a session
    kill <attempt-id>                  kill a running session attempt
    backfill <schedule-id>             start sessions of a schedule for past times
    backfill <project-name> <name>     start sessions of a schedule for past times
    reschedule <schedule-id>           skip sessions of a schedule to a future time
    reschedule <project-name> <name>   skip sessions of a schedule to a future time
    log <attempt-id>                   show logs of a session attempt
    projects [name]                    show projects
    workflows [project-name] [name]    show registered workflow definitions
    schedules                          show registered schedules
    disable <schedule-id>              disable a workflow schedule
    disable <project-name>             disable all workflow schedules in a project
    disable <project-name> <name>      disable a workflow schedule
    enable <schedule-id>               enable a workflow schedule
    enable <project-name>              enable all workflow schedules in a project
    enable <project-name> <name>       enable a workflow schedule
    sessions                           show sessions for all workflows
    sessions <project-name>            show sessions for all workflows in a project
    sessions <project-name> <name>     show sessions for a workflow
    session  <session-id>              show a single session
    attempts                           show attempts for all sessions
    attempts <session-id>              show attempts for a session
    attempt  <attempt-id>              show a single attempt
    tasks <attempt-id>                 show tasks of a session attempt
    delete <project-name>              delete a project
    secrets --project <project-name>   manage secrets
    version                            show client and server version

  Options:
    -L, --log PATH                   output log messages to a file (default: -)
    -l, --log-level LEVEL            log level (error, warn, info, debug or trace)
    -X KEY=VALUE                     add a performance system config
    --logback-config PATH            path to logback configuration file (for developers only)
    -c, --config PATH.properties     Configuration file (default: C:\Users\ty329194\.config\digdag\config)
    --version                        show client version

  Client options:
    -e, --endpoint URL               Server endpoint (default: http://127.0.0.1:65432)
    -H, --header  KEY=VALUE          Additional headers
    --disable-version-check          Disable server version check
    --disable-cert-validation        Disable certificate verification
    --basic-auth <user:pass>         Add an Authorization header with the provided username and password

Use `<command> --help` to see detailed usage of a command.

8.最後に

  • Treasure Data を導入したら、ほぼ間違いなく Workflow がスケジューリングされます

  • どんなに長く、複雑な Workflow も基本は同じです

  • この記事が Treasure Data の学習の一助になれば幸いです

●●●

パーソルP&Tでは、CDP導入・活用支援サービスに注力しています。

●●●

✉ サービスに関するお問い合わせ
パーソルプロセス&テクノロジー SMKT事業部
smkt_markegr_note@persol-pt.co.jp

●●●

またSMKT事業部では データエンジニアを募集しています。
ご興味をお持ちの方はこちらからご応募ください!

▽その他募集職種こちらから▽

●●●

マガジン「Treasure Data Tips集」では、現場で役立つTipsをまとめています。是非こちらもご覧ください。