概要
今回は celery の inspect という機能を使って登録済みのジョブ情報を取得してみます
inspect を使う場合のポイントはワーカー (ノード) が稼働していないと取得できないという点です
環境
- macOS 11.5.2
- Python 3.8.3
- Celery 4.4.7
タスクスクリプト
- vim sub_stasks.py
import time
from celery import Celery
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
@app.task
def add(x, y):
time.sleep(10)
return x + y
実行メインスクリプト
- vim main.py
from sub_tasks import add
add.delay(100, 1)
inspect スクリプト
今回の肝になるスクリプトです
以下 4 つの inspect 機能を使っています
- registered・・・登録済みのタスクを取得
- scheduled・・・スケジュール済みのタスクを取得
- active・・・実行中のジョブを取得
- reserved・・・受信済みのジョブを取得
上の 2 つはタスクの情報を取得する inspector になります
下の 2 つがジョブの状態を確認するための inspector になります
- vim job_inspector.py
from celery import Celery
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
inspector = app.control.inspect()
registerd = inspector.registered()
for node, tasks in registerd.items():
print(node)
print(tasks)
scheduled = inspector.scheduled()
for node, tasks in registerd.items():
print(node)
print(tasks)
active = inspector.active()
for node, jobs in active.items():
print(node)
for job in jobs:
for k, v in job.items():
print(" {} -> {}".format(k, v))
print("")
reserved = inspector.reserved()
for node, jobs in reserved.items():
print(node)
for job in jobs:
for k, v in job.items():
print(" {} -> {}".format(k, v))
print("")
今回はわかりやすいように print を使っているだけになります
基本的には各ノードごとにジョブやタスクの情報がリストで取得できるのでループして中の情報を確認します
動作確認
先にワーカーを起動しておきましょう
ワーカーが起動していないと inspector はすべて None を返してしまいます
ジョブ登録
-
for i in `seq 0 10`; do pipenv run python main.py; done;
ワーカー起動
- pipenv run celery -A sub_tasks worker --loglevel=info
inspector 起動
- pipenv run python job_inspector.py
ワーカーが起動していればちゃんとタスクやジョブの情報が出力されると思います
実行待ちは reserved 側に表示されるのが確認できると思います
最後に
ワーカーが動作していない状態で登録済みのジョブの状態を確認したい場合は直接 redis などのブローカの中を確認するしかないかもしれません
おまけ: キューを指定している場合は
inspector は特にやることは変わりません 取得したジョブ情報にある routing_key にキュー名が入るようになります
{
'exchange': '',
'routing_key': 'hoge',
'priority': 0,
'redelivered': None
}
sub_tasks.py では以下のようにキューをしている想定です
- vim sub_tasks.py
import time
from celery import Celery
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
@app.task(queue="hoge")
def add(x, y):
time.sleep(10)
return x + y
- pipenv run celery -A sub_tasks worker --loglevel=info --queues=hoge
0 件のコメント:
コメントを投稿