概要
Flask アプリから celery の非同期タスクを呼び出した際にもトレースできるようにします
Flask、Celery で一貫して OLTP にトレース情報を送信するサンプルコードを紹介します
環境
- macOS 15.3.1
- Python 3.12.9
- flask 3.1.0
- open-telemetry 0.51b0
- celery 5.4.0
- OpenTelemetry Collector 0.121.0
- Jaeger (all in one) 1.67.0
追加インストール
前回のに追加で以下をインストールします
- pipenv install opentelemetry-instrumentation-celery celery
app.py
from celery import Celery
from flask import Flask
from opentelemetry import trace
from opentelemetry.propagate import inject
from lib.tasks import roll_dice_async
# Flask アプリの作成
app = Flask(__name__)
# Celery 設定
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/0"
celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
celery.conf.update(app.config)
# OpenTelemetry の設定
tracer = trace.get_tracer("diceroller.tracer")
@app.route("/rolldice")
def roll_dice():
username = get_username()
# トレースコンテキストを取得して Celery に渡す
headers = {}
inject(headers)
roll_dice_async.apply_async(kwargs={"username": username, "headers": headers})
return "Rolling dice asynchronously!"
def get_username():
with tracer.start_as_current_span("get_username") as span:
username = "hawksnowlog"
span.set_attribute("username", username)
return username
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080)
lib/tasks.py
from random import randint
from opentelemetry import trace
from opentelemetry.propagate import extract
from lib.worker import celery
tracer = trace.get_tracer("diceroller.tracer")
@celery.task
def roll_dice_async(username, headers):
return roll(username, headers)
def roll(username: str, headers):
# OpenTelemetry のコンテキストを受け取り、トレースを継続
with tracer.start_as_current_span("roll", context=extract(headers)) as span:
res = randint(1, 6)
span.set_attribute("roll.value", res)
return res
lib/worker.py
from celery import Celery
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Celery インスタンスを作成
celery = Celery("tasks", broker="redis://localhost:6379/0")
celery.conf.update(broker_connection_retry_on_startup=True)
# OpenTelemetry の設定
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)
# Celery の OpenTelemetry を有効化
CeleryInstrumentor().instrument()
起動
Flask
-
export OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true && pipenv run opentelemetry-instrument --logs_exporter otlp --service_name dice-server python app.py
Celery
-
export OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true && pipenv run opentelemetry-instrument --logs_exporter otlp --service_name dice-server celery -A lib.tasks worker --loglevel=INFO
Jaeger
-
docker run --rm --name jaeger -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 -p 16686:16686 -p 4317:4317 -p 4318:4318 -p 14250:14250 -p 14268:14268 -p 14269:14269 -p 9411:9411 jaegertracing/all-in-one:latest
動作確認
localhost:8080/rolldice にアクセスして Jaeger にアクセスすると一つのトレース内に Flask と Celery のタスクが処理が含まれていることが確認できると思います
最後に
Flask + Celery に OLTP のトレースを設定する方法を紹介しました
トレースに間があるのは非同期特有なのかもしれません
もしくは BatchSpanProcessor を使っているのでリアルタイムでなくバッチ処理として送信している影響かもしれません
0 件のコメント:
コメントを投稿