2023年1月11日水曜日

celery のタスクで chain 作成後にタスクを挿入する方法

celery のタスクで chain 作成後にタスクを挿入する方法

概要

chain は celery でタスクを順番に実行することができるワークフロー機能です
chain で作成したタスクの一覧に対して実行する前にタスクの順番を変えたい場合があると思います
そんなときのテクニックを紹介します

環境

  • macOS 11.7.2
  • Python 3.10.2
  • celery 5.2.7

タスク

from celery import Celery

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(bind=True)
def add(self, x, y):
    return x + y


@app.task(bind=True)
def multi(self, x, y):
    return x * y

サンプルコード

from celery import chain
from tasks import add, multi

first = add.s(1, 2)
second = multi.s(10)
third = multi.s(2)
forth = multi.s(3)

tasks = chain(first,
              second,
              third)
# tasks.tasks.insert(2, forth)
tasks.tasks.append(forth)

result = tasks.apply_async()
print(result.get())

ちょっと解説

chain で生成されたオブジェクトは _chain クラスのオブジェクトになります
このクラスにある tasks プロパティは配列で管理されておりこの中にタスクの一覧が入っています
あとはこの配列に対してタスクの追加などを行うだけです

もう少し汎用的にしてみる

この仕組みを応用してもう少し汎用的な形にしてみます
基本となる chain のタスク一覧を管理するクラスを作成しそのタスク一覧をもとに別の chain を実行するワークフローを作成します

from celery import chain
from tasks import add, multi


class Base():

    def __init__(self):
        first = add.s(1, 2)
        second = multi.s(10)
        third = multi.s(2)
        self.tasks = chain(first,
                           second,
                           third)

    def run(self):
        return self.tasks.apply_async()

    def append_task(self, task):
        self.tasks.tasks.append(task)


class Workflow1(Base):

    def __init__(self):
        forth = multi.s(3)
        super().__init__()
        self.append_task(forth)


if __name__ == '__main__':
    base = Base()
    print(base.run().get())
    wf1 = Workflow1()
    print(wf1.run().get())

こんな感じに記載することで chain で実行するタスクのリストを重複することなく管理する ことができるようになります

最後に

さらにタスクの一覧と chain オブジェクトを別のクラスとして管理してもいいかなと思います

テストも書きやすくなるかなと思います

0 件のコメント:

コメントを投稿