Flask + Celeryの非同期アプリをHerokuでデプロイ

個人ブログの記事を移行しました。
前記事:https://hajimeteblog.com/flask-heroku/
2021.03.30-> ブログ閉鎖しました

はじめに

Flaskを使ったデータ分析アプリを作成して、それをデプロイするのにかなり苦戦したので、備忘録を残します。
FlaskをHerokuでデプロイする記事は結構ありましたが、Celery、Redisを使ったデプロイ方法があまりなく大変でした。
いろんな点でつまづいたので、備忘録として残しておきます。

全体のファイル

画像1

Celeryの設定

非同期的に処理を行いたかったので、Celeryというpython job queueの処理フレームワークを利用しました。workerに非同期で行いたい処理を書きました。
celeryはtaskとファイルを分けて書いている記事が多いですが、どちらもapp.pyに記述しました。
celeryは次のように定義しました。

from flask import *
import os
from os.path import join, dirname
from celery import Celery
import settings

def make_celery(app):
 celery = Celery(
   app.name,
   backend=app.config['CELERY_RESULT_BACKEND'],
   broker=app.config['CELERY_BROKER_URL']
 )
 celery.conf.update(app.config)

 class ContextTask(celery.Task):
   def __call__(self, *args, **kwargs):
     with app.app_context():
       return self.run(*args, **kwargs)

 celery.Task = ContextTask
 return celery

app = Flask(__name__)

app.config.update(
 CELERY_BROKER_URL=settings.REDIS_URL,
 CELERY_RESULT_BACKEND=settings.REDIS_URL
)
celery = make_celery(app)

@celery.task
def やりたい処理を書く

Procfileの設定

次のように記述しました。gunicornはpip install gunicornして利用します。

pip install pipreqs
$ pipreqs ./

これで、requirements.txtが作成されました。
ただ、pythonファイルで直接importしてないredisとgunicornがインポートされていなかったので、
redis==3.5.0
gunicorn==20.0.4

を追記しました。

Redisを入れる

herokuを初めて使う方は登録から行ってください。

heroku login

loginすると、ブラウザが開きます。
dashboardから「Create new app」を選択してプロジェクトを作成します。
作成したプロジェクトを開き「Resorces」に移動して「Find more addons」を選択

画像2

検索窓に「redis」を入力して検索します。

画像3

画像のHeroku Redisを今回は使用しました。
「Install Heroku Redis」を押して、「Add to provision to」に作成したプロジェクトを入力・選択します。

環境変数について

ダッシュボードからプロジェクトを選択して、Settingタブを選択します。
画像のようにURLというところからRedisのURLを確認します。

画像4

これは公開してはいけない情報なので、.envファイルを作成し、次のように書きます。
ダブルクォートなどは不要です。

REDIS_URL=上の画像に表示されたURL

これを呼び出すのに、python-dotenvモジュールをインポートして利用します。
settings.pyというファイルを用意して.envから変数をとります。
settings.pyは次のように書きました。

import os
from os.path import join, dirname
from dotenv import load_dotenv

dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)

REDIS_URL = os.environ.get("REDIS_URL")

このsettings.pyをimportして使います。
上の「Celeryの設定」でも書いていますが、抜き出すと

import settings
settings.REDIS_URL

で環境変数を取得できます。

Herokuへのデプロイ

git add .
git commit -m "Release to deploy"
git push heroku master

HerokuのURLにアクセスしてみましょう。
正しく動いていれば成功です。

ログを確認する

もし、エラーが発生している場合はログを確認します。

heroku logs --tail

でエラーを確認して、対処してください。

ファイルアップロードができない

エラーもなくなり、アプリケーションを起動できるようになりました。
今回、ユーザーがファイルをアップロードして処理をするアプリケーションを作成していました。
しかし、ファイルをアップロードしても読み込まれません。

ログを確認してみると、

画像5

なんかWorkerがTIMEOUTになってる!!!

これ調べてみると、Herokuの公式ページに説明が書かれてました。
どうやら大きなファイルをアップロードすると、30秒以上かかりタイムアウトになってしまうようだ。。。

心がだいぶ折れました。

ファイルを圧縮してみる

公式ページではAWS S3などに直接あげてくださいと書いてありましたが、私のアプリケーションでアップロードするファイルは20Mバイト弱程度なので、そこまで大きなファイルでもない気がしていました。

もう少し調べると、Celeryの公式ページでJSONデータを圧縮して送信する手法が挙げられていました。
他にも調べると、圧縮している記事を見かけたので、こちらの記事を参考にさせていただきました。app.pyを以下のように変更

# -------追加----------------------
import zlib, json, base64
ZIPJSON_KEY='base64(zip(o))'

def json_zip(j):
 j = {
   ZIPJSON_KEY: base64.b64encode(
     zlib.compress(json.dumps(j).encode('utf-8'))
   ).decode('ascii')
 }
 return j

def json_unzip(j, insist=True):
 try:
   assert (j[ZIPJSON_KEY])
   assert (set(j.keys()) == {ZIPJSON_KEY})
 except:
   if insist:
     raise RuntimeError("JSON not in the expected format {" + str(ZIPJSON_KEY) + ":zipstring}")
   else:
     return j
 try:
   j = zlib.decompress(base64.b64decode(j[ZIPJSON_KEY]))
 except:
   raise RuntimeError("Could not decode/unzip th3e contents")
 try:
   j = json.loads(j)
 except:
   raise RuntimeError("Could interpret the unzipped contents")
 return j
# ----------------------------


# 変更点
# predict.delay(file_hash, df)
predict.delay(file_hash, json_zip(df.to_json()))

これで、uploadからのレスポンスは返ってきました。
ただ、predictの中で処理を書くと、それだけで30sのタイムアウト処理が発生してしまい。何も処理することができません。

タイムアウト時間を延長する

タイムアウト時間はデフォルトで30秒と決められていたので、それを変更すれば処理ができると考えましたが、どのように変更したら良いかわからず大苦戦。
(解決するのに2日くらいかかってしまった)

結果的にProcfileに書き込めることがわかりました。
次のように変更します。

worker: celery -A app.celery worker --loglevel=info
web: gunicorn app:app --timeout 120 --log-file

これでタイムアウト時間を120秒に変更できました。
実行すると、処理を行うことに成功しました!!!

あらかじめモデルを作成して保存しておく

また、処理高速化のためにpickleを使ってモデルを保存しておき、タスクの中でモデルを作成を行わないようにしました。
この処理はpythonの公式ページに載っていました。

モデルを別のファイルで作ってしまって、以下の記述をするとmodel.pickleというファイルに保存することができます。

with open('model.pickle', mode='wb') as fp:
    pickle.dump(model, fp)

あとは、呼び出す側で次のように記述します。

 import lightgbm as lgb
 with open(join(dirname(__file__), 'model.pickle'), mode='rb') as fp:
   model = pickle.load(fp)

あとはいつもと同じように、modelを使って予測するだけです!便利!!

Dynosの問題点

これでデプロイ完了かと思いきや、動きません。
ただ、エラーも出ておらず、workerの処理はsucceeded inと出力されていました。

以下のコマンドを使うとheroku上のディレクトリを確認できます。

$ heroku run bash

これでディレクトリを確認してみると、ファイルが保存されていません。
今回、タスク終了時に結果としてCSVファイルを保存するように
submit_df.to_csv(join(dirname(file), "{}.csv".format(filename)))
という記述をしていました。

どうやらこれはDynosの関係で作成できていないようです。
WebのDynosとWorkerのDynosは相互に影響を与えないようです。
つまり、Workerでファイル保存を行っても、Webからは確認できないということ?
公式ページにも説明が書いてありました。。

Each dyno gets its own ephemeral filesystem, with a fresh copy of the most recently deployed code. During the dyno’s lifetime its running processes can use the filesystem as a temporary scratchpad, but no files that are written are visible to processes in any other dyno and any files written will be discarded the moment the dyno is stopped or restarted. For example, this occurs any time a dyno is replaced due to application deployment and approximately once a day as part of normal dyno management.

Heroku Dev Center https://devcenter.heroku.com/articles/dynos

そのため、WorkerとWeb間のデータのやりとりはDBやクラウドストレージを利用して行う必要がありそうです。

MongoDBの利用

そこで今回はHerokuのアドオンとして無料で利用できるMongoDBを使いました。
Redisと同様にHerokuのアドオンを追加します。

Herokuの「Resources」から「Find more add-ons」を選択し、「mLab MongoDB」を追加します。

画像6

Heroku同様にURIを取得して、.envに追加しましょう。

# .envファイル
REDIS_URL=自身のREDISのURL
MONGODB_URL=自身のMONGODBのURL  # 追加

これを読み込んで使います。まずは、DBへの接続

def connect_mdb():
 client = MongoClient(settings.MONGODB_URI, retryWrites=False)
 db = client.get_default_database()
 return {"client": client, "db": db }

DBに再書き込みを行う場合は、retryWrites=Falseを設定する必要がありました。
(デフォルトはTrueで設定されていた)

MongoDBの理解は完全にはできていませんが、構造として
Client(MONGODBの接続先) → DB → Collection(テーブルのイメージ) → Documents(JSON形式でデータを格納)
という感じでしょうか。

DBはget_default_database()で取ってくることができました。
テーブルのデータを取得したい場合は
db.collenction名.find({ 条件 })
のように書いたりできます。

DBへの書き込み、読み込み

データ分析後はだいたいDataFrameとしてデータを持っていますが、DBにはJSON形式で保存する必要があります。
ここにやや苦戦しました。

結果的にstackoverflowに上がっていたコメントを参考にして読み書きができました。

import json
dfData = json.dumps(df.to_dict('records'))
savaData = {'_id': file_hash, 'df': dfData}
res = client.insert_one(savaData)
##### load dfData
data = client.find_one({'_id': file_hash}).get('df')
dfData = json.loads(data)
df = pd.DataFrame.from_dict(dfData)

ということでWorker → MongoDB → Webという形でデータを渡すことができました。

おまけ① Insomnia

他にも、つまづいたところがたくさんあったのですが、挙げだしたらキリがないのでこのくらいにしておきます。

おまけですが、APIを叩く処理を行うのにInsomniaというアプリケーションが大変役立ちました。
今後アプリ制作の機会があれば活用していきます。

おまけ② 完成物

今回作った機能を載せておきます。
あらかじめモデルを登録しておいて、テストデータをファイル読み込みすると結果をファイルダウンロード形式で出力してくれる機能です。

データ予測アプリ