2020年6月7日日曜日

Celery で chain 実行したタスクの状態を確認する方法

概要

celery で chain を使った際の親子タスクの状態を確認する方法を紹介します

環境

  • macOS 10.15.5
  • Python 3.7.3
    • celery 4.4.3

ポイント

  • タスクの状態を確認するタスクを登録する
  • AsyncResult.as_tuple() を使ってすべてのタスク情報を返却する

タスク作成

まずタスクを作成します
ここで1つ目のポイントですがタスクの状態を確認するのに AsyncResult というクラスを使います
これに task_id を指定することでタスクを生成し状況を確認するのですが同一の celery オブジェクトから作成した AsyncResult でないとうまくタスクが生成できませんでした
なので「指定したタスクの状態を確認するタスク」として登録します

  • vim sub_tasks.py
import time
from celery import Celery
from celery.result import AsyncResult

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@app.task(bind=True)
def check(self, req_id):
    task = self.AsyncResult(req_id)
    return (task.id, task.state, task.info, task.name)

@app.task
def add(x, y):
    time.sleep(10)
    return x + y

@app.task
def multi(x, y):
    return x * y

また celery.result.AsyncResult は指定された tasks_id が存在しない task_id でもエラーなどにはなりません
task.state もまだ実行していない状態と同じ PENDING が返ってきます
なのでもし存在しないタスク ID かどうかを調べる場合には tasks.state 以外に task.infotask.name など他の要素も使って判断する必要がありそうです
※ただいろいろ調べた結果現状だと判断する要素はないようです (参考1, 参考2)

add, multi は今回実行するタスクです

実行メインスクリプト

作成したタスクを実行するスクリプトです
chain の結果から親子関係のタスク情報を as_tuple() で取得してすべてのタスク情報を返却します
なお as_tuple() が謎の入れ子になっているので必要なタスク ID の情報だけ取得して返却してます

  • vim chain_test.py
import json
from celery import chain
from sub_tasks import add, multi

tasks = chain(
    add.s(1, 2),
    add.s(4),
    multi.s(10)
).apply_async()

ret = tasks.as_tuple()
print("%s,%s,%s" % (ret[0][0], ret[0][1][0][0], ret[0][1][0][1][0][0]))

ステータス確認スクリプト

あとは取得したタスク ID を使ってチェックするスクリプトを実行します

  • vim chain_check.py
import sys
from sub_tasks import check

state = check.delay(sys.argv[1]).get()

print(state)

動作確認

まずワーカーを起動します

  • pipenv run celery -A sub_tasks worker -l info

次にメインスクリプトを実行します

  • pipenv run python3 chain_test.py

すると cb6c0264-6d8f-4105-ae4d-e2d8c21cb9c1,a61b8f1d-1c82-4b04-b354-f39e80f66ffc,a5c59713-7579-4155-befd-917df16d65e7 親子関係のタスク情報が返ってくるのでこれを使って状態を確認します
右に行くほど一番最初に実行される上位のタスク ID になります

例えば最上位のタスクの状態を調べたい場合は以下のように実行します

  • pipenv run python3 chain_check.py a5c59713-7579-4155-befd-917df16d65e7

=> ['a5c59713-7579-4155-befd-917df16d65e7', 'SUCCESS', 3, None]

最後に

Celery でタスクの状態を取得する方法を試してみました
今回は chain から各種タスクの状態を取得しましたが group や chord でも同じようにできるはずです
GroupResult というクラスもあるのでこちらも使えると思います

Tips

celery オブジェクト作成時に backend= を 指定しないと AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for' が発生します

参考サイト

0 件のコメント:

コメントを投稿