概要
キューのルーティング機能とは簡単に言えばキューとワーカープロセスの紐付けです
「このキューに入った場合はこのワーカーに処理させる」ということができるので優先度付けなどができるようになります
今回は簡単なデモアプリを作成して試してみました
環境
- macOS 10.15.5
- Python 3.7.3
- celery 4.4.3
2 つのタスクを作成する
キューがごとに処理させるタスクを分けるのでファイルも複数に分けます
タスクを作成するときにはキューは指定しません
vim tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost')
@app.task
def add(x, y):
print(x + y)
vim tasks2.py
from celery import Celery
app = Celery('tasks2', broker='redis://localhost')
@app.task
def multi(x, y):
print(x * y)
2 つのタスクを起動する
pipenv run celery -A tasks worker
tasks2 は -Q
オプションでキューを指定します
pipenv run celery -A tasks2 worker -Q tasks
エンキューして動作確認する
エンキューする際に delay
ではなく apply_async
を使います
そして引数に queue=
を指定することで特定のキューに対してメッセージをエンキューできます
vim test.py
from tasks import add
from tasks2 import multi
add.delay(100, 1)
multi.apply_async((100, 1), queue="tasks2")
これで動作確認するとそれぞれのワーカープロセスでログが出力されているのが確認できると思います
最後に
celery でキューのルーティングを試してみました
一点ポイントなのは同じキューにエンキューさた際にタスクに定義されていないメソッドがあると KeyError
になるので注意が必要です
今回の場合ですと tasks
は multi メソッドがないのでもし multi.delay
でデフォルトのキューにエンキューしてしまった場合 pipenv run celery -A tasks worker
で起動したワーカー側に KeyError
が表示されてしまいます
0 件のコメント:
コメントを投稿