概要
celery にはプリフェッチという機能がありワーカーが実行中であっても次のジョブを事前に取得して RECEIVED として管理する機能があります
RECEIVED になったジョブはワーカーが保持するすべてのジョブが終了するまで実行されないので長い処理を実行中のワーカーにプリフェッチされると RECEIVED で永遠に実行されないジョブが発生してしまいます
また RECEIVED の状態で1時間経過すると再度ジョブをエンキューするという謎の仕様がありこのせいで同一ジョブが複数回実行されるというバグもあるようです (参考)
今回のこのプリフェッチを無効にする方法があるとのことなので試してみました
環境
- macOS 11.7
- Python 3.10.2
- celery 5.2.7
- flower 1.2.0
設定ファイル
worker_prefetch_multiplier
と task_acks_late
を設定します
-
vim celeryconfig.py
result_backend = 'redis://localhost'
broker_url = 'redis://localhost'
worker_prefetch_multiplier = 1
task_acks_late = True
タスク
プリフェッチされているかを確認するためにあえて wait を入れています
-
vim tasks.py
from celery import Celery
app = Celery('tasks')
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
import time
time.sleep(3600)
return x + y
@app.task
def multi(x, y):
return x * y
メイン
-
app.py
from celery import chain
from tasks import add, multi
tasks = chain(add.s(1, 2),
multi.s(10),
multi.s(2)).apply_async()
動作確認
-
pipenv run celery -A tasks worker --loglevel=info
ワーカーを起動しいくつかのジョブを登録します
-
for i in `seq 0 5`; do pipenv run python app.py; done
すると4つのジョブはSTARTEDになりますが残り2つはプリフェッチされず RECEIVED にもならないことが確認できます
ちょっと流れを解説
redis 内で管理されているキーの説明を少しします
STARTED になったタスクは unacked と unacked_index というキーで管理されています
それぞれ Hash 型と ZSet 型で管理されています
まだどのワーカーにも属していないタスクは celery というキー (キュー) 内で管理されています
celery は List 側になっています
最後に
注意点としては worker_prefetch_multiplier が 3,4 系では使えない点です (4.2 から使えるようになった?)
詳細は不明ですが素直に5系にアップグレードしてから使ったほうが良いかなと思います
プリフェッチすらされていないジョブのタイムアウトなどあるのか気になりました
0 件のコメント:
コメントを投稿