DockerでCerelyを動かしてみる #187日目
非同期処理や定期実行処理に便利なライブラリである「Cerely」を動かしてみました。実際に触ってみると、非同期処理の便利さが分かって面白かったです。
こちらのチュートリアルをベースに、非同期処理を体験するために少し工夫を加えました。
ディレクトリ構成は以下です。
celery_test--------python_app
| |-----src
| | `---tasks.py
| `----Dockerfile
|
`-----docker-compose.yml
まず、Docker関係のファイルは以下のように記述しました。
[docker-compose.yml]
version: "3"
services:
python_app:
build: ./python_app
volumes:
- ./python_app/src:/src
tty: true
networks:
- redis_network
redis_app:
image: redis:6.0
networks:
- redis_network
networks:
redis_network:
[Dockerfile]
FROM python:3.7
RUN pip install -U celery[redis]
COPY ./src /src
WORKDIR /src
ENTRYPOINT [ "celery", "-A", "tasks", "worker", "--loglevel=INFO" ]
次に実行ファイルとしてtasks.pyを作成します。「@app.task」がポイントで、このようにして関数にデコレータをつけると、その関数が非同期タスクとして登録され、処理できるようになります。
[tasks.py]
from celery import Celery
from time import sleep
app = Celery('tasks', broker='redis://redis_app:6379/', backend='redis://redis_app:6379/')
@app.task
def add(x, y):
return x + y
@app.task
def f():
print('f関数開始')
sleep(5)
print('f関数終了')
def test():
print('テスト開始')
f() # f関数をdelayを使わず普通に呼び出す
print('テスト終了')
def test2():
print('テスト開始')
f.delay() # f関数を非同期タスクとして登録する
print('テスト終了')
これで準備が整ったので、Dockerコンテナを立ち上げてみます。
[ターミナル]
docker-compose build
docker-compose up
これでコンテナが立ち上がると同時にcerelyが起動します。
ちなみに一度cerelyを止めてまた起動させたい場合、以下のコマンドで可能です(Dockerコンテナに入ってから実行することを忘れずに)
celery -A tasks worker --loglevel=info
以下のように表示されていれば無事に動いています。[tasks]に記載されているのが現在登録されているタスクです。このタスクに関連するジョブがcereryに登録されていきます。
Please specify a different user using the --uid option.
User information: uid=0 euid=0 gid=0 egid=0
uid=uid, euid=euid, gid=gid, egid=egid,
-------------- celery@eb852ebada00 v5.2.6 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.10.16.3-microsoft-standard-WSL2-x86_64-with-debian-11.3 2022-05-29 09:06:14
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f86e5957250
- ** ---------- .> transport: redis://redis_app:6379//
- ** ---------- .> results: redis://redis_app:6379/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
. tasks.f
[2022-05-29 09:06:14,514: INFO/MainProcess] Connected to redis://redis_app:6379//
[2022-05-29 09:06:14,518: INFO/MainProcess] mingle: searching for neighbors
[2022-05-29 09:06:15,528: INFO/MainProcess] mingle: all alone
[2022-05-29 09:06:15,537: INFO/MainProcess] celery@eb852ebada00 ready.
cerelyが起動しているのとは別のターミナルを開いて、同じようにコンテナに入ってPythonで以下のようにdelay()を使って実行してみます。するとAsyncResultが返ってきており、実行結果は出力されていません。
[Python]
>>> add.delay(4, 4)
<AsyncResult: abf77b01-0766-4fe0-b942-3c23e7c3f254>
ではどこに出力されているのかというと、celeryが起動しているターミナルです。以下のように実行結果が出力されています。
[2022-05-29 08:58:41,232: INFO/MainProcess] celery@eb852ebada00 ready.
[2022-05-29 08:58:51,995: INFO/MainProcess] Task tasks.add[abf77b01-0766-4fe0-b942-3c23e7c3f254] received
[2022-05-29 08:58:51,999: INFO/ForkPoolWorker-7] Task tasks.add[abf77b01-0766-4fe0-b942-3c23e7c3f254] succeeded in 0.00382779100073094s: 8
ちなみにadd(4, 4)のように普通に関数を呼び出した場合、実行結果は普段通り返ってきますが、cerelyのジョブとしては登録されません。
また、tasks.pyのtest関数とtest2関数を実行してみると、非同期処理を体験できるようになっています。
test()を実行すると、「f関数開始」から「f関数終了」と表示されるまで間に5秒かかり、その後に「テスト終了」と表示されます。
[Python]
>>> from tasks import test, test2
>>> test()
テスト開始
f関数開始 # この開始と終了の間に5秒あり、時間がかかる
f関数終了
テスト終了
>>>
>>> test2()
テスト開始 # f関数がcerelyで非同期処理されているため、5秒経過する前に「テスト終了」が出力される
テスト終了
test2でのf関数部分は、cerelyが起動しているターミナルに処理結果が出力されています。
[2022-05-29 09:06:31,060: INFO/MainProcess] Task tasks.f[1031a800-cfab-4801-b3bb-c9558a17332e] received
[2022-05-29 09:06:31,061: WARNING/ForkPoolWorker-8] f関数開始
[2022-05-29 09:06:36,066: WARNING/ForkPoolWorker-8] f関数終了
[2022-05-29 09:06:36,074: INFO/ForkPoolWorker-8] Task tasks.f[1031a800-cfab-4801-b3bb-c9558a17332e] succeeded in 5.013800524999169s: None
間に重たい処理がある場合、今回のように非同期処理を実装すれば全体の処理スピードを向上することができそうです。
ここまでお読みいただきありがとうございました!!
参考
この記事が気に入ったらサポートをしてみませんか?