2021年5月20日木曜日

celery のタスクでクラスとして作成する方法

celery のタスクでクラスとして作成する方法

概要

過去に紹介してきた celery の使い方はすべて app を使ってタスクを定義してきました クラスベースで作成するといろいろと都合の良いことがあるので今回はクラスベースの celery タスクを作成する方法を紹介します

環境

  • macOS 11.3.1
  • Python 3.8.7
    • celery 5.0.5

インストール

今回は broker に redis を使うので redis ライブラリもインストールします

  • pipenv install celery redis
  • brew services start redis

celery オブジェクトの管理と生成

celery オブジェクトを管理するモジュールを作成します タスクを管理するパッケージとは別のパッケージにしたほう良いかなと思います

  • vim proj/my_celery.py
from celery import Celery

my_celery_app = Celery('test', broker='redis://localhost')

celery 自体の設定はここで行います

タスク作成

my_celery_app を元にタスクを作成します my_celery_app.Task を継承したクラスを作成しそこに run メソッドを実装します

今回は run しか実装していませんがエラーハンドリングなどを行う on_failure メソッドなどを実装すればエラーハンドリングのカスタマイズも可能です

  • vim tasks/user_task.py
from proj.my_celery import my_celery_app
from celery.log import get_default_logger

class UserTask(my_celery_app.Task):

    def __init__(self, default_username="hawk", default_age=10):
        self.name = "user_task" # required
        self.default_username = default_username
        self.default_age = default_age
        self.default_mail = "no-reply@mail.domain"
        self.logger = get_default_logger(__name__)

    def run(self, *args, **kwargs):
        self.logger.info(kwargs.get("username", self.default_username))
        self.logger.info(kwargs.get("age", self.default_age))
        self.logger.info(kwargs.get("mail", self.default_mail))

ポイント

いくつかポイントがあります まず name フィールドは必須のフィールドになります タスクとして登録し動作する際の名前になるので一意になるようなしましょう

また同じようにコンストラクタで設定するフィールド名の指定は celery.Task クラスの引数名と被らないようにしましょう https://docs.celeryproject.org/en/stable/reference/celery.app.task.html

カスタムタスククラスのコンストラクタでフィールドのデフォルト値を指定できます あとでタスクを実行しますがタスク実行時にもコンストラクタを呼べるのですがフィールドを変更することはできませんでした なのであくまでもデフォルト値として使うのが良いかなと思います

run は args と kwargs を引数に持ちましょう こうすることでタスクが呼ばれた際に指定された引数を受け取ることができるようになります

タスクの登録

作成した UserTask を登録します 今回はタスクをまとめて登録する専用ののモジュールを作成します

  • vim tasks/tasks.py
from proj.my_celery import my_celery_app
from tasks.user_task import UserTask

my_celery_app.tasks.register(UserTask())

my_celery_app.tasks.register を使って登録します ここで UserTask のコンストラクタが呼ばれるのでここでデフォルト値を上書きしても OK です

タスクの起動

ではタスクを起動しましょう

  • pipenv run celery -A tasks.tasks worker -l info

これで登録したすべてのタスクが起動できます キューごとにタスクを分けたい場合は tasks/tasks.py をキューごとに分割して起動すると良いかなと思います

動作確認

REPL で確認します UserTask を生成し delay メソッドなどで呼び出せば OK です 今回の場合だと UserTask 生成時の引数はすべて無視されます 値を変更したい場合は delay などのキーワード引数を使って書き換えます

  • pipenv run python3
>>> from tasks.user_task import UserTask

>>> UserTask().delay()
<AsyncResult: 9db21ce1-d63b-4e9a-88a2-0023e139fb11>

>>> UserTask().delay(username="user01", age=20, mail="user01@mail.domain")
<AsyncResult: 6f8dc3a1-18ad-4807-ad21-be3652cab1e1>
[2021-05-19 15:14:33,938: INFO/MainProcess] Received task: user_task[9db21ce1-d63b-4e9a-88a2-0023e139fb11]  
[2021-05-19 15:14:33,941: INFO/ForkPoolWorker-2] hawk
[2021-05-19 15:14:33,942: INFO/ForkPoolWorker-2] 10
[2021-05-19 15:14:33,942: INFO/ForkPoolWorker-2] no-reply@mail.domain
[2021-05-19 15:14:33,943: INFO/ForkPoolWorker-2] Task user_task[9db21ce1-d63b-4e9a-88a2-0023e139fb11] succeeded in 0.002128031999999891s: None

[2021-05-19 15:32:24,838: INFO/MainProcess] Received task: user_task[6f8dc3a1-18ad-4807-ad21-be3652cab1e1]  
[2021-05-19 15:32:24,841: INFO/ForkPoolWorker-2] user01
[2021-05-19 15:32:24,842: INFO/ForkPoolWorker-2] 20
[2021-05-19 15:32:24,843: INFO/ForkPoolWorker-2] user01@mail.domain
[2021-05-19 15:32:24,844: INFO/ForkPoolWorker-2] Task user_task[6f8dc3a1-18ad-4807-ad21-be3652cab1e1] succeeded in 0.0032719819999442734s: None

最後に

タスクが増えるとクラスが増えるのでそれが面倒な人は app.task デコレータなどを使ってどんどんタスクを定義するのが良いかなと思います

参考サイト

0 件のコメント:

コメントを投稿