2020年12月11日金曜日

Celery の before_task_publish を使ってみた

概要

before_task_publish タスクが実行される前にある処理を実行することができるハンドラです
今回は簡単な使い方を紹介します

環境

  • macOS 10.15.7
  • Python 3.8.5
    • celery 4.4.7

とりあえず使ってみる

  • vim sub_tasks.py
from celery import Celery
from celery.signals import before_task_publish

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@before_task_publish.connect
def before_task_publish_test(**kwargs):
    print("before_task_publish")

@app.task
def add(x, y):
    return x + y
  • pipenv run celery -A sub_tasks worker --loglevel=info

実行してみるとわかりますが before_task_publish で表示される print 文はクライアント側で表示されます
これはデバッグ時に結構ハマりポイントなので注意しましょう

header を使ってみる

Celery のタスクには header というカスタムフィールドがあり辞書形式で値を設定することができます
例えば before_task_publish で header に値を設定しタスク側で取得するといったことができるようになります

import datetime

from celery import Celery, current_task
from celery.signals import before_task_publish

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@before_task_publish.connect
def before_task_publish_test(headers, **kwargs):
    headers['time'] = datetime.datetime.now().isoformat()
    # headers['username'] = 'hawksnowlog'

@app.task
def add(x, y):
    time = current_task.request.get('time', None)
    print(time)
    return x + y

実行メイン

  • vim main.py
from sub_tasks import add

add.delay(100, 1)
  • pipenv run python main.py

参考サイト

0 件のコメント:

コメントを投稿