2020年6月5日金曜日

celery でキューのルーティング機能を試してみた

概要

キューのルーティング機能とは簡単に言えばキューとワーカープロセスの紐付けです
「このキューに入った場合はこのワーカーに処理させる」ということができるので優先度付けなどができるようになります
今回は簡単なデモアプリを作成して試してみました

環境

  • macOS 10.15.5
  • Python 3.7.3
    • celery 4.4.3

2 つのタスクを作成する

キューがごとに処理させるタスクを分けるのでファイルも複数に分けます
タスクを作成するときにはキューは指定しません

  • vim tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost')

@app.task
def add(x, y):
    print(x + y)
  • vim tasks2.py
from celery import Celery

app = Celery('tasks2', broker='redis://localhost')

@app.task
def multi(x, y):
    print(x * y)

2 つのタスクを起動する

  • pipenv run celery -A tasks worker

tasks2 は -Q オプションでキューを指定します

  • pipenv run celery -A tasks2 worker -Q tasks

エンキューして動作確認する

エンキューする際に delay ではなく apply_async を使います
そして引数に queue= を指定することで特定のキューに対してメッセージをエンキューできます

  • vim test.py
from tasks import add
from tasks2 import multi

add.delay(100, 1)
multi.apply_async((100, 1), queue="tasks2")

これで動作確認するとそれぞれのワーカープロセスでログが出力されているのが確認できると思います

最後に

celery でキューのルーティングを試してみました
一点ポイントなのは同じキューにエンキューさた際にタスクに定義されていないメソッドがあると KeyError になるので注意が必要です
今回の場合ですと tasks は multi メソッドがないのでもし multi.delay でデフォルトのキューにエンキューしてしまった場合 pipenv run celery -A tasks worker で起動したワーカー側に KeyError が表示されてしまいます

参考サイト

0 件のコメント:

コメントを投稿