概要
Celery (celery/celery) は Python 製のジョブキューです
ワーカーを作成することで簡単に非同期処理を実現できます
今回はインストールから簡単なワーカーを作成して動作確認をしてみました
環境
- macOS 10.15.5
- Python 3.7.3
- celery 4.4.2
celery のインストール
pipenv install celery
pipenv install redis
今回はメッセージのやり取りを行うブローカに redis を使うので redis ライブラリもインストールします
ワーカーの作成
2 つの数字を受け取りをそれを足し合わせる単純なワーカーを作成します
redis の URL に関しては必要であれば適宜変更してください
vim calculator.py
from celery import Celery
app = Celery('calculator', broker='redis://localhost')
@app.task
def add(x, y):
return x + y
ワーカー起動
celery -A calculator worker --loglevel=info
-A
オプションにはワーカーのファイル名 (モジュール名) を指定します
redis にうまく接続できれば以下のようなログが表示されワーカーが起動します
ブローカにメッセージを投入して動作確認
vim test.py
from calculator import add
add.delay(100, 1)
これを実行してワーカが動作するか確認します
pipenv run python test.py
以下のような感じで add メソッドがコールされればちゃんとちゃんとワーカ側がメッセージを受け取っていることになります
ちょっと応用: ワーカ側の処理が成功した場合に特定の処理をさせる
ワーカ側を実装する際に使う Task というクラスは様々なメソッドを持っています
例えば on_success
や on_failure
というコールバックメソッドをワーカーが実装するとワーカ側の処理が成功した際に実装したコールバックを実行することができます
以下のように calculator.py
を書き換えてみましょう
from celery import Celery, Task
app = Celery('calculator', broker='redis://localhost')
class MyCalculator(Task):
def on_success(self, retval, task_id, args, kwargs):
print("retval: {}".format(retval))
print("taks_id: {}".format(task_id))
print("args: {}".format(args))
print("kwargs: {}".format(kwargs))
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("exc: {}".format(exc))
print("task_id: {}".format(task_id))
print("args: {}".format(args))
print("kwargs: {}".format(kwargs))
print("einfo: {}".format(einfo))
@app.task(bind=True, base=MyCalculator)
def add(self, x, y):
# raise Exception("error")
return x + y
問題なく成功すると on_success
が呼ばれるのが確認できます
またコメントされている raise が呼ばれると on_failure
のログが表示されるのが確認できると思います
最後に
Python の celery に入門してみました
とりあえず動作するものを作成して使い方や流れを学びました
基本は Task を実装してタスクに紐づくメソッドを delay でコールする感じです
今回は紹介しませんでしたがブローカには様々なものが選択できるのでそれも嬉しい点かなと思います
キューの分散やチェインを使ってパイプライン的なことも実現できるようです
グラフという機能と組み合わせるとパイプラインを画像として可視化することもできるようです
0 件のコメント:
コメントを投稿