2021年5月21日金曜日

flask と celery で循環参照を避けるコツ

flask と celery で循環参照を避けるコツ

概要

Python で循環参照は避けては通れないエラーだと思います 今回は flask + celery を例に循環参照のエラーが発生した際の回避策を紹介します

環境

  • macOS 11.3.1
  • Python 3.8.7
    • flask 2.0.0
    • celery 5.0.5

循環参照する例

Controller を View ベースのクラスにし Celery のワーカーを Task ベースのクラスにした場合に素直に作成すると以下のよう感じになると思います 以下のような依存関係が発生しているためアプリを起動できません

  • controllers -> tasks に依存
  • tasks -> proj に依存
  • proj -> controllers に依存

なお起動コマンドはアプリ、ワーカーそれぞれ以下の通りです

  • pipenv run celery -A tasks.tasks worker -l info
  • FLASK_APP=proj.my_flask pipenv run flask run

proj モジュール

  • vim proj/my_flask.py
from flask import Flask

from proj.make_celery import make_celery
from controllers.user_controller import UserController

app = Flask(__name__)
app.add_url_rule('/user', view_func=UserController.as_view('user_controller'))

app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)
  • vim proj/make_celery.py
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

controllers モジュール

  • vim controllers/user_controller.py
from flask.views import View

from tasks.user_task import UserTask

class UserController(View):

    def dispatch_request(self):
        UserTask().delay()
        return "<html><body>username: hawksnowlog</body></html>"

tasks モジュール

  • vim tasks/user_task.py
from proj.my_flask import celery, app
from celery.log import get_default_logger

class UserTask(celery.Task):

    def __init__(self, default_username="hawk", default_age=10):
        self.name = "user_task" # required
        self.default_username = default_username
        self.default_age = default_age
        self.default_mail = "no-reply@mail.domain"
        self.logger = get_default_logger(__name__)

    def run(self, *args, **kwargs):
        self.logger.info(app.config["CELERY_BROKER_URL"])
        self.logger.info(kwargs.get("username", self.default_username))
        self.logger.info(kwargs.get("age", self.default_age))
        self.logger.info(kwargs.get("mail", self.default_mail))
  • vim tasks/tasks.py
from proj.my_flask import celery
from tasks.user_task import UserTask

celery.tasks.register(UserTask())

循環参照を解消してみる

いくつかの方法で循環参照を解消してみます

成功例その1: proj と controllers の依存関係を逆にする

proj と controllers の参照を逆にすることで循環させないようにしてみます

  • controllers -> tasks に依存
  • tasks -> proj に依存
  • proj -> controllers に依存
  • controllers -> proj に依存

コードの変更箇所は以下の通りです

  • vim controllers/user_controller.py
from flask.views import View

from tasks.user_task import UserTask
from proj import app

class UserController(View):

    def dispatch_request(self):
        UserTask().delay()
        return "<html><body>username: hawksnowlog</body></html>"

app.add_url_rule('/user', view_func=UserController.as_view('user_controller'))
  • vim proj/my_flask.py
from flask import Flask

from proj.make_celery import make_celery

app = Flask(__name__)

app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)

これで依存関係は解消されてアプリは起動できますが add_url_rule が読み込まれないため意図した動作にはなりません なのでそれは起動コマンドを変更することで解消します

  • FLASK_APP=controllers.user_controller pipenv run flask run

成功例その2: tasks が proj に依存しないようにする

proj モジュールで flask アプリを管理するのではなく別のモジュール (root モジュール) で管理するようにします root モジュールからはどこへの依存も持たないため循環参照がなくなります

  • controllers -> tasks に依存
  • tasks -> proj に依存
  • tasks -> root に依存
  • proj -> controllers に依存
  • controllers -> proj に依存
  • controllers -> root に依存

コードの変更と追加箇所は以下の通りです root/make_celery.py は proj/make_celery.py を移動するだけで OK です

  • vim root/__init__.py
rom flask import Flask

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
  • vim root/make_celery.py
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
  • vim proj/my_flask.py
from root import app
from root.make_celery import make_celery

from controllers.user_controller import UserController

app.add_url_rule('/user', view_func=UserController.as_view('user_controller'))

celery = make_celery(app)
  • vim tasks/user_task.py
from root import app
from root.make_celery import make_celery

from celery.log import get_default_logger

celery = make_celery(app)

class UserTask(celery.Task):

    def __init__(self, default_username="hawk", default_age=10):
        self.name = "user_task" # required
        self.default_username = default_username
        self.default_age = default_age
        self.default_mail = "no-reply@mail.domain"
        self.logger = get_default_logger(__name__)

    def run(self, *args, **kwargs):
        self.logger.info(app.config["CELERY_BROKER_URL"])
        self.logger.info(kwargs.get("username", self.default_username))
        self.logger.info(kwargs.get("age", self.default_age))
        self.logger.info(kwargs.get("mail", self.default_mail))

user_task.py の celery の管理はもう少し良い方法があるかもしれません

make_celery.py は状況によっては proj 配下でも問題ないことがあります proj/__init__.py がありその中で flask の app を扱っている場合は移動したほうがよいでしょう

最後に

まとめると循環参照した場合に回避策としては

  • 参照方法を逆にする
  • どこにも参照しないモジュールとして切り出しそこを参照するようにする

あたりが回避策のポイントかなと思います

0 件のコメント:

コメントを投稿