2023年1月23日月曜日

Celery カスタムクラスとして作成したタスクでハンドラの挙動を確認した

Celery カスタムクラスとして作成したタスクでハンドラの挙動を確認した

概要

前回 Celery のカスタムタスクを作成する方法を紹介しました
今回はカスタムタスクを使って各種イベントハンドラを使用する方法を紹介します
エラーやリトライ時の挙動をハンドリングできます

環境

  • macOS 11.7.2
  • Python 3.10.2
  • celery 5.2.7

サンプルコード

  • vim tasks.py
from celery import Celery

app = Celery("tasks")
app.config_from_object("celeryconfig")


class AddTask(app.Task):

    def __init__(self):
        self.name = "AddTask"

    def run(self, x, y, *args, **kwargs):
        # raise RuntimeError  # call to on_failure
        # self.retry()  # call to on_retry
        return x + y

    def before_start(self, task_id, *args, **kwargs):
        print("before_start")
        print(task_id)

    def after_return(self, status, retval, task_id, einfo = None, *args, **kwargs):
        print("after_return")
        print(status)

    def on_failure(self, exc, task_id, einfo = None, *args, **kwargs):
        print("on_failure")
        print(exc)

    def on_retry(self, exc, task_id, einfo = None, *args, **kwargs):
        print("on_retry")
        print(exc)

    def on_success(self, retval, task_id, *args, **kwargs):
        print("on_success")
        print(retval)


class MultiTask(app.Task):

    def __init__(self):
        self.name = "MultiTask"

    def run(self, x, y, *args, **kwargs):
        return x * y


app.register_task(AddTask())
app.register_task(MultiTask())
  • vim celeryconfig.py
result_backend = "redis://localhost"
broker_url = "redis://localhost"
worker_prefetch_multiplier = 1
task_acks_late = True

実行コード

  • vim app.py
from celery import chain
from tasks import AddTask, MultiTask


class Base():

    def __init__(self):
        first = AddTask().s(1, 2)
        second = MultiTask().s(10)
        third = MultiTask().s(2)
        self.tasks = chain(first,
                           second,
                           third)

    def run(self):
        return self.tasks.apply_async()

    def append_task(self, task):
        self.tasks.tasks.append(task)


class Workflow1(Base):

    def __init__(self):
        forth = MultiTask().s(3)
        super().__init__()
        self.append_task(forth)


if __name__ == '__main__':
    base = Base()
    print(base.run().get())
    wf1 = Workflow1()
    print(wf1.run().get())

ポイント

before_start と after_return はタスクが成功しようが失敗しようが必ずコールされます
他の on_success, on_failure, on_retry はその間で呼ばれるハンドラになります

リトライはデフォルトだと3回行います
失敗したあとに180秒待ったあとリトライを行います
on_retry が 3 回呼び出されて MaxRetriesExceededError になると最後に on_failure が呼び出されます

参考サイト

0 件のコメント:

コメントを投稿