Skip to content

[BUG] Azure Core does NOT perform Open Telemetry context propagation as documented #44216

Open
@jmferland

Description

@jmferland

Describe the bug
azure-messaging-servicebus and azure-core-tracing-opentelemetry do NOT support Open Telemetry Context Propagation although the Open Telemetry documentation claims Azure Core does:

Image

"Context propagation" is not limited to "trace context propagation." It can include "baggage propagation", or whatever else is configured (e.g. via system property otel.propagators).

Typically, instrumentations use OpenTelemetry.getPropagators(), NOT W3CTraceContextPropagator directly. Many other instrumented java libraries and frameworks support "context propagation" (see code search for GlobalOpenTelemetry.getPropagators()).

Exception or Stack Trace
None.

To Reproduce

  1. Prepare an Azure Service Bus Queue (that no one else is using).
  2. Set the environment variables SERVICE_BUS_CONNECTION_STRING and SERVICE_BUS_QUEUE_NAME accordingly.
  3. Run ServiceBusOtelContextPropagationTest.

Code Snippet

ServiceBusOtelContextPropagationTest.kt

internal class ServiceBusOtelContextPropagationTest {
    private val queueName: String = System.getenv("SERVICE_BUS_QUEUE_NAME")
    private val connectionString: String = System.getenv("SERVICE_BUS_CONNECTION_STRING")

    private val traceId = TraceId.fromLongs(0x000000008badf00d, 0x0000deadbeefd00d)
    private val spanId = SpanId.fromLong(0x000000ace0fba53)
    private val baggageAsMap =
        mapOf(
            "Bilbo" to "Baggins",
            "Samwise" to "Gamgee",
        )

    private val expectedApplicationProperties =
        mapOf(
            "baggage" to "Bilbo=Baggins,Samwise=Gamgee",
            "traceparent" to "00-$traceId-$spanId-01",
        )

    private val clientBuilder = ServiceBusClientBuilder().connectionString(connectionString)

    data class RelevantServiceBusMessageParts(
        val traceId: String,
        val spanId: String,
        val baggageAsMap: Map<String, String>,
        val applicationProperties: Map<String, Any>,
    )

    @Test
    fun `expectedApplicationProperties should be the same as what the OTel ContextPropagators inject`() {
        val otelInjectedProperties =
            execInOtelContext {
                val carrier = mutableMapOf<String, Any>()
                GlobalOpenTelemetry.getPropagators().textMapPropagator.inject(
                    Context.current(),
                    null,
                ) { _, key, value -> carrier[key] = value }
                carrier
            }

        assertThat(expectedApplicationProperties).isEqualTo(otelInjectedProperties)
    }

    @Test
    fun `azure libraries should propagate the sent messages context to the received message`() {
        execInOtelContext { sendMessage(ServiceBusMessage("irrelevant")) }

        val actualParts = receiveSentMessageAndGetRelevantParts()

        assertThat(actualParts).usingRecursiveComparison().isEqualTo(
            RelevantServiceBusMessageParts(
                traceId = traceId.toString(),
                spanId = spanId.toString(),
                baggageAsMap = baggageAsMap,
                applicationProperties = expectedApplicationProperties,
            ),
        )
    }

    private fun sendMessage(message: ServiceBusMessage) =
        clientBuilder
            .sender()
            .queueName(queueName)
            .buildClient()
            .use { client -> client.sendMessage(message) }

    private fun receiveSentMessageAndGetRelevantParts(): RelevantServiceBusMessageParts {
        val blockingQueue = ArrayBlockingQueue<RelevantServiceBusMessageParts>(1)
        return clientBuilder
            .processor()
            .queueName(queueName)
            .receiveMode(RECEIVE_AND_DELETE)
            .processError { }
            .processMessage { message ->
                blockingQueue.put(
                    RelevantServiceBusMessageParts(
                        applicationProperties = message.message.applicationProperties,
                        baggageAsMap = Baggage.current().asMap().mapValues { it.value.value },
                        traceId = Span.current().spanContext.traceId,
                        spanId = Span.current().spanContext.spanId,
                    ),
                )
            }.buildProcessorClient()
            .use { client ->
                client.start()
                blockingQueue.take()
            }
    }

    private fun <T> execInOtelContext(block: () -> T) =
        Context
            .current()
            .with(Span.wrap(SpanContext.create(traceId, spanId, TraceFlags.getSampled(), TraceState.getDefault())))
            .with(baggageAsMap.entries.fold(Baggage.builder()) { acc, e -> acc.put(e.key, e.value) }.build())
            .makeCurrent()
            .use { block() }
}

build.gradle.kts

plugins {
    kotlin("jvm") version "2.1.10"
}

configurations {
    create("otel")
}

repositories {
    mavenCentral()
}

dependencies {
    // logging
    implementation("ch.qos.logback:logback-classic:1.5.16")

    // otel
    implementation(platform("io.opentelemetry:opentelemetry-bom:1.47.0"))
    implementation("io.opentelemetry:opentelemetry-api")
    "otel"("io.opentelemetry.javaagent:opentelemetry-javaagent:2.12.0")

    // service bus
    implementation(platform("com.azure:azure-sdk-bom:1.2.31"))
    implementation("com.azure:azure-messaging-servicebus")

    // testing
    testImplementation("org.assertj:assertj-core:3.27.3")
    testImplementation("org.junit.jupiter:junit-jupiter:5.11.4")
}

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(21)
    }
}

tasks.named<Test>("test") {
    jvmArgs(
        "-javaagent:build/otel/otel-agent.jar",
        "-Dotel.traces.exporter=none",
        "-Dotel.metrics.exporter=none",
        "-Dotel.logs.exporter=none",
    )

    useJUnitPlatform()
}

val otelCopy by tasks.register<Copy>("otelCopy") {
    from(configurations.getByName("otel"))
    into(layout.buildDirectory.dir("otel"))
    rename { "otel-agent.jar" }
}

tasks.compileKotlin {
    finalizedBy(otelCopy)
}

Expected behavior
I expected:

  1. The trace id to be received as sent - it was.
  2. The span id to be received as sent - it was NOT (e.g. sent 0000000ace0fba53 but received f23fb7bec62766d9).
  3. The baggage to be received as sent - it was NOT (e.g. sent "Bilbo" to "Baggins", "Samwise" to "Gamgee" but received nothing).

Here are some failed test output snippets to help understand quickly:

java.lang.AssertionError: 
field/property 'applicationProperties' differ:
- actual value  : {"Diagnostic-Id"="00-000000008badf00d0000deadbeefd00d-aefe3ac26d5cc435-01", "traceparent"="00-000000008badf00d0000deadbeefd00d-aefe3ac26d5cc435-01"}
- expected value: {"baggage"="Bilbo=Baggins,Samwise=Gamgee", "traceparent"="00-000000008badf00d0000deadbeefd00d-0000000ace0fba53-01"}
field/property 'baggageAsMap' differ:
- actual value  : {}
- expected value: {"Bilbo"="Baggins", "Samwise"="Gamgee"}
field/property 'spanId' differ:
- actual value  : "33ae66cec5ca31b5"
- expected value: "0000000ace0fba53"

Screenshots
Not applicable.

Setup

  • OS: macOS
  • IDE: IntelliJ
  • Library/Libraries: (see example build.gradle.kts above for more information) com.azure:azure-sdk-bom:1.2.31, com.azure:azure-messaging-servicebus:7.17.8, io.opentelemetry.javaagent:opentelemetry-javaagent:2.12.0, probably com.azure:azure-core-tracing-opentelemetry:1.0.0-beta.45 (based on this line)
  • Java version: 21
  • App Server/Environment: N/A
  • Frameworks: N/A

Additional context
Here's an example of how context propagation using all configured propagators might look (e.g. -Dotel.propagators=tracecontext,baggage,custom):

// injection
GlobalOpenTelemetry.getPropagators().textMapPropagator.inject(
    Context.current(),
    applicationProperties,
) { carrier, key, value -> carrier!![key] = value }

// extraction
val textMapPropagator = GlobalOpenTelemetry.getPropagators().textMapPropagator
return textMapPropagator.extract(
    Context.current(),
    applicationProperties,
    object : TextMapGetter<Map<String, Any>> {
        override fun keys(carrier: Map<String, Any>) = textMapPropagator.fields()

        override fun get(
            carrier: Map<String, Any>?,
            key: String,
        ) = carrier?.get(key)?.toString()
    },
)

🤔💡 Semi-random side note: it seems to me like other libraries KISS and leverage their existing interceptor support and "simply" inject an additional interceptor into their client (see OkHttp example). Wouldn't it be great if azure-messaging-servicebus support interceptors? 🤞. It would also greatly help users to implement cross-cutting functionality as in many other clients.

Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added

Metadata

Metadata

Assignees

Labels

Azure.Coreazure-corecustomer-reportedIssues that are reported by GitHub users external to the Azure organization.needs-team-attentionWorkflow: This issue needs attention from Azure service team or SDK teamquestionThe issue doesn't require a change to the product in order to be resolved. Most issues start as that

Type

No type

Projects

Status

No status

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions