2022年1月24日月曜日

Celery でタスクが登録されないときに確認すること (コードで説明編)

Celery でタスクが登録されないときに確認すること (コードで説明編)

概要

前回文章で説明しました

実際にサンプルのコードが合ったほうがイメージしやすいのでコードでも紹介します

環境

  • macOS 11.6.2
  • Python 3.10.1
  • celery 5.2.3

登録されないパターン

まずはタスクが登録されないパターンです
これで celery を起動しても作成した add, multi はタスクとして登録されません

もしこの状態でエンキューされると処理するタスクがありませんというエラーになります

  • vim tasks/celery.py
from celery import Celery

app = Celery('tasks', backend='redis://localhost', broker='redis://localhost')
  • vim tasks/calc.py
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def multi(x, y):
    return x * y
  • pipenv run celery -A tasks.celery.app worker --loglevel=INFO

登録されるように import する

ではどうすれば別ディレクトリに定義したタスクが登録されるかというと celery 実行時にタスクを import してあげます

またこの import の際には循環参照に気をつけなければいけません

一応これでも動きますがすぐに循環参照しそうなコードになっています

  • vim tasks/celery.py
from celery import Celery
# from tasks.calc import add, multi <- ここだと循環参照になる

app = Celery('tasks', backend='redis://localhost', broker='redis://localhost')

from tasks.calc import add, multi
  • vim tasks/calc.py
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def multi(x, y):
    return x * y
  • pipenv run celery -A tasks.celery.app worker --loglevel=INFO

タスクをロードするモジュールを別で作成するパターン

celery オブジェクト管理するモジュールとタスクをロードするモジュールを分けます
celery オブジェクトを複数のモジュールで持つことになりますがこれでも可能です

celery 実行時に loader にある celery オブジェクト (tasks.loader.app ) を指定します

こちらのパターンのほうが循環参照を避けられるかなと思います

  • vim tasks/celery.py
from celery import Celery

app = Celery('tasks', backend='redis://localhost', broker='redis://localhost')
  • vim tasks/loader.py
from tasks.calc import add, multi
from celery import Celery

app = Celery('tasks', backend='redis://localhost', broker='redis://localhost')
  • vim tasks/calc.py
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def multi(x, y):
    return x * y
  • pipenv run celery -A tasks.loader.app worker --loglevel=INFO

動作確認

上記すべて以下のスクリプトで動作確認できます
一番上のパターンはタスクが登録されていないのでエラーになります

  • vim main.py
from celery import chain
from tasks.calc import add, multi

tasks = chain(
    add.s(1, 2),
    multi.s(10)).apply_async()

print(tasks.get())

最後に

更にこれに flask などの Webフレームワークや SQLAlchemy などの ORM が加わってくると app コンテキストが必要になってきます

モジュールを分ける場合にはそれらが循環参照しないように気をつける必要があるので大変です

0 件のコメント:

コメントを投稿