概要
chain は celery でタスクを順番に実行することができるワークフロー機能です
chain で作成したタスクの一覧に対して実行する前にタスクの順番を変えたい場合があると思います
そんなときのテクニックを紹介します
環境
- macOS 11.7.2
- Python 3.10.2
- celery 5.2.7
タスク
from celery import Celery
app = Celery("tasks")
app.config_from_object("celeryconfig")
@app.task(bind=True)
def add(self, x, y):
return x + y
@app.task(bind=True)
def multi(self, x, y):
return x * y
サンプルコード
from celery import chain
from tasks import add, multi
first = add.s(1, 2)
second = multi.s(10)
third = multi.s(2)
forth = multi.s(3)
tasks = chain(first,
second,
third)
# tasks.tasks.insert(2, forth)
tasks.tasks.append(forth)
result = tasks.apply_async()
print(result.get())
ちょっと解説
chain で生成されたオブジェクトは _chain クラスのオブジェクトになります
このクラスにある tasks プロパティは配列で管理されておりこの中にタスクの一覧が入っています
あとはこの配列に対してタスクの追加などを行うだけです
もう少し汎用的にしてみる
この仕組みを応用してもう少し汎用的な形にしてみます
基本となる chain のタスク一覧を管理するクラスを作成しそのタスク一覧をもとに別の chain を実行するワークフローを作成します
from celery import chain
from tasks import add, multi
class Base():
def __init__(self):
first = add.s(1, 2)
second = multi.s(10)
third = multi.s(2)
self.tasks = chain(first,
second,
third)
def run(self):
return self.tasks.apply_async()
def append_task(self, task):
self.tasks.tasks.append(task)
class Workflow1(Base):
def __init__(self):
forth = multi.s(3)
super().__init__()
self.append_task(forth)
if __name__ == '__main__':
base = Base()
print(base.run().get())
wf1 = Workflow1()
print(wf1.run().get())
こんな感じに記載することで chain で実行するタスクのリストを重複することなく管理する ことができるようになります
最後に
さらにタスクの一覧と chain オブジェクトを別のクラスとして管理してもいいかなと思います
テストも書きやすくなるかなと思います
0 件のコメント:
コメントを投稿