2023年1月21日土曜日

Celery5 でタスクをクラスとして定義する方法

Celery5 でタスクをクラスとして定義する方法

概要

過去 に紹介した方法と少し違った方法を紹介します

環境

  • macOS 11.7.2
  • Python 3.10.2
  • celery 5.2.7

サンプルコード

  • vim tasks.py
from celery import Celery

app = Celery("tasks")
app.config_from_object("celeryconfig")


class AddTask(app.Task):

    def __init__(self):
        self.name = "AddTask"

    def run(self, x, y, *args, **kwargs):
        return x + y


class MultiTask(app.Task):

    def __init__(self):
        self.name = "MultiTask"

    def run(self, x, y, *args, **kwargs):
        return x * y


app.register_task(AddTask())
app.register_task(MultiTask())
  • vim celeryconfig.py
result_backend = "redis://localhost"
broker_url = "redis://localhost"
worker_prefetch_multiplier = 1
task_acks_late = True

実行コード

  • vim app.py
from celery import chain
from tasks import AddTask, MultiTask


class Base():

    def __init__(self):
        first = AddTask().s(1, 2)
        second = MultiTask().s(10)
        third = MultiTask().s(2)
        self.tasks = chain(first,
                           second,
                           third)

    def run(self):
        return self.tasks.apply_async()

    def append_task(self, task):
        self.tasks.tasks.append(task)


class Workflow1(Base):

    def __init__(self):
        forth = MultiTask().s(3)
        super().__init__()
        self.append_task(forth)


if __name__ == '__main__':
    base = Base()
    print(base.run().get())
    wf1 = Workflow1()
    print(wf1.run().get())

少し解説

Celery から作成した app オブジェクトの Task クラスを継承して作成します

name フィールドは必須です
run メソッドも必須です

定義したタスククラスは 実行する場合は app.register_task を使って登録します
この登録方法が前回少し異なっていました
app.tasks.register を使うとなぜか AttributeError: 'NoneType' object has no attribute 'push' というエラーが発生して request_stack.push ができなくタスクが登録できませんでした

タスクを呼び出す場合は普通にクラスを生成する感じで呼び出します
あとはこれまで通りと同じように s() や chain を使ってタスクを扱います

動作確認

  • pipenv run celery -A tasks worker --loglevel=info
  • pipenv run python app.py

最後に

Celery のタスクのクラス化はドキュメントが少ない印象があるのでちょくちょく紹介していこうかなと思っています

0 件のコメント:

コメントを投稿