2020年6月4日木曜日

Celery + flask 超入門

概要

flask で celery を使って簡単な非同期処理を試してみました

環境

  • macOS 10.15.5
  • Python 3.7.3
    • celery 4.4.3
    • flask 1.1.2

インストール

  • pipenv install celery redis flask

準備: まずは簡単な flask アプリの作成

まずは何も考えず flask アプリを作成します
その後で celery と連携することで具体的な連携方法を理解します
なお flask アプリはちゃんとモジュールなども考慮して作成しています

完成後のファイル構造は以下の通りです

. ├── Pipfile ├── Pipfile.lock ├── runserver.py └── testapp ├── __init__.py ├── celery_maker.py ├── route.py └── tasks.py   1 directory, 7 files

flask アプリの作成

flask アプリを作成します

  • vim testapp/__init__.py
from flask import Flask

app = Flask(__name__)

import testapp.route

testapp.route はあとで後述しています

アプリケーションを起動するスクリプトの作成

__init__.py で作成したアプリを起動するだけのスクリプトです

  • vim runserver.py
from testapp import app

if __name__ == "__main__":
    app.run(debug=True)

ルーティングを管理するファイルの作成

ここにルーティング情報を記載していきます
celery のタスクもここで実行することになります

  • vim testapp/route.py
from testapp import app

@app.route('/')
def index():
    return "ok"

ここから本番: celery と連携する

celery オブジェクトを作成するメソッドを定義

flask アプリを元に celery オブジェクトを作成します

  • vim testapp/celery_maker.py
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

__init__.py に celery の設定を記載する

celery が必要とするブローカの情報を記載します
この app.conf の情報から celery オブジェクトを作成し管理します

  • vim testapp/__init__.py
from flask import Flask
from testapp.celery_maker import make_celery

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)

import testapp.route

タスクの作成

非同期で行う celery タスクを作成します
わかりやすいようにタスクは個別のファイルで管理できるようにしています
__init__.py で管理している celery オブジェクトを元にタスクを作成します
非同期タスクはアノテーションを付与することで生成できます

  • vim testapp/tasks.py
from testapp import celery

@celery.task
def delay_print(msg):
    print(msg)

route.py でタスクをコールする処理を追加する

あとは実際にタスクをコールする処理を記載すれば OK です

  • vim testapp/route.py
from testapp import app
from testapp.tasks import delay_print

@app.route('/')
def index():
    delay_print.delay("hello")
    return "ok"

動作確認

アプリを起動します

  • pipenv run python runserver.py

次にタスクを実行するワーカープロセスを起動します
ポイントはアプリ名に「testapp.celery」を指定することです
testapp だけだと flask オブジェクトも読み込んでしまいエラーになります
'flask' object has no attribute 'user_options'

  • pipenv run celery -A testapp.celery worker

あとは curl localhsot:5000 をしてみるとレスポンスが返ってくるのと同時にワーカープロセス側のログに「hello」も表示されるのが確認できると思います

最後に

flask + celery を試してみました
試すこと自体は非常に簡単でしたがちゃんとモジュールや管理するファイルの構造を考えないとソースがぐちゃぐちゃになっちゃいそうです

今回の構造であればタスクを増やしたい場合は tasks.py だけを修正すればいいのでキレイに管理できていると思います

参考サイト

2 件のコメント:

  1. """""""""""""""""
    from testapp import celery

    @celery.tasks
    def delay_print(msg):
    print(msg)

    """""""""""""""""
    の@celery.tasksの部分がtasksではなく@celery.taskじゃないと動かなかったです

    返信削除
    返信
    1. ありがとうございます、修正しました

      削除