Skip to content

Commit 318cfb4

Browse files
authored
GH-3950: Propagate scope in async failures
Fixes: #3950 Fix trace context loss in async Kafka error handling Problem When async returns are enabled and a consumer failure occurs, the trace context from the original message is not propagated. This leads to each step of the retry/DLT flow starting a new trace instead of continuing the original one. Example (current behavior): • Producer → trace 1 • Consumer → trace 1, fails → message goes to retry topic • Retry listener → trace 2, fails → message goes to DLT topic • DLT listener → trace 3 This breaks end-to-end traceability, as each listener receives a new trace ID. Root cause The issue stems from the `handleAsyncFailure` method, which runs in a different thread but does not propagate the original Observation (trace) context associated with the failed record. Fix Ensure that the observation context is correctly propagated when handling async failures. This preserves the trace ID across retry and DLT flows. **Auto-cherry-pick to `3.3.x`** Signed-off-by: Igor Macedo Quintanilha <[email protected]> Co-authored-with: Artem Bilan <[email protected]>
1 parent 6425682 commit 318cfb4

File tree

3 files changed

+155
-15
lines changed

3 files changed

+155
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,7 +1498,7 @@ protected void handleAsyncFailure() {
14981498
// We will give up on retrying with the remaining copied and failed Records.
14991499
for (FailedRecordTuple<K, V> copyFailedRecord : copyFailedRecords) {
15001500
try {
1501-
invokeErrorHandlerBySingleRecord(copyFailedRecord);
1501+
copyFailedRecord.observation.scoped(() -> invokeErrorHandlerBySingleRecord(copyFailedRecord));
15021502
}
15031503
catch (Exception e) {
15041504
this.logger.warn(() ->
@@ -3432,8 +3432,13 @@ private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords
34323432
.values();
34333433
}
34343434

3435+
private Observation getCurrentObservation() {
3436+
Observation currentObservation = this.observationRegistry.getCurrentObservation();
3437+
return currentObservation == null ? Observation.NOOP : currentObservation;
3438+
}
3439+
34353440
private void callbackForAsyncFailure(ConsumerRecord<K, V> cRecord, RuntimeException ex) {
3436-
this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex));
3441+
this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex, getCurrentObservation()));
34373442
}
34383443

34393444
@Override
@@ -4050,6 +4055,6 @@ private static class StopAfterFenceException extends KafkaException {
40504055

40514056
}
40524057

4053-
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { }
4058+
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex, Observation observation) { }
40544059

40554060
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ else if (!(result instanceof CompletableFuture<?>)) {
549549
}
550550

551551
completableFutureResult.whenComplete((r, t) -> {
552-
try {
552+
try (var scope = observation.openScope()) {
553553
if (t == null) {
554554
asyncSuccess(r, replyTopic, source, messageReturnType);
555555
if (isAsyncReplies()) {
@@ -736,13 +736,15 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
736736
"Async Fail", Objects.requireNonNull(source).getPayload()), cause));
737737
}
738738
catch (Throwable ex) {
739-
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
740739
acknowledge(acknowledgment);
741740
if (canAsyncRetry(request, ex) && this.asyncRetryCallback != null) {
742741
@SuppressWarnings("unchecked")
743742
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
744743
this.asyncRetryCallback.accept(record, (RuntimeException) ex);
745744
}
745+
else {
746+
this.logger.error(ex, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
747+
}
746748
}
747749
}
748750

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 143 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,16 @@
3838
import io.micrometer.observation.Observation;
3939
import io.micrometer.observation.ObservationHandler;
4040
import io.micrometer.observation.ObservationRegistry;
41-
import io.micrometer.observation.tck.TestObservationRegistry;
4241
import io.micrometer.tracing.Span;
4342
import io.micrometer.tracing.TraceContext;
4443
import io.micrometer.tracing.Tracer;
4544
import io.micrometer.tracing.handler.DefaultTracingObservationHandler;
4645
import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler;
4746
import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler;
47+
import io.micrometer.tracing.handler.TracingAwareMeterObservationHandler;
4848
import io.micrometer.tracing.propagation.Propagator;
4949
import io.micrometer.tracing.test.simple.SimpleSpan;
50+
import io.micrometer.tracing.test.simple.SimpleTraceContext;
5051
import io.micrometer.tracing.test.simple.SimpleTracer;
5152
import org.apache.kafka.clients.admin.AdminClientConfig;
5253
import org.apache.kafka.clients.consumer.Consumer;
@@ -70,8 +71,10 @@
7071
import org.springframework.context.annotation.Configuration;
7172
import org.springframework.context.annotation.Primary;
7273
import org.springframework.kafka.KafkaException;
74+
import org.springframework.kafka.annotation.DltHandler;
7375
import org.springframework.kafka.annotation.EnableKafka;
7476
import org.springframework.kafka.annotation.KafkaListener;
77+
import org.springframework.kafka.annotation.RetryableTopic;
7578
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
7679
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
7780
import org.springframework.kafka.core.ConsumerFactory;
@@ -80,6 +83,7 @@
8083
import org.springframework.kafka.core.KafkaAdmin;
8184
import org.springframework.kafka.core.KafkaTemplate;
8285
import org.springframework.kafka.core.ProducerFactory;
86+
import org.springframework.kafka.listener.ContainerProperties;
8387
import org.springframework.kafka.listener.MessageListenerContainer;
8488
import org.springframework.kafka.listener.RecordInterceptor;
8589
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
@@ -90,6 +94,9 @@
9094
import org.springframework.kafka.test.context.EmbeddedKafka;
9195
import org.springframework.kafka.test.utils.KafkaTestUtils;
9296
import org.springframework.messaging.handler.annotation.SendTo;
97+
import org.springframework.retry.annotation.Backoff;
98+
import org.springframework.scheduling.TaskScheduler;
99+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
93100
import org.springframework.test.annotation.DirtiesContext;
94101
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
95102
import org.springframework.util.StringUtils;
@@ -113,7 +120,8 @@
113120
@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
114121
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_TEST_4, ObservationTests.OBSERVATION_REPLY,
115122
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR,
116-
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
123+
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE, ObservationTests.OBSERVATION_ASYNC_FAILURE_TEST,
124+
ObservationTests.OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST}, partitions = 1)
117125
@DirtiesContext
118126
public class ObservationTests {
119127

@@ -137,6 +145,55 @@ public class ObservationTests {
137145

138146
public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate";
139147

148+
public final static String OBSERVATION_ASYNC_FAILURE_TEST = "observation.async.failure.test";
149+
150+
public final static String OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST = "observation.async.failure.retry.test";
151+
152+
@Test
153+
void asyncRetryScopePropagation(@Autowired AsyncFailureListener asyncFailureListener,
154+
@Autowired KafkaTemplate<Integer, String> template,
155+
@Autowired SimpleTracer tracer,
156+
@Autowired ObservationRegistry observationRegistry) throws InterruptedException {
157+
158+
// Clear any previous spans
159+
tracer.getSpans().clear();
160+
161+
// Create an observation scope to ensure we have a proper trace context
162+
var testObservation = Observation.createNotStarted("test.message.send", observationRegistry);
163+
164+
// Send a message within the observation scope to ensure trace context is propagated
165+
testObservation.observe(() -> {
166+
try {
167+
template.send(OBSERVATION_ASYNC_FAILURE_TEST, "trigger-async-failure").get(5, TimeUnit.SECONDS);
168+
}
169+
catch (Exception e) {
170+
throw new RuntimeException("Failed to send message", e);
171+
}
172+
});
173+
174+
// Wait for the listener to process the message (initial + retry + DLT = 3 invocations)
175+
assertThat(asyncFailureListener.asyncFailureLatch.await(100000, TimeUnit.SECONDS)).isTrue();
176+
177+
// Verify that the captured spans from the listener contexts are all part of the same trace
178+
// This demonstrates that the tracing context propagates correctly through the retry mechanism
179+
Deque<SimpleSpan> spans = tracer.getSpans();
180+
assertThat(spans).hasSizeGreaterThanOrEqualTo(4); // template + listener + retry + DLT spans
181+
182+
// Verify that spans were captured for each phase and belong to the same trace
183+
assertThat(asyncFailureListener.capturedSpanInListener).isNotNull();
184+
assertThat(asyncFailureListener.capturedSpanInRetry).isNotNull();
185+
assertThat(asyncFailureListener.capturedSpanInDlt).isNotNull();
186+
187+
// All spans should have the same trace ID, demonstrating trace continuity
188+
var originalTraceId = asyncFailureListener.capturedSpanInListener.getTraceId();
189+
assertThat(originalTraceId).isNotBlank();
190+
assertThat(asyncFailureListener.capturedSpanInRetry.getTraceId()).isEqualTo(originalTraceId);
191+
assertThat(asyncFailureListener.capturedSpanInDlt.getTraceId()).isEqualTo(originalTraceId);
192+
193+
// Clear any previous spans
194+
tracer.getSpans().clear();
195+
}
196+
140197
@Test
141198
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
142199
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@@ -628,6 +685,11 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
628685
if (container.getListenerId().equals("obs3")) {
629686
container.setKafkaAdmin(this.mockAdmin);
630687
}
688+
if (container.getListenerId().contains("asyncFailure")) {
689+
// Enable async acks to trigger async failure handling
690+
container.getContainerProperties().setAsyncAcks(true);
691+
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
692+
}
631693
if (container.getListenerId().equals("obs4")) {
632694
container.setRecordInterceptor(new RecordInterceptor<>() {
633695

@@ -662,17 +724,17 @@ MeterRegistry meterRegistry() {
662724

663725
@Bean
664726
ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator, MeterRegistry meterRegistry) {
665-
TestObservationRegistry observationRegistry = TestObservationRegistry.create();
727+
var observationRegistry = ObservationRegistry.create();
666728
observationRegistry.observationConfig().observationHandler(
667729
// Composite will pick the first matching handler
668730
new ObservationHandler.FirstMatchingCompositeObservationHandler(
669-
// This is responsible for creating a child span on the sender side
670-
new PropagatingSenderTracingObservationHandler<>(tracer, propagator),
671731
// This is responsible for creating a span on the receiver side
672732
new PropagatingReceiverTracingObservationHandler<>(tracer, propagator),
733+
// This is responsible for creating a child span on the sender side
734+
new PropagatingSenderTracingObservationHandler<>(tracer, propagator),
673735
// This is responsible for creating a default span
674736
new DefaultTracingObservationHandler(tracer)))
675-
.observationHandler(new DefaultMeterObservationHandler(meterRegistry));
737+
.observationHandler(new TracingAwareMeterObservationHandler<>(new DefaultMeterObservationHandler(meterRegistry), tracer));
676738
return observationRegistry;
677739
}
678740

@@ -683,29 +745,41 @@ Propagator propagator(Tracer tracer) {
683745
// List of headers required for tracing propagation
684746
@Override
685747
public List<String> fields() {
686-
return Arrays.asList("foo", "bar");
748+
return Arrays.asList("traceId", "spanId", "foo", "bar");
687749
}
688750

689751
// This is called on the producer side when the message is being sent
690-
// Normally we would pass information from tracing context - for tests we don't need to
691752
@Override
692753
public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> setter) {
693754
setter.set(carrier, "foo", "some foo value");
694755
setter.set(carrier, "bar", "some bar value");
695756

757+
setter.set(carrier, "traceId", context.traceId());
758+
setter.set(carrier, "spanId", context.spanId());
759+
696760
// Add a traceparent header to simulate W3C trace context
697761
setter.set(carrier, "traceparent", "traceparent-from-propagator");
698762
}
699763

700764
// This is called on the consumer side when the message is consumed
701-
// Normally we would use tools like Extractor from tracing but for tests we are just manually creating a span
702765
@Override
703766
public <C> Span.Builder extract(C carrier, Getter<C> getter) {
704767
String foo = getter.get(carrier, "foo");
705768
String bar = getter.get(carrier, "bar");
706-
return tracer.spanBuilder()
769+
770+
var traceId = getter.get(carrier, "traceId");
771+
var spanId = getter.get(carrier, "spanId");
772+
773+
Span.Builder spanBuilder = tracer.spanBuilder()
707774
.tag("foo", foo)
708775
.tag("bar", bar);
776+
777+
var traceContext = new SimpleTraceContext();
778+
traceContext.setTraceId(traceId);
779+
traceContext.setSpanId(spanId);
780+
spanBuilder = spanBuilder.setParent(traceContext);
781+
782+
return spanBuilder;
709783
}
710784
};
711785
}
@@ -720,6 +794,15 @@ ExceptionListener exceptionListener() {
720794
return new ExceptionListener();
721795
}
722796

797+
@Bean
798+
AsyncFailureListener asyncFailureListener(SimpleTracer tracer) {
799+
return new AsyncFailureListener(tracer);
800+
}
801+
802+
@Bean
803+
public TaskScheduler taskExecutor() {
804+
return new ThreadPoolTaskScheduler();
805+
}
723806
}
724807

725808
public static class Listener {
@@ -801,4 +884,54 @@ Mono<Void> receive1(ConsumerRecord<Object, Object> record) {
801884

802885
}
803886

887+
public static class AsyncFailureListener {
888+
889+
final CountDownLatch asyncFailureLatch = new CountDownLatch(3);
890+
891+
volatile @Nullable SimpleSpan capturedSpanInListener;
892+
893+
volatile @Nullable SimpleSpan capturedSpanInRetry;
894+
895+
volatile @Nullable SimpleSpan capturedSpanInDlt;
896+
897+
private final SimpleTracer tracer;
898+
899+
public AsyncFailureListener(SimpleTracer tracer) {
900+
this.tracer = tracer;
901+
}
902+
903+
@RetryableTopic(
904+
attempts = "2",
905+
backoff = @Backoff(delay = 1000)
906+
)
907+
@KafkaListener(id = "asyncFailure", topics = OBSERVATION_ASYNC_FAILURE_TEST)
908+
CompletableFuture<Void> handleAsync(ConsumerRecord<Integer, String> record) {
909+
910+
// Use topic name to distinguish between original and retry calls
911+
String topicName = record.topic();
912+
913+
if (topicName.equals(OBSERVATION_ASYNC_FAILURE_TEST)) {
914+
// This is the original call
915+
this.capturedSpanInListener = this.tracer.currentSpan();
916+
}
917+
else {
918+
// This is a retry call (topic name will be different for retry topics)
919+
this.capturedSpanInRetry = this.tracer.currentSpan();
920+
}
921+
922+
this.asyncFailureLatch.countDown();
923+
924+
// Return a failed CompletableFuture to trigger async failure handling
925+
return CompletableFuture.supplyAsync(() -> {
926+
throw new RuntimeException("Async failure for observation test");
927+
});
928+
}
929+
930+
@DltHandler
931+
void handleDlt(ConsumerRecord<Integer, String> record, Exception exception) {
932+
this.capturedSpanInDlt = this.tracer.currentSpan();
933+
this.asyncFailureLatch.countDown();
934+
}
935+
}
936+
804937
}

0 commit comments

Comments
 (0)