2020年9月21日月曜日

celery.chain を monkeypatch する方法

概要

celery で chain を使っている場合に chain 自体を monkeypatch する方法を紹介します

環境

  • macOS 10.15.6
  • Python 3.8.5
    • celery 4.4.7

タスクを定義するコード

  • vim sub_tasks.py
import time
from celery import Celery

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@app.task
def add(x, y):
    return x + y

@app.task
def multi(x, y):
    return x * y

@app.task
def total(ary):
    return sum(ary)

chain するメインのコード

  • vim chain_test.py
import celery
from sub_tasks import add, multi

def chain_test():
    tasks = celery.chain(
        add.s(1, 2),
        add.s(4),
        add.s(5),
        add.s(6),
        add.s(7),
        multi.s(10)
    )
    result = tasks.apply_async()
    return result.as_list()

テストコード

  • vim test_chain_test.py
import pytest
import sub_tasks

from celery import Celery
from chain_test import chain_test

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost', task_always_eager=True)

class DummyResult():
    def as_list(self):
        return ["hoge"]

class DummyChain():
    def apply_async(self, args=None, kwargs=None, task_id=None, **options):
        return DummyResult()

def test_chain_test(monkeypatch):
    monkeypatch.setattr('celery.chain', lambda self, *tasks, **options: DummyChain())
    monkeypatch.setattr(sub_tasks, 'app', app)
    result = chain_test()
    assert (result == ["hoge"])

ポイント

  • sub_tasks.py で定義している Celery オブジェクトを必ず上書きすること
  • 上書きする際は task_always_eager=True にすること
  • chain を呼び出しているメインの処理は import celery して celery.chain として chain を呼び出すこと
  • chain を monkeypatch して DummyChain のクラスを返却するようにすること
  • DummyChain クラスは apply_async メソッドを持ち DummyResult を返却すること

参考サイト

0 件のコメント:

コメントを投稿