概要
Celery のワーカの並列数を指定するにはワーカを起動する際に --concurrency
オプションを指定します
今回は --concurrenty
の使い方とポイントを紹介します
環境
- macOS 10.15.7
- Python 3.8.5
- celery 4.4.7
とりあえず concurrency を使ってみる
とりあえずワーカ 1 つで concurrency を使ってみます
ワーカは簡単なワーカを使います
効果を確認するために sleep を入れています
vim sub_tasks.py
import time
from celery import Celery
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
@app.task
def add(x, y):
time.sleep(10)
return x + y
メインは以下の通りです
vim main.py
from sub_tasks import add
for i in range(10):
ret = add.delay(100, i)
print(ret)
ワーカを起動します
--concurrency=10
で起動してみます
pipenv run celery -A sub_tasks worker --loglevel=info --concurrency=10
これでメインを起動してみます
pipenv run python main.py
main.py
は非同期で実行されたジョブの ID が表示されてすぐに終了します
ワーカ側の処理を見ると 10 個のジョブがすべて同時に実行されているのが確認できると思います
タスクごとに concurrency を変更したい場合には
ワーカごとに concurrency を変更したい場合は単純に起動するワーカごとに concurrency 引数の値を変更すれば OK です
Celery でタスクごとに concurrency を変更できるのかというと結論としてはバージョン 4 系ではできません (参考)
なのでタスクごとに concurrency を変更したい場合はワーカに分割しかつキューも専用のキューを指定する必要があります
sub_tasks2.py
を作成して動作確認してみます
task_routes
でワーカが処理対象とするキューを指定できます
vim sub_tasks2.py
import time
from celery import Celery
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
app.conf.task_routes = {'sub_tasks2.multi': {'queue': 'multi'}}
@app.task
def multi(x, y):
time.sleep(10)
return x * y
そして既存の sub_tasks.py
も修正します
こちらも処理対象のキューを指定しましょう
vim sub_tasks.py
import time
from celery import Celery
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
app.conf.task_routes = {'sub_tasks.add': {'queue': 'add'}}
@app.task
def add(x, y):
time.sleep(10)
return x + y
メインのスクリプトは上記 2 つのワーカを呼び出すように変更します
vim main.py
from sub_tasks import add
from sub_tasks2 import multi
for i in range(10):
ret = add.delay(100, i)
print(ret)
ret = multi.delay(100, i)
print(ret)
あとはワーカをそれぞれ起動するだけですがこのときも、どのキューに対して処理するのかを指定します
また今回は動作確認として sub_tasks2 側のワーカを --concurrency=1
で起動して並列数が 1 で実行されるか確認します
pipenv run celery -A sub_tasks worker --loglevel=info --concurrency=10 -Q add
pipenv run celery -A sub_tasks2 worker --loglevel=info --concurrency=1 -Q multi
これで準備 OK です
あとはメインを実行します
pipenv run python main.py
メインはこれまで通りすぐに結果が返ってくるのが確認できると思います
そしてワーカは sub_tasks
側は 10 個のジョブがすべて並列で終了するのに対して sub_tasks2
側はジョブは 1 つずつ終了するのが確認できると思います
気になったこと
sub_taksk2
側のワーカがジョブを受け取る際に一気に 5 個のジョブを受け取ってそれぞれ 1 つずつ処理し 1 つ処理が終わるとまた 1 つジョブを受け取る感じでした
sub_tasks
側はジョブの受け取りも実行も同時に 10 個ずつ行っていました
もしかするとワーカのジョブの受け取り方も concurrency が影響しているのかもしれません
最後に
Celery のワーカの並列数について挙動を確認してみました
タスクごとに並列数を変更したい場合はワーカにしてかつキューも指定する必要があることがわかりました
もしかするとバージョンが上がればタスクごとに並列数を指定することができるオプションが追加されるかもしれません
0 件のコメント:
コメントを投稿