2020年6月3日水曜日

Python Celery 超入門

概要

Celery (celery/celery) は Python 製のジョブキューです
ワーカーを作成することで簡単に非同期処理を実現できます
今回はインストールから簡単なワーカーを作成して動作確認をしてみました

環境

  • macOS 10.15.5
  • Python 3.7.3
    • celery 4.4.2

celery のインストール

  • pipenv install celery
  • pipenv install redis

今回はメッセージのやり取りを行うブローカに redis を使うので redis ライブラリもインストールします

ワーカーの作成

2 つの数字を受け取りをそれを足し合わせる単純なワーカーを作成します
redis の URL に関しては必要であれば適宜変更してください

  • vim calculator.py
from celery import Celery

app = Celery('calculator', broker='redis://localhost')

@app.task
def add(x, y):
    return x + y

ワーカー起動

  • celery -A calculator worker --loglevel=info

-A オプションにはワーカーのファイル名 (モジュール名) を指定します
redis にうまく接続できれば以下のようなログが表示されワーカーが起動します

[2020-06-01 16:47:32,827: INFO/MainProcess] Connected to redis://localhost:6379// [2020-06-01 16:47:32,836: INFO/MainProcess] mingle: searching for neighbors [2020-06-01 16:47:33,862: INFO/MainProcess] mingle: all alone [2020-06-01 16:47:33,882: INFO/MainProcess] celery@localhost ready.

ブローカにメッセージを投入して動作確認

  • vim test.py
from calculator import add

add.delay(100, 1)

これを実行してワーカが動作するか確認します

  • pipenv run python test.py

以下のような感じで add メソッドがコールされればちゃんとちゃんとワーカ側がメッセージを受け取っていることになります

[2020-06-01 16:52:44,236: INFO/MainProcess] Received task: calculator.add[2dca977e-61e7-4080-b79e-a21ecae3e1b0] [2020-06-01 16:52:44,239: INFO/ForkPoolWorker-2] Task calculator.add[2dca977e-61e7-4080-b79e-a21ecae3e1b0] succeeded in 0.0004914209999924424s: 101

ちょっと応用: ワーカ側の処理が成功した場合に特定の処理をさせる

ワーカ側を実装する際に使う Task というクラスは様々なメソッドを持っています
例えば on_successon_failure というコールバックメソッドをワーカーが実装するとワーカ側の処理が成功した際に実装したコールバックを実行することができます

以下のように calculator.py を書き換えてみましょう

from celery import Celery, Task

app = Celery('calculator', broker='redis://localhost')

class MyCalculator(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print("retval: {}".format(retval))
        print("taks_id: {}".format(task_id))
        print("args: {}".format(args))
        print("kwargs: {}".format(kwargs))

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("exc: {}".format(exc))
        print("task_id: {}".format(task_id))
        print("args: {}".format(args))
        print("kwargs: {}".format(kwargs))
        print("einfo: {}".format(einfo))

@app.task(bind=True, base=MyCalculator)
def add(self, x, y):
    # raise Exception("error")
    return x + y

問題なく成功すると on_success が呼ばれるのが確認できます
またコメントされている raise が呼ばれると on_failure のログが表示されるのが確認できると思います

最後に

Python の celery に入門してみました
とりあえず動作するものを作成して使い方や流れを学びました
基本は Task を実装してタスクに紐づくメソッドを delay でコールする感じです

今回は紹介しませんでしたがブローカには様々なものが選択できるのでそれも嬉しい点かなと思います
キューの分散やチェインを使ってパイプライン的なことも実現できるようです
グラフという機能と組み合わせるとパイプラインを画像として可視化することもできるようです

参考サイト

0 件のコメント:

コメントを投稿