概要
前回 flask で request-id を出力する方法を紹介しました
今回は生成した request-id をバックグラウンドで動作する celery でも出力する方法を紹介します
環境
- macOS 10.15.7
- Python 3.8.5
- flask 1.1.2
- flask-log-request-id 0.10.1
- celery 5.0.3
サンプルアプリとワーカー
vim app.py
import logging
from celery import Celery
from celery.utils.log import get_task_logger
from celery.signals import after_setup_task_logger, after_setup_logger
from flask import Flask
from flask_log_request_id import RequestID, RequestIDLogFilter, current_request_id
from flask_log_request_id.extras.celery import enable_request_id_propagation
# flask 用の celery 作成
def make_celery(app):
celery = Celery(
app.import_name,
backend='redis://localhost:6379',
broker='redis://localhost:6379'
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
# ロギングハンドラ作成
def make_handler():
handler = logging.StreamHandler()
handler.addFilter(RequestIDLogFilter())
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - level=%(levelname)s - request_id=%(request_id)s - %(message)s"))
return handler
# flask アプリ作成
app = Flask(__name__)
RequestID(app)
logging.getLogger().addHandler(make_handler())
app.logger.setLevel(logging.INFO)
my_celery = make_celery(app)
enable_request_id_propagation(my_celery)
@app.route('/')
def hello():
ret = add.delay(100, 1).get()
app.logger.info(ret)
return 'ok'
# celery ログ設定
@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
logger.handlers = []
handler = make_handler()
logger.addHandler(handler)
@after_setup_logger.connect
def setup_root_logger(logger, *args, **kwargs):
logger.handlers = []
handler = make_handler()
logger.addHandler(handler)
# celery タスク
@my_celery.task
def add(x, y):
logger = get_task_logger(__name__)
logging.info("add")
return x + y
アプリ起動
FLASK_APP=app.py pipenv run celery -A app.my_celery worker --loglevel=info 1>/dev/null
ワーカー起動FLASK_APP=app.py pipenv run flask run 1>/dev/null
サンプルログ
2020-12-07 10:39:07,019 - celery.app.trace - level=INFO - request_id=100e1cfc-a7a2-44b6-9530-572b3313ddc0 - Task app.add[1ca2cdcc-80f8-4e16-8037-a6d30c9eca1d] succeeded in 0.011482631999996329s: 101
current_request_id が動作していないっぽい
ワーカ側で試しに current_request_id()
をコールしてみたところ常に None が返ってきました
しかし celery.app.trace
のログにはちゃんと request_id
が乗ってきていることから celery 側で参照はできるようになっているものの current_request_id
だけはうまく動作しないような状況なのかもしれません
追記: 回避策
一応あったので紹介します
自分で before_task_publish
を使って header にリクエスト ID を設定する感じです
これで current_task
からリクエスト ID が辿れるようになります
また request_id
が null でかつ x_request_id
が extra で指定されている場合には既存の request_id
フィールドを x_request_id
フィールドで上書きする対応もしています
import logging
from celery import Celery, current_task
from celery.utils.log import get_task_logger
from celery.signals import after_setup_task_logger, after_setup_logger, before_task_publish
from flask import Flask
from flask_log_request_id import RequestID, RequestIDLogFilter, current_request_id
from flask_log_request_id.extras.celery import enable_request_id_propagation
from pythonjsonlogger import jsonlogger
# flask 用の celery 作成
def make_celery(app):
celery = Celery(
app.import_name,
backend='redis://localhost:6379',
broker='redis://localhost:6379',
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
# ロギングハンドラ作成
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
if not log_record.get('request_id') and log_record.get('x_request_id'):
log_record['request_id'] = log_record.get('x_request_id')
del log_record['x_request_id']
def make_handler():
handler = logging.StreamHandler()
handler.addFilter(RequestIDLogFilter())
formatter = CustomJsonFormatter("%(asctime)s %(name)s %(levelname)s %(request_id)s %(message)s")
handler.setFormatter(formatter)
return handler
# flask アプリ作成
app = Flask(__name__)
RequestID(app)
logging.getLogger().addHandler(make_handler())
app.logger.setLevel(logging.INFO)
@app.route('/')
def hello():
ret = add.delay(100, 1).get()
app.logger.info(ret, extra={"username":"hawksnowlog"})
return str(ret)
# celery 作成
celery = make_celery(app)
logger = get_task_logger(__name__)
enable_request_id_propagation(celery)
_CELERY_X_HEADER = 'x_request_id'
# celery ログ設定
@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
logger.handlers = []
handler = make_handler()
logger.addHandler(handler)
@after_setup_logger.connect
def setup_root_logger(logger, *args, **kwargs):
logger.handlers = []
handler = make_handler()
logger.addHandler(handler)
# celery タスク
@before_task_publish.connect
def insert_request_id_header(headers, **kwargs):
if _CELERY_X_HEADER not in headers:
request_id = current_request_id()
headers[_CELERY_X_HEADER] = request_id
@celery.task
def add(x, y):
req_id = current_task.request.get(_CELERY_X_HEADER, None)
logger.info("success add task", extra={'x_request_id':req_id})
return x + y
あまり良い方法ではないかなと感じているので参考まで
0 件のコメント:
コメントを投稿