2021年8月24日火曜日

celery で登録済みのジョブの状態を確認する方法

celery で登録済みのジョブの状態を確認する方法

概要

今回は celery の inspect という機能を使って登録済みのジョブ情報を取得してみます
inspect を使う場合のポイントはワーカー (ノード) が稼働していないと取得できないという点です

環境

  • macOS 11.5.2
  • Python 3.8.3
  • Celery 4.4.7

タスクスクリプト

  • vim sub_stasks.py
import time
from celery import Celery

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@app.task
def add(x, y):
    time.sleep(10)
    return x + y

実行メインスクリプト

from sub_tasks import add

add.delay(100, 1)

inspect スクリプト

今回の肝になるスクリプトです
以下 4 つの inspect 機能を使っています

  • registered・・・登録済みのタスクを取得
  • scheduled・・・スケジュール済みのタスクを取得
  • active・・・実行中のジョブを取得
  • reserved・・・受信済みのジョブを取得

上の 2 つはタスクの情報を取得する inspector になります
下の 2 つがジョブの状態を確認するための inspector になります

  • vim job_inspector.py
from celery import Celery

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

inspector = app.control.inspect()

registerd = inspector.registered()
for node, tasks in registerd.items():
    print(node)
    print(tasks)

scheduled = inspector.scheduled()
for node, tasks in registerd.items():
    print(node)
    print(tasks)

active = inspector.active()
for node, jobs in active.items():
    print(node)
    for job in jobs:
        for k, v in job.items():
            print("    {} -> {}".format(k, v))
        print("")

reserved = inspector.reserved()
for node, jobs in reserved.items():
    print(node)
    for job in jobs:
        for k, v in job.items():
            print("    {} -> {}".format(k, v))
        print("")

今回はわかりやすいように print を使っているだけになります
基本的には各ノードごとにジョブやタスクの情報がリストで取得できるのでループして中の情報を確認します

動作確認

先にワーカーを起動しておきましょう
ワーカーが起動していないと inspector はすべて None を返してしまいます

ジョブ登録

  • for i in `seq 0 10`; do pipenv run python main.py; done;

ワーカー起動

  • pipenv run celery -A sub_tasks worker --loglevel=info

inspector 起動

  • pipenv run python job_inspector.py

ワーカーが起動していればちゃんとタスクやジョブの情報が出力されると思います
実行待ちは reserved 側に表示されるのが確認できると思います

最後に

ワーカーが動作していない状態で登録済みのジョブの状態を確認したい場合は直接 redis などのブローカの中を確認するしかないかもしれません

おまけ: キューを指定している場合は

inspector は特にやることは変わりません 取得したジョブ情報にある routing_key にキュー名が入るようになります

{
  'exchange': '',
  'routing_key': 'hoge',
  'priority': 0,
  'redelivered': None
}

sub_tasks.py では以下のようにキューをしている想定です

  • vim sub_tasks.py
import time
from celery import Celery

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@app.task(queue="hoge")
def add(x, y):
    time.sleep(10)
    return x + y
  • pipenv run celery -A sub_tasks worker --loglevel=info --queues=hoge

参考サイト

0 件のコメント:

コメントを投稿