概要
before_task_publish
タスクが実行される前にある処理を実行することができるハンドラです
今回は簡単な使い方を紹介します
環境
- macOS 10.15.7
- Python 3.8.5
- celery 4.4.7
とりあえず使ってみる
vim sub_tasks.py
from celery import Celery
from celery.signals import before_task_publish
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
@before_task_publish.connect
def before_task_publish_test(**kwargs):
print("before_task_publish")
@app.task
def add(x, y):
return x + y
pipenv run celery -A sub_tasks worker --loglevel=info
実行してみるとわかりますが before_task_publish
で表示される print 文はクライアント側で表示されます
これはデバッグ時に結構ハマりポイントなので注意しましょう
header を使ってみる
Celery のタスクには header というカスタムフィールドがあり辞書形式で値を設定することができます
例えば before_task_publish
で header に値を設定しタスク側で取得するといったことができるようになります
import datetime
from celery import Celery, current_task
from celery.signals import before_task_publish
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
@before_task_publish.connect
def before_task_publish_test(headers, **kwargs):
headers['time'] = datetime.datetime.now().isoformat()
# headers['username'] = 'hawksnowlog'
@app.task
def add(x, y):
time = current_task.request.get('time', None)
print(time)
return x + y
実行メイン
vim main.py
from sub_tasks import add
add.delay(100, 1)
pipenv run python main.py
0 件のコメント:
コメントを投稿