概要
過去 に紹介した方法と少し違った方法を紹介します
環境
- macOS 11.7.2
- Python 3.10.2
- celery 5.2.7
サンプルコード
-
vim tasks.py
from celery import Celery
app = Celery("tasks")
app.config_from_object("celeryconfig")
class AddTask(app.Task):
def __init__(self):
self.name = "AddTask"
def run(self, x, y, *args, **kwargs):
return x + y
class MultiTask(app.Task):
def __init__(self):
self.name = "MultiTask"
def run(self, x, y, *args, **kwargs):
return x * y
app.register_task(AddTask())
app.register_task(MultiTask())
-
vim celeryconfig.py
result_backend = "redis://localhost"
broker_url = "redis://localhost"
worker_prefetch_multiplier = 1
task_acks_late = True
実行コード
-
vim app.py
from celery import chain
from tasks import AddTask, MultiTask
class Base():
def __init__(self):
first = AddTask().s(1, 2)
second = MultiTask().s(10)
third = MultiTask().s(2)
self.tasks = chain(first,
second,
third)
def run(self):
return self.tasks.apply_async()
def append_task(self, task):
self.tasks.tasks.append(task)
class Workflow1(Base):
def __init__(self):
forth = MultiTask().s(3)
super().__init__()
self.append_task(forth)
if __name__ == '__main__':
base = Base()
print(base.run().get())
wf1 = Workflow1()
print(wf1.run().get())
少し解説
Celery から作成した app オブジェクトの Task クラスを継承して作成します
name フィールドは必須です
run メソッドも必須です
定義したタスククラスは
実行する場合は app.register_task
を使って登録します
この登録方法が前回少し異なっていました
app.tasks.register
を使うとなぜか AttributeError: 'NoneType' object has no attribute 'push'
というエラーが発生して request_stack.push
ができなくタスクが登録できませんでした
タスクを呼び出す場合は普通にクラスを生成する感じで呼び出します
あとはこれまで通りと同じように s() や chain を使ってタスクを扱います
動作確認
-
pipenv run celery -A tasks worker --loglevel=info
-
pipenv run python app.py
最後に
Celery のタスクのクラス化はドキュメントが少ない印象があるのでちょくちょく紹介していこうかなと思っています
0 件のコメント:
コメントを投稿