見出し画像

DockerでCerelyを動かしてみる #187日目

非同期処理や定期実行処理に便利なライブラリである「Cerely」を動かしてみました。実際に触ってみると、非同期処理の便利さが分かって面白かったです。

こちらのチュートリアルをベースに、非同期処理を体験するために少し工夫を加えました。

【Celeryとは】
Celeryは柔軟にタスクのスケジューリングをすることができる非同期のjob/taskモジュールです。
これを使うことで例えば以下のような実装を簡単に行うことができます。

・毎日0時にアプリケーションの実行結果をメールで送信
・待ち時間の長い処理をバックグラウンドで起動


【Python3】Celeryによるタスクのスケジューリング - Part1


ディレクトリ構成は以下です。

celery_test--------python_app
            |    |-----src
            |    |    `---tasks.py
            |     `----Dockerfile
            |
             `-----docker-compose.yml


まず、Docker関係のファイルは以下のように記述しました。

[docker-compose.yml]
 
version: "3"

services:
    python_app:
        build: ./python_app
        volumes:
            - ./python_app/src:/src
        tty: true
        networks:
            - redis_network
    redis_app:
        image: redis:6.0
        networks:
            - redis_network

networks:
    redis_network:
[Dockerfile]
 
FROM python:3.7

RUN pip install -U celery[redis]

COPY ./src /src

WORKDIR /src

ENTRYPOINT [ "celery", "-A", "tasks", "worker", "--loglevel=INFO" ]


次に実行ファイルとしてtasks.pyを作成します。「@app.task」がポイントで、このようにして関数にデコレータをつけると、その関数が非同期タスクとして登録され、処理できるようになります。

[tasks.py]
 
from celery import Celery
from time import sleep

app = Celery('tasks', broker='redis://redis_app:6379/', backend='redis://redis_app:6379/')

@app.task
def add(x, y):
    return x + y

@app.task
def f():
    print('f関数開始')
    sleep(5)
    print('f関数終了')

def test():
    print('テスト開始')
    f()  # f関数をdelayを使わず普通に呼び出す
    print('テスト終了')

def test2():
    print('テスト開始')
    f.delay()  # f関数を非同期タスクとして登録する
    print('テスト終了')


これで準備が整ったので、Dockerコンテナを立ち上げてみます。

[ターミナル]
docker-compose build
docker-compose up

これでコンテナが立ち上がると同時にcerelyが起動します。
ちなみに一度cerelyを止めてまた起動させたい場合、以下のコマンドで可能です(Dockerコンテナに入ってから実行することを忘れずに)

celery -A tasks worker --loglevel=info


以下のように表示されていれば無事に動いています。[tasks]に記載されているのが現在登録されているタスクです。このタスクに関連するジョブがcereryに登録されていきます。

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@eb852ebada00 v5.2.6 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-5.10.16.3-microsoft-standard-WSL2-x86_64-with-debian-11.3 2022-05-29 09:06:14
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7f86e5957250
- ** ---------- .> transport:   redis://redis_app:6379//
- ** ---------- .> results:     redis://redis_app:6379/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . tasks.add
  . tasks.f

[2022-05-29 09:06:14,514: INFO/MainProcess] Connected to redis://redis_app:6379//
[2022-05-29 09:06:14,518: INFO/MainProcess] mingle: searching for neighbors
[2022-05-29 09:06:15,528: INFO/MainProcess] mingle: all alone
[2022-05-29 09:06:15,537: INFO/MainProcess] celery@eb852ebada00 ready.


cerelyが起動しているのとは別のターミナルを開いて、同じようにコンテナに入ってPythonで以下のようにdelay()を使って実行してみます。するとAsyncResultが返ってきており、実行結果は出力されていません。

[Python]
 
>>> add.delay(4, 4)
<AsyncResult: abf77b01-0766-4fe0-b942-3c23e7c3f254>

ではどこに出力されているのかというと、celeryが起動しているターミナルです。以下のように実行結果が出力されています。

[2022-05-29 08:58:41,232: INFO/MainProcess] celery@eb852ebada00 ready.
[2022-05-29 08:58:51,995: INFO/MainProcess] Task tasks.add[abf77b01-0766-4fe0-b942-3c23e7c3f254] received
[2022-05-29 08:58:51,999: INFO/ForkPoolWorker-7] Task tasks.add[abf77b01-0766-4fe0-b942-3c23e7c3f254] succeeded in 0.00382779100073094s: 8

ちなみにadd(4, 4)のように普通に関数を呼び出した場合、実行結果は普段通り返ってきますが、cerelyのジョブとしては登録されません。


また、tasks.pyのtest関数とtest2関数を実行してみると、非同期処理を体験できるようになっています。

test()を実行すると、「f関数開始」から「f関数終了」と表示されるまで間に5秒かかり、その後に「テスト終了」と表示されます。

[Python]

>>> from tasks import test, test2 
>>> test()
テスト開始
f関数開始     # この開始と終了の間に5秒あり、時間がかかる 
f関数終了
テスト終了
>>>
>>> test2()
テスト開始    # f関数がcerelyで非同期処理されているため、5秒経過する前に「テスト終了」が出力される 
テスト終了

test2でのf関数部分は、cerelyが起動しているターミナルに処理結果が出力されています。

[2022-05-29 09:06:31,060: INFO/MainProcess] Task tasks.f[1031a800-cfab-4801-b3bb-c9558a17332e] received
[2022-05-29 09:06:31,061: WARNING/ForkPoolWorker-8] f関数開始
[2022-05-29 09:06:36,066: WARNING/ForkPoolWorker-8] f関数終了
[2022-05-29 09:06:36,074: INFO/ForkPoolWorker-8] Task tasks.f[1031a800-cfab-4801-b3bb-c9558a17332e] succeeded in 5.013800524999169s: None


間に重たい処理がある場合、今回のように非同期処理を実装すれば全体の処理スピードを向上することができそうです。

ここまでお読みいただきありがとうございました!!


参考


この記事が気に入ったらサポートをしてみませんか?