2020年10月29日木曜日

Celery の concurrency について試してみた

概要

Celery のワーカの並列数を指定するにはワーカを起動する際に --concurrency オプションを指定します
今回は --concurrenty の使い方とポイントを紹介します

環境

  • macOS 10.15.7
  • Python 3.8.5
    • celery 4.4.7

とりあえず concurrency を使ってみる

とりあえずワーカ 1 つで concurrency を使ってみます
ワーカは簡単なワーカを使います
効果を確認するために sleep を入れています

  • vim sub_tasks.py
import time
from celery import Celery

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@app.task
def add(x, y):
    time.sleep(10)
    return x + y

メインは以下の通りです

  • vim main.py
from sub_tasks import add

for i in range(10):
    ret = add.delay(100, i)
    print(ret)

ワーカを起動します
--concurrency=10 で起動してみます

  • pipenv run celery -A sub_tasks worker --loglevel=info --concurrency=10

これでメインを起動してみます

  • pipenv run python main.py

main.py は非同期で実行されたジョブの ID が表示されてすぐに終了します
ワーカ側の処理を見ると 10 個のジョブがすべて同時に実行されているのが確認できると思います

タスクごとに concurrency を変更したい場合には

ワーカごとに concurrency を変更したい場合は単純に起動するワーカごとに concurrency 引数の値を変更すれば OK です
Celery でタスクごとに concurrency を変更できるのかというと結論としてはバージョン 4 系ではできません (参考)
なのでタスクごとに concurrency を変更したい場合はワーカに分割しかつキューも専用のキューを指定する必要があります

sub_tasks2.py を作成して動作確認してみます
task_routes でワーカが処理対象とするキューを指定できます

  • vim sub_tasks2.py
import time
from celery import Celery

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
app.conf.task_routes = {'sub_tasks2.multi': {'queue': 'multi'}}

@app.task
def multi(x, y):
    time.sleep(10)
    return x * y

そして既存の sub_tasks.py も修正します
こちらも処理対象のキューを指定しましょう

  • vim sub_tasks.py
import time
from celery import Celery

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
app.conf.task_routes = {'sub_tasks.add': {'queue': 'add'}}

@app.task
def add(x, y):
    time.sleep(10)
    return x + y

メインのスクリプトは上記 2 つのワーカを呼び出すように変更します

  • vim main.py
from sub_tasks import add
from sub_tasks2 import multi

for i in range(10):
    ret = add.delay(100, i)
    print(ret)
    ret = multi.delay(100, i)
    print(ret)

あとはワーカをそれぞれ起動するだけですがこのときも、どのキューに対して処理するのかを指定します
また今回は動作確認として sub_tasks2 側のワーカを --concurrency=1 で起動して並列数が 1 で実行されるか確認します

  • pipenv run celery -A sub_tasks worker --loglevel=info --concurrency=10 -Q add
  • pipenv run celery -A sub_tasks2 worker --loglevel=info --concurrency=1 -Q multi

これで準備 OK です
あとはメインを実行します

  • pipenv run python main.py

メインはこれまで通りすぐに結果が返ってくるのが確認できると思います
そしてワーカは sub_tasks 側は 10 個のジョブがすべて並列で終了するのに対して sub_tasks2 側はジョブは 1 つずつ終了するのが確認できると思います

気になったこと

sub_taksk2 側のワーカがジョブを受け取る際に一気に 5 個のジョブを受け取ってそれぞれ 1 つずつ処理し 1 つ処理が終わるとまた 1 つジョブを受け取る感じでした
sub_tasks 側はジョブの受け取りも実行も同時に 10 個ずつ行っていました

もしかするとワーカのジョブの受け取り方も concurrency が影響しているのかもしれません

最後に

Celery のワーカの並列数について挙動を確認してみました
タスクごとに並列数を変更したい場合はワーカにしてかつキューも指定する必要があることがわかりました

もしかするとバージョンが上がればタスクごとに並列数を指定することができるオプションが追加されるかもしれません

参考サイト

0 件のコメント:

コメントを投稿