2025年3月13日木曜日

OpenTelemetry で celery の非同期タスクでもリクエストと同じトレースをする方法

OpenTelemetry で celery の非同期タスクでもリクエストと同じトレースをする方法

概要

Flask アプリから celery の非同期タスクを呼び出した際にもトレースできるようにします
Flask、Celery で一貫して OLTP にトレース情報を送信するサンプルコードを紹介します

環境

  • macOS 15.3.1
  • Python 3.12.9
    • flask 3.1.0
    • open-telemetry 0.51b0
    • celery 5.4.0
  • OpenTelemetry Collector 0.121.0
  • Jaeger (all in one) 1.67.0

追加インストール

前回のに追加で以下をインストールします

  • pipenv install opentelemetry-instrumentation-celery celery

app.py

from celery import Celery
from flask import Flask
from opentelemetry import trace
from opentelemetry.propagate import inject

from lib.tasks import roll_dice_async

# Flask アプリの作成
app = Flask(__name__)

# Celery 設定
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/0"
celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
celery.conf.update(app.config)

# OpenTelemetry の設定
tracer = trace.get_tracer("diceroller.tracer")


@app.route("/rolldice")
def roll_dice():
    username = get_username()

    # トレースコンテキストを取得して Celery に渡す
    headers = {}
    inject(headers)
    roll_dice_async.apply_async(kwargs={"username": username, "headers": headers})

    return "Rolling dice asynchronously!"


def get_username():
    with tracer.start_as_current_span("get_username") as span:
        username = "hawksnowlog"
        span.set_attribute("username", username)
        return username


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)

lib/tasks.py

from random import randint

from opentelemetry import trace
from opentelemetry.propagate import extract

from lib.worker import celery

tracer = trace.get_tracer("diceroller.tracer")


@celery.task
def roll_dice_async(username, headers):
    return roll(username, headers)


def roll(username: str, headers):
    # OpenTelemetry のコンテキストを受け取り、トレースを継続
    with tracer.start_as_current_span("roll", context=extract(headers)) as span:
        res = randint(1, 6)
        span.set_attribute("roll.value", res)
        return res

lib/worker.py

from celery import Celery
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Celery インスタンスを作成
celery = Celery("tasks", broker="redis://localhost:6379/0")
celery.conf.update(broker_connection_retry_on_startup=True)

# OpenTelemetry の設定
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)

# Celery の OpenTelemetry を有効化
CeleryInstrumentor().instrument()

起動

Flask

  • export OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true && pipenv run opentelemetry-instrument --logs_exporter otlp --service_name dice-server python app.py

Celery

  • export OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true && pipenv run opentelemetry-instrument --logs_exporter otlp --service_name dice-server celery -A lib.tasks worker --loglevel=INFO

Jaeger

  • docker run --rm --name jaeger -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 -p 16686:16686 -p 4317:4317 -p 4318:4318 -p 14250:14250 -p 14268:14268 -p 14269:14269 -p 9411:9411 jaegertracing/all-in-one:latest

動作確認

localhost:8080/rolldice にアクセスして Jaeger にアクセスすると一つのトレース内に Flask と Celery のタスクが処理が含まれていることが確認できると思います

最後に

Flask + Celery に OLTP のトレースを設定する方法を紹介しました

トレースに間があるのは非同期特有なのかもしれません
もしくは BatchSpanProcessor を使っているのでリアルタイムでなくバッチ処理として送信している影響かもしれません

参考サイト

0 件のコメント:

コメントを投稿