Pythonで非同期でタスクを実行して、モニタリングする環境をDockerで構築する
Pythonで非同期処理をやる場合はceleryを使うのが定番です。今回はceleryを試しに動作させるための環境をDockerを使って簡単に作る方法をご紹介いたします。
コード自体はQiitaにある参考資料のものをそのまま持ってきています。参考源は一番下に載せておきます。
今回はdocker-composeを使用して、動作に必要なコンテナを作っていきます。
※関係ないですが、画像は野菜のセロリを使用しています。画像はテキトウにに選びました・・・
使用するツール類
今回は以下の3つを使用します。それぞれ別なコンテナで動作させるため、本番の運用などでは、別々なサーバーで動作させることを想定しています。
・redis (キューを動作させるためのKVS)
・celery (Pythonのタスクキューサービス、別なプロセスで動作させる)
・flower (celery内にあるタスクを監視するためのツール、webで動作)
今回作成するもの
今回は至ってシンプルな物を作成します。キューに貯めるタスクは以下のような処理です。
単純に10秒待って標準出力をするための関数と、引数二つの足し算をする関数の二つだけです。これらを非同期で動作させます。
def run():
time.sleep(10)
print('処理 おわた')
return 'おわったよ'
def calc(a, b):
return a+b
簡単な図ではありますが、以下のような感じです。
(Task) => (Queue Redis) => (Worker Celery)
※一応ではありますが、図としては以下の記事にある画像の図が一番わかりやすいです。
非同期で動作させることの利点
上記の様な簡単な処理ではわざわざceleryを導入して、非同期処理をする必要はありませんが、CPUに負荷がかかってしまうような計算処理や、RDBへの永続処理などの一つ一つが重いような処理を行う必要性がある場合は、キューに処理であるタスクを貯めていきます。
後はキューであるRedisにタスクが貯まるのをワーカーであるCeleryが検知し、自動で負荷状況などを調整して、タスクを徐々に実行してくれます。
docker-compose.ymlを作成する
それでは実際にDocker上で動作させるため、docker-compose.ymlに設定を記述していきます。
以下のように4つのコンテナを作成します。docker-compose.ymlのファイル内容は以下になります。
version: '3.7'
services:
python:
build: .
tty: true
image: python
container_name: python
volumes:
- ./:/usr/src/app
environment:
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- redis
celery:
image: python
tty: true
container_name: celery
volumes:
- ./:/usr/src/app
- ./logs:/usr/src/app/logs
command: celery -A tasks worker --loglevel=info --logfile=logs/celery.log
environment:
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- python
- redis
redis:
image: redis:5.0.3-alpine
container_name: redis
tty: true
monitor:
image: python
tty: true
container_name: monitor
ports:
- 5555:5555
command: flower -A tasks --port=5555 --broker=redis://redis:6379/0
depends_on:
- python
- redis
それぞれのコンテナに名称を振ってあり、それぞれのコンテナの役割を以下にまとめました。
・python
キューにタスクを格納する処理を実行するコンテナ
・celery
KVSであるRedisのキューにタスクが入ったのを検知するワーカープロセスを動かすコンテナ。キューに貯まったタスクの実行も行う。
・redis
KVSであるRedisを動作させるコンテナ。Redis以外にもRabbitMQなどを使用してもかまいません。
・monitor
celeryのタスク監視を行うflowerを動作させるコンテナ
Pythonコンテナに必要なファイル類Dockefile
キューであるRedisに対してタスクの発行を行うコンテナのDockefileを記述していきます。他3つのコンテナも「image: python」を指定しているため、他3つのコンテナもDockefileで設定したコンテナが動作します。
# Dockerfile
FROM python:3.6.8-alpine
WORKDIR /usr/src/app
COPY ./requirements.txt /usr/src/app/requirements.txt
RUN pip install -r requirements.txt
COPY . /usr/src/app
Dockerfileの記述内に「RUN pip install -r requirements.txt」とあるように、必要なライブラリを記述するrequirements.txtを作成します。
# requirements.txt
celery==4.2.1
flower==0.9.2
redis==3.2.0
tornado>=4.2.0,<6.0.0
Celeryで動作させるタスク処理を実行するファイル
タスク処理を実行するためのtasks.pyとceleryの設定ファイルであるceleryconfig.pyを作成していきます。
# celeryconfig.py
# celeryを動かすための設定ファイル。
BROKER_URL = 'redis://localhost/0'
# CELERYD_CONCURRENCY=1なので、1こずつキューを捌いていく
# ここはCPU数に合わせていくのがよい
CELERYD_CONCURRENCY = 1
CELERY_RESULT_BACKEND = 'redis'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = "redis"
CELERYD_LOG_FILE = "./celeryd.log"
# CELERYD_LOG_LEVELをINFOにしておくと、
# タスクの標準出力もログ(celeryd.log)に書かれる
CELERYD_LOG_LEVEL = "INFO"
# ワーカーはtasks.pyを読み込み、
# 非同期処理させる関数を
# 含むスクリプト全てを指定
CELERY_IMPORTS = ("tasks", )
以下はcelery上で動作するtasks.pyです。デコレーターである@app.taskを付けることで、関数を読み出すとredisにタスクが格納され、celeryで動作するようになります。os.environ.getの環境変数はdocker-compose.ymlでそれぞれのコンテナで設定しています。
# tasks.py
import os
import time
import celery
import celeryconfig
CELERY_BROKER = os.environ.get('CELERY_BROKER')
CELERY_BACKEND = os.environ.get('CELERY_BACKEND')
app = celery.Celery(
'tasks',
broker=CELERY_BROKER,
backend=CELERY_BACKEND
)
app.config_from_object(celeryconfig)
@app.task
def run():
time.sleep(10)
print('処理 おわた')
return 'おわったよ'
@app.task
def calc(a, b):
return a+b
次にtasks.pyを呼び出すためのmain.pyを作成します。.delay()と入れることで、非同期で処理を実行させます。
処理結果自体は.resultで取り出すことが可能です。
# main.py
import tasks
print('<first task>')
# ここでタスク起動 (runタスク)
worker = tasks.run.delay()
# 終わらぬなら終わるまで待とうホトトギス
while not worker.ready():
pass
# 返り値をだす
print(worker.result)
print('<second task>')
# ここでタスク起動 (calcタスク)
worker = tasks.calc.delay(100, 200)
# 終わらぬなら終わるまで待とうホトトギス
while not worker.ready():
pass
# 返り値をだす
print(worker.result)
ログを格納するためのディレクトリを作成
最後に処理結果のログを格納するためのディレクトリを以下のコマンドを実行して作成します。
$ mkdir logs
docker-compose.yml内のcommandのタグを見ればわかりますが、celeryとmonitorコンテナにはログの格納先のディレクトリを指定しています。
celery -A tasks worker --loglevel=info --logfile=logs/celery.log
flower -A tasks --port=5555 --broker=redis://redis:6379/0
ディレクトリ構成
treeコマンドで以下のように動作に必要なコンポーネント類が揃っていれば問題ありません。
$ tree
.
├── celeryconfig.py
├── docker-compose.yml
├── Dockerfile
├── logs
├── main.py
├── requirements.txt
└── tasks.py
1 directory, 6 files
実際に動作させる
必要なファイル類は作成したので、それでは実際に動作させてみましょう。以下のコマンドを実行してコンテナ類を動作させます。
最初にまずはイメージの作成などが行われるはずです。
$ docker-compose up -d
「docker-compose ps」コマンドを実行することで、現在立ち上がっているコンテナ類がわかります。以下のようになっていれば問題ありません。
$ docker-compose ps
Name Command State Ports
-------------------------------------------------------------------------
celery celery -A tasks worker --l ... Up
monitor flower -A tasks --port=555 ... Up 0.0.0.0:5555->5555/tcp
python python3 Up
redis docker-entrypoint.sh redis ... Up 6379/tcp
redisは6379ポートで動作しており、celeryの動作を監視するためのflowerは、5555ポートで動作していることがわかります。
実際に127.0.0.1:5555にアクセスして見ると、以下のような画面が表示されます。celeryが現在処理しているタスクなどのキューの様子がリアルタイムで見れます。
以下の4つの項目があります。
・Active - 動作中のタスク
・Processed - 処理し終わったタスク
・Failed - 失敗したタスク
・Succeeded - 成功したタスク
上記の画面を見ながら、タスクを実行させてみましょう。まずは以下のコマンドでpythonコンテナの中に入り、main.pyを実行してみます。
$ docker exec -it python /bin/sh
$ python main.py
<first task>
おわったよ
<second task>
300
flowerの画面に戻ってみると、「Active」と「Processed」がそれぞれ1が付いてタスクが動作中なのがわかります。
しばらくすると、以下のように「Succeeded」と「Processed」がそれぞれ2になって、タスクが処理されて成功したことがわかります。
実際にdockerを使用して、非同期処理を試しに実行させるサンプルを作成しました。ご参考になれば幸いです。
参考資料
この記事が気に入ったらサポートをしてみませんか?