2022年10月7日金曜日

Celeryでプリフェッチ(RECEIVED)されないための設定

Celeryでプリフェッチ(RECEIVED)されないための設定

概要

celery にはプリフェッチという機能がありワーカーが実行中であっても次のジョブを事前に取得して RECEIVED として管理する機能があります
RECEIVED になったジョブはワーカーが保持するすべてのジョブが終了するまで実行されないので長い処理を実行中のワーカーにプリフェッチされると RECEIVED で永遠に実行されないジョブが発生してしまいます
また RECEIVED の状態で1時間経過すると再度ジョブをエンキューするという謎の仕様がありこのせいで同一ジョブが複数回実行されるというバグもあるようです (参考)

今回のこのプリフェッチを無効にする方法があるとのことなので試してみました

環境

  • macOS 11.7
  • Python 3.10.2
  • celery 5.2.7
  • flower 1.2.0

設定ファイル

worker_prefetch_multipliertask_acks_late を設定します

  • vim celeryconfig.py
result_backend = 'redis://localhost'
broker_url = 'redis://localhost'
worker_prefetch_multiplier = 1
task_acks_late = True

タスク

プリフェッチされているかを確認するためにあえて wait を入れています

  • vim tasks.py
from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    import time
    time.sleep(3600)
    return x + y

@app.task
def multi(x, y):
    return x * y

メイン

  • app.py
from celery import chain
from tasks import add, multi

tasks = chain(add.s(1, 2),
              multi.s(10),
              multi.s(2)).apply_async()

動作確認

  • pipenv run celery -A tasks worker --loglevel=info

ワーカーを起動しいくつかのジョブを登録します

  • for i in `seq 0 5`; do pipenv run python app.py; done

すると4つのジョブはSTARTEDになりますが残り2つはプリフェッチされず RECEIVED にもならないことが確認できます

ちょっと流れを解説

redis 内で管理されているキーの説明を少しします

STARTED になったタスクは unacked と unacked_index というキーで管理されています
それぞれ Hash 型と ZSet 型で管理されています

まだどのワーカーにも属していないタスクは celery というキー (キュー) 内で管理されています
celery は List 側になっています

最後に

注意点としては worker_prefetch_multiplier が 3,4 系では使えない点です (4.2 から使えるようになった?)

詳細は不明ですが素直に5系にアップグレードしてから使ったほうが良いかなと思います

プリフェッチすらされていないジョブのタイムアウトなどあるのか気になりました

参考サイト

0 件のコメント:

コメントを投稿