概要
flask で celery を使って簡単な非同期処理を試してみました
環境
- macOS 10.15.5
- Python 3.7.3
- celery 4.4.3
- flask 1.1.2
インストール
pipenv install celery redis flask
準備: まずは簡単な flask アプリの作成
まずは何も考えず flask アプリを作成します
その後で celery と連携することで具体的な連携方法を理解します
なお flask アプリはちゃんとモジュールなども考慮して作成しています
完成後のファイル構造は以下の通りです
flask アプリの作成
flask アプリを作成します
vim testapp/__init__.py
from flask import Flask
app = Flask(__name__)
import testapp.route
testapp.route
はあとで後述しています
アプリケーションを起動するスクリプトの作成
__init__.py
で作成したアプリを起動するだけのスクリプトです
vim runserver.py
from testapp import app
if __name__ == "__main__":
app.run(debug=True)
ルーティングを管理するファイルの作成
ここにルーティング情報を記載していきます
celery のタスクもここで実行することになります
vim testapp/route.py
from testapp import app
@app.route('/')
def index():
return "ok"
ここから本番: celery と連携する
celery オブジェクトを作成するメソッドを定義
flask アプリを元に celery オブジェクトを作成します
vim testapp/celery_maker.py
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
__init__.py
に celery の設定を記載する
celery が必要とするブローカの情報を記載します
この app.conf
の情報から celery オブジェクトを作成し管理します
vim testapp/__init__.py
from flask import Flask
from testapp.celery_maker import make_celery
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)
import testapp.route
タスクの作成
非同期で行う celery タスクを作成します
わかりやすいようにタスクは個別のファイルで管理できるようにしています
__init__.py
で管理している celery オブジェクトを元にタスクを作成します
非同期タスクはアノテーションを付与することで生成できます
vim testapp/tasks.py
from testapp import celery
@celery.task
def delay_print(msg):
print(msg)
route.py でタスクをコールする処理を追加する
あとは実際にタスクをコールする処理を記載すれば OK です
vim testapp/route.py
from testapp import app
from testapp.tasks import delay_print
@app.route('/')
def index():
delay_print.delay("hello")
return "ok"
動作確認
アプリを起動します
pipenv run python runserver.py
次にタスクを実行するワーカープロセスを起動します
ポイントはアプリ名に「testapp.celery
」を指定することです
testapp だけだと flask オブジェクトも読み込んでしまいエラーになります
「'flask' object has no attribute 'user_options'
」
pipenv run celery -A testapp.celery worker
あとは curl localhsot:5000
をしてみるとレスポンスが返ってくるのと同時にワーカープロセス側のログに「hello」も表示されるのが確認できると思います
最後に
flask + celery を試してみました
試すこと自体は非常に簡単でしたがちゃんとモジュールや管理するファイルの構造を考えないとソースがぐちゃぐちゃになっちゃいそうです
今回の構造であればタスクを増やしたい場合は tasks.py だけを修正すればいいのでキレイに管理できていると思います
"""""""""""""""""
返信削除from testapp import celery
@celery.tasks
def delay_print(msg):
print(msg)
"""""""""""""""""
の@celery.tasksの部分がtasksではなく@celery.taskじゃないと動かなかったです
ありがとうございます、修正しました
削除