2020年12月9日水曜日

Celery で flask のリクエスト ID を出力する方法

概要

前回 flask で request-id を出力する方法を紹介しました
今回は生成した request-id をバックグラウンドで動作する celery でも出力する方法を紹介します

環境

  • macOS 10.15.7
  • Python 3.8.5
    • flask 1.1.2
    • flask-log-request-id 0.10.1
    • celery 5.0.3

サンプルアプリとワーカー

  • vim app.py
import logging
from celery import Celery
from celery.utils.log import get_task_logger
from celery.signals import after_setup_task_logger, after_setup_logger
from flask import Flask
from flask_log_request_id import RequestID, RequestIDLogFilter, current_request_id
from flask_log_request_id.extras.celery import enable_request_id_propagation

# flask 用の celery 作成
def make_celery(app):
    celery = Celery(
        app.import_name,
        backend='redis://localhost:6379',
        broker='redis://localhost:6379'
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

# ロギングハンドラ作成
def make_handler():
    handler = logging.StreamHandler()
    handler.addFilter(RequestIDLogFilter())
    handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - level=%(levelname)s - request_id=%(request_id)s - %(message)s"))
    return handler

# flask アプリ作成
app = Flask(__name__)
RequestID(app)
logging.getLogger().addHandler(make_handler())
app.logger.setLevel(logging.INFO)

my_celery = make_celery(app)
enable_request_id_propagation(my_celery)

@app.route('/')
def hello():
    ret = add.delay(100, 1).get()
    app.logger.info(ret)
    return 'ok'

# celery ログ設定
@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
    logger.handlers = []
    handler = make_handler()
    logger.addHandler(handler)

@after_setup_logger.connect
def setup_root_logger(logger, *args, **kwargs):
    logger.handlers = []
    handler = make_handler()
    logger.addHandler(handler)

# celery タスク
@my_celery.task
def add(x, y):
    logger = get_task_logger(__name__)
    logging.info("add")
    return x + y

アプリ起動

  • FLASK_APP=app.py pipenv run celery -A app.my_celery worker --loglevel=info 1>/dev/null

    ワーカー起動

  • FLASK_APP=app.py pipenv run flask run 1>/dev/null

サンプルログ

2020-12-07 10:39:07,019 - celery.app.trace - level=INFO - request_id=100e1cfc-a7a2-44b6-9530-572b3313ddc0 - Task app.add[1ca2cdcc-80f8-4e16-8037-a6d30c9eca1d] succeeded in 0.011482631999996329s: 101

current_request_id が動作していないっぽい

ワーカ側で試しに current_request_id() をコールしてみたところ常に None が返ってきました
しかし celery.app.trace のログにはちゃんと request_id が乗ってきていることから celery 側で参照はできるようになっているものの current_request_id だけはうまく動作しないような状況なのかもしれません

追記: 回避策

一応あったので紹介します
自分で before_task_publish を使って header にリクエスト ID を設定する感じです
これで current_task からリクエスト ID が辿れるようになります
また request_id が null でかつ x_request_id が extra で指定されている場合には既存の request_id フィールドを x_request_id フィールドで上書きする対応もしています

import logging
from celery import Celery, current_task
from celery.utils.log import get_task_logger
from celery.signals import after_setup_task_logger, after_setup_logger, before_task_publish
from flask import Flask
from flask_log_request_id import RequestID, RequestIDLogFilter, current_request_id
from flask_log_request_id.extras.celery import enable_request_id_propagation
from pythonjsonlogger import jsonlogger

# flask 用の celery 作成
def make_celery(app):
    celery = Celery(
        app.import_name,
        backend='redis://localhost:6379',
        broker='redis://localhost:6379',
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

# ロギングハンドラ作成
class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
        if not log_record.get('request_id') and log_record.get('x_request_id'):
            log_record['request_id'] = log_record.get('x_request_id')
            del log_record['x_request_id']

def make_handler():
    handler = logging.StreamHandler()
    handler.addFilter(RequestIDLogFilter())
    formatter = CustomJsonFormatter("%(asctime)s %(name)s %(levelname)s %(request_id)s %(message)s")
    handler.setFormatter(formatter)
    return handler

# flask アプリ作成
app = Flask(__name__)
RequestID(app)
logging.getLogger().addHandler(make_handler())
app.logger.setLevel(logging.INFO)

@app.route('/')
def hello():
    ret = add.delay(100, 1).get()
    app.logger.info(ret, extra={"username":"hawksnowlog"})
    return str(ret)

# celery 作成
celery = make_celery(app)
logger = get_task_logger(__name__)
enable_request_id_propagation(celery)
_CELERY_X_HEADER = 'x_request_id'

# celery ログ設定
@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
    logger.handlers = []
    handler = make_handler()
    logger.addHandler(handler)

@after_setup_logger.connect
def setup_root_logger(logger, *args, **kwargs):
    logger.handlers = []
    handler = make_handler()
    logger.addHandler(handler)

# celery タスク
@before_task_publish.connect
def insert_request_id_header(headers, **kwargs):
    if _CELERY_X_HEADER not in headers:
        request_id = current_request_id()
        headers[_CELERY_X_HEADER] = request_id

@celery.task
def add(x, y):
    req_id = current_task.request.get(_CELERY_X_HEADER, None)
    logger.info("success add task", extra={'x_request_id':req_id})
    return x + y

あまり良い方法ではないかなと感じているので参考まで

参考サイト

0 件のコメント:

コメントを投稿