概要
過去にキューをルーティングする方法を紹介しました
今回は chain と組み合わせてルーティングする方法を紹介します
環境
- macOS 11.6.6
- Python 3.10.2
- celery 5.2.7
タスク
まずは普通にタスクを定義します
ここではキューは指定しません
-
vim tasks.py
from celery import Celery
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
@app.task
def add(x, y):
return x + y
@app.task
def multi(x, y):
return x * y
chain
次に chain を使ってタスク同士をつなげます
このときにタスクがエンキューされるキューを指定します
apply_async では指定せずにタスクに対して set をコールすることでタスクに紐付けるキューを指定できます
-
vim app.py
from celery import chain
from tasks import add, multi
tasks = chain(add.s(1, 2),
multi.s(10).set(queue='multi'),
multi.s(2)).apply_async()
print(tasks.get())
上記の場合は 1 つ目と 3 つ目のタスクはデフォルトのキューで 2 つ目のタスクは「multi」というキューを使います
ワーカーをそれぞれ起動
デフォルトのキューに入ったタスクを処理するワーカーと multi キューに入ったタスクを処理するワーカーを起動します
- pipenv run celery -A tasks worker -l info
- pipenv run celery -A tasks worker -Q multi -l info
動作確認
エンキューして動作確認します
-
pipenv run python app.py
これでキューを指定したそれぞれのワーカーでタスクが処理されていることが確認できると思います
apply_async 時に指定するキューの意味
デフォルトのキューになります
タスクそれぞれでキューを set していない場合に使用されるキューになります
0 件のコメント:
コメントを投稿