概要
celery で chain を使った際の親子タスクの状態を確認する方法を紹介します
環境
- macOS 10.15.5
- Python 3.7.3
- celery 4.4.3
ポイント
- タスクの状態を確認するタスクを登録する
AsyncResult.as_tuple()
を使ってすべてのタスク情報を返却する
タスク作成
まずタスクを作成します
ここで1つ目のポイントですがタスクの状態を確認するのに AsyncResult というクラスを使います
これに task_id
を指定することでタスクを生成し状況を確認するのですが同一の celery オブジェクトから作成した AsyncResult でないとうまくタスクが生成できませんでした
なので「指定したタスクの状態を確認するタスク」として登録します
vim sub_tasks.py
import time
from celery import Celery
from celery.result import AsyncResult
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
@app.task(bind=True)
def check(self, req_id):
task = self.AsyncResult(req_id)
return (task.id, task.state, task.info, task.name)
@app.task
def add(x, y):
time.sleep(10)
return x + y
@app.task
def multi(x, y):
return x * y
また celery.result.AsyncResult
は指定された tasks_id
が存在しない task_id
でもエラーなどにはなりません
task.state
もまだ実行していない状態と同じ PENDING
が返ってきます
なのでもし存在しないタスク ID かどうかを調べる場合には tasks.state
以外に task.info
や task.name
など他の要素も使って判断する必要がありそうです
※ただいろいろ調べた結果現状だと判断する要素はないようです (参考1, 参考2)
add, multi は今回実行するタスクです
実行メインスクリプト
作成したタスクを実行するスクリプトです
chain の結果から親子関係のタスク情報を as_tuple()
で取得してすべてのタスク情報を返却します
なお as_tuple()
が謎の入れ子になっているので必要なタスク ID の情報だけ取得して返却してます
vim chain_test.py
import json
from celery import chain
from sub_tasks import add, multi
tasks = chain(
add.s(1, 2),
add.s(4),
multi.s(10)
).apply_async()
ret = tasks.as_tuple()
print("%s,%s,%s" % (ret[0][0], ret[0][1][0][0], ret[0][1][0][1][0][0]))
ステータス確認スクリプト
あとは取得したタスク ID を使ってチェックするスクリプトを実行します
vim chain_check.py
import sys
from sub_tasks import check
state = check.delay(sys.argv[1]).get()
print(state)
動作確認
まずワーカーを起動します
pipenv run celery -A sub_tasks worker -l info
次にメインスクリプトを実行します
pipenv run python3 chain_test.py
すると cb6c0264-6d8f-4105-ae4d-e2d8c21cb9c1,a61b8f1d-1c82-4b04-b354-f39e80f66ffc,a5c59713-7579-4155-befd-917df16d65e7
親子関係のタスク情報が返ってくるのでこれを使って状態を確認します
右に行くほど一番最初に実行される上位のタスク ID になります
例えば最上位のタスクの状態を調べたい場合は以下のように実行します
pipenv run python3 chain_check.py a5c59713-7579-4155-befd-917df16d65e7
=> ['a5c59713-7579-4155-befd-917df16d65e7', 'SUCCESS', 3, None]
最後に
Celery でタスクの状態を取得する方法を試してみました
今回は chain から各種タスクの状態を取得しましたが group や chord でも同じようにできるはずです
GroupResult というクラスもあるのでこちらも使えると思います
Tips
celery オブジェクト作成時に backend=
を 指定しないと AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
が発生します
0 件のコメント:
コメントを投稿