2022年6月9日木曜日

celery の chain でタスクごとにキューをルーティングする方法

celery の chain でタスクごとにキューをルーティングする方法

概要

過去にキューをルーティングする方法を紹介しました
今回は chain と組み合わせてルーティングする方法を紹介します

環境

  • macOS 11.6.6
  • Python 3.10.2
  • celery 5.2.7

タスク

まずは普通にタスクを定義します
ここではキューは指定しません

  • vim tasks.py
from celery import Celery

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@app.task
def add(x, y):
    return x + y

@app.task
def multi(x, y):
    return x * y

chain

次に chain を使ってタスク同士をつなげます
このときにタスクがエンキューされるキューを指定します
apply_async では指定せずにタスクに対して set をコールすることでタスクに紐付けるキューを指定できます

  • vim app.py
from celery import chain
from tasks import add, multi

tasks = chain(add.s(1, 2),
              multi.s(10).set(queue='multi'),
              multi.s(2)).apply_async()

print(tasks.get())

上記の場合は 1 つ目と 3 つ目のタスクはデフォルトのキューで 2 つ目のタスクは「multi」というキューを使います

ワーカーをそれぞれ起動

デフォルトのキューに入ったタスクを処理するワーカーと multi キューに入ったタスクを処理するワーカーを起動します

  • pipenv run celery -A tasks worker -l info
  • pipenv run celery -A tasks worker -Q multi -l info

動作確認

エンキューして動作確認します

  • pipenv run python app.py

これでキューを指定したそれぞれのワーカーでタスクが処理されていることが確認できると思います

apply_async 時に指定するキューの意味

デフォルトのキューになります
タスクそれぞれでキューを set していない場合に使用されるキューになります

0 件のコメント:

コメントを投稿