概要
過去に紹介してきた celery の使い方はすべて app を使ってタスクを定義してきました クラスベースで作成するといろいろと都合の良いことがあるので今回はクラスベースの celery タスクを作成する方法を紹介します
環境
- macOS 11.3.1
- Python 3.8.7
- celery 5.0.5
インストール
今回は broker に redis を使うので redis ライブラリもインストールします
- pipenv install celery redis
- brew services start redis
celery オブジェクトの管理と生成
celery オブジェクトを管理するモジュールを作成します タスクを管理するパッケージとは別のパッケージにしたほう良いかなと思います
- vim proj/my_celery.py
from celery import Celery
my_celery_app = Celery('test', broker='redis://localhost')
celery 自体の設定はここで行います
タスク作成
my_celery_app を元にタスクを作成します my_celery_app.Task を継承したクラスを作成しそこに run メソッドを実装します
今回は run しか実装していませんがエラーハンドリングなどを行う on_failure メソッドなどを実装すればエラーハンドリングのカスタマイズも可能です
- vim tasks/user_task.py
from proj.my_celery import my_celery_app
from celery.log import get_default_logger
class UserTask(my_celery_app.Task):
def __init__(self, default_username="hawk", default_age=10):
self.name = "user_task" # required
self.default_username = default_username
self.default_age = default_age
self.default_mail = "no-reply@mail.domain"
self.logger = get_default_logger(__name__)
def run(self, *args, **kwargs):
self.logger.info(kwargs.get("username", self.default_username))
self.logger.info(kwargs.get("age", self.default_age))
self.logger.info(kwargs.get("mail", self.default_mail))
ポイント
いくつかポイントがあります まず name フィールドは必須のフィールドになります タスクとして登録し動作する際の名前になるので一意になるようなしましょう
また同じようにコンストラクタで設定するフィールド名の指定は celery.Task クラスの引数名と被らないようにしましょう https://docs.celeryproject.org/en/stable/reference/celery.app.task.html
カスタムタスククラスのコンストラクタでフィールドのデフォルト値を指定できます あとでタスクを実行しますがタスク実行時にもコンストラクタを呼べるのですがフィールドを変更することはできませんでした なのであくまでもデフォルト値として使うのが良いかなと思います
run は args と kwargs を引数に持ちましょう こうすることでタスクが呼ばれた際に指定された引数を受け取ることができるようになります
タスクの登録
作成した UserTask を登録します 今回はタスクをまとめて登録する専用ののモジュールを作成します
- vim tasks/tasks.py
from proj.my_celery import my_celery_app
from tasks.user_task import UserTask
my_celery_app.tasks.register(UserTask())
my_celery_app.tasks.register を使って登録します ここで UserTask のコンストラクタが呼ばれるのでここでデフォルト値を上書きしても OK です
タスクの起動
ではタスクを起動しましょう
- pipenv run celery -A tasks.tasks worker -l info
これで登録したすべてのタスクが起動できます キューごとにタスクを分けたい場合は tasks/tasks.py をキューごとに分割して起動すると良いかなと思います
動作確認
REPL で確認します UserTask を生成し delay メソッドなどで呼び出せば OK です 今回の場合だと UserTask 生成時の引数はすべて無視されます 値を変更したい場合は delay などのキーワード引数を使って書き換えます
- pipenv run python3
>>> from tasks.user_task import UserTask
>>> UserTask().delay()
<AsyncResult: 9db21ce1-d63b-4e9a-88a2-0023e139fb11>
>>> UserTask().delay(username="user01", age=20, mail="user01@mail.domain")
<AsyncResult: 6f8dc3a1-18ad-4807-ad21-be3652cab1e1>
[2021-05-19 15:14:33,938: INFO/MainProcess] Received task: user_task[9db21ce1-d63b-4e9a-88a2-0023e139fb11]
[2021-05-19 15:14:33,941: INFO/ForkPoolWorker-2] hawk
[2021-05-19 15:14:33,942: INFO/ForkPoolWorker-2] 10
[2021-05-19 15:14:33,942: INFO/ForkPoolWorker-2] no-reply@mail.domain
[2021-05-19 15:14:33,943: INFO/ForkPoolWorker-2] Task user_task[9db21ce1-d63b-4e9a-88a2-0023e139fb11] succeeded in 0.002128031999999891s: None
[2021-05-19 15:32:24,838: INFO/MainProcess] Received task: user_task[6f8dc3a1-18ad-4807-ad21-be3652cab1e1]
[2021-05-19 15:32:24,841: INFO/ForkPoolWorker-2] user01
[2021-05-19 15:32:24,842: INFO/ForkPoolWorker-2] 20
[2021-05-19 15:32:24,843: INFO/ForkPoolWorker-2] user01@mail.domain
[2021-05-19 15:32:24,844: INFO/ForkPoolWorker-2] Task user_task[6f8dc3a1-18ad-4807-ad21-be3652cab1e1] succeeded in 0.0032719819999442734s: None
最後に
タスクが増えるとクラスが増えるのでそれが面倒な人は app.task デコレータなどを使ってどんどんタスクを定義するのが良いかなと思います
0 件のコメント:
コメントを投稿