見出し画像

Celeryでタスク実行されるまでの流れ #422

CeleryはPythonの分散タスクキュー(もしくは非同期タスクキュー)として使用される技術です。分散タスクキューでは、メッセージブローカーを介してタスク(作業の単位)を送受信し、複数のワーカープロセスがこれらのタスクを非同期に実行します。

Celeryの主な用途:
●非同期処理
●分散・並列処理
●スケジューリング
●耐障害性と拡張性

Djangoとも簡単に連携でき、バックグラウンドで計算処理などを実行する時に便利です。

私が所属するチームではアプリケーション本体が起動するAPI用コンテナと、バッチ処理専用のCelery用コンテナが別々で走っていて、これがどのように連携しているのかきちんと理解できていなかったので整理しました。

アプリケーション本体からタスクを送る

ざっくり以下の流れです。

  1. タスク化したい処理を関数にして「@app.task」デコレーターをつける

  2. 関数を「.delay()」や「.apply_async()」で実行する

  3. タスクをシリアライゼーションしてメッセージブローカーに送る

  4. Celeryワーカーがメッセージブローカーをポーリングし、タスクがあれば処理する

ChatGPT先生によるサンプルコードです。
タスク化したい処理に「@app.task」をつけて、

# tasks.py

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest:guest@localhost//')

@app.task
def add(x, y):
    return x + y

delayやapply_asyncで実行します。

# another_module.py

from tasks import add

result = add.delay(4, 4)

メッセージブローカーはcelery.pyやDjangoのsettings.pyで設定できます。

# celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

_redis = "redis://{host}:{port}/{db}".format(**REDIS_CONFIGS['your_config'])

app = Celery(
    'myproject',
     broker=_redis, 
     backend='rpc://', include=['myproject.tasks']
)

タスクが送信されるのは理解できましたが、Celeryがどうやって関数の内容(タスクの処理内容)を把握しているのかが分かりませんでした。

これもChatGPT先生に聞いたところ、以下のように教えてくれました。

タスクのキューへの送信
タスクのシリアライゼーション:
delayやapply_asyncが呼び出されたとき、タスク関数とその引数はシリアライゼーション(通常はJSONやPickleを使って)され、メッセージとしてブローカーに送信されます。
メッセージの構造:
送信されるメッセージには、タスク関数の名前やモジュールパス、引数、実行オプションなど、タスクの実行に必要な情報が含まれます。

タスクの実行
タスクのデシリアライゼーション:

ワーカーはブローカーからメッセージを受け取り、シリアライズされたタスクをデシリアライゼーションして元の関数呼び出しに戻します。
タスク関数の実行:
デシリアライズされたタスク関数がワーカープロセス上で実行されます。ワーカーはタスク関数の位置情報(モジュールパス)を元に、対応する関数をインポートし実行します。

まぁつまり必要な情報は良い感じにして渡してくれているということですね。


タスクをスケジューリングする

Celeryはタスクのスケジューリングが可能です。私のチームでは日次バッチをスケジューリングして実行しています。

その場合、ざっくり以下の流れになります。

  1. 「celery beat」が設定に基づいてタスクをスケジュールする

  2. 時間になるとタスクをメッセージブローカーに送る(キューイング)

  3. Celeryワーカーがメッセージブローカーをポーリングしてタスクを処理する

以下のように定義できます。
日次バッチの設定例です。

# celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

_redis = "redis://{host}:{port}/{db}".format(**REDIS_CONFIGS['your_config'])

app = Celery(
    'myproject',
     broker=_redis, 
     backend='rpc://', include=['myproject.tasks']
)

app.conf.beat_schedule = {
    'daily_batch': {
        'task': 'tasks.exec_daily_batch',
        'schedule': crontab(minute="30", hour='0', day_of_week='mon-fri'),
    },
}


Celeryは別コンテナに切り出せる

ここまで見ていただければ分かるように、Celeryとアプリケーション本体の間に中継時点(メッセージブローカー)があることで、それぞれ別々の環境(コンテナ)で動いていても、システム全体としてスムーズに動作します。


ここまでお読みいただきありがとうございました!

この記事が気に入ったらサポートをしてみませんか?