Skip to content

Commit e14e6c2

Browse files
committed
Return ConsumerStoppedEvent with Abnormal reason when MLC stopped abnormally
Signed-off-by: Alamuri Lokesh <[email protected]>
1 parent 045ea9a commit e14e6c2

File tree

4 files changed

+32
-2
lines changed

4 files changed

+32
-2
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/events.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ In addition, the `ConsumerStoppedEvent` has the following additional property:
102102

103103
* `reason`:
104104
** `NORMAL` - the consumer stopped normally (container was stopped).
105+
** `ABNORMAL` - the consumer stopped abnormally (container was stopped abnormally).
105106
** `ERROR` - a `java.lang.Error` was thrown.
106107
** `FENCED` - the transactional producer was fenced and the `stopContainerWhenFenced` container property is `true`.
107108
** `AUTH` - an `AuthenticationException` or `AuthorizationException` was thrown and the `authExceptionRetryInterval` is not configured.

spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
* to restart a container that was stopped because a transactional producer was fenced.
2525
*
2626
* @author Gary Russell
27+
* @author Lokesh Alamuri
2728
* @since 2.2
2829
*
2930
*/
@@ -43,6 +44,13 @@ public enum Reason {
4344
*/
4445
NORMAL,
4546

47+
/**
48+
* The consumer was stopped because the container was stopped abnormally.
49+
* @since 4.0
50+
*
51+
*/
52+
ABNORMAL,
53+
4654
/**
4755
* The transactional producer was fenced and the container
4856
* {@code stopContainerWhenFenced} property is true.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,9 @@ else if (throwable instanceof AuthenticationException || throwable instanceof Au
537537
else if (throwable instanceof NoOffsetForPartitionException) {
538538
reason = Reason.NO_OFFSET;
539539
}
540+
else if (!this.isStoppedNormally()) {
541+
reason = Reason.ABNORMAL;
542+
}
540543
else {
541544
reason = Reason.NORMAL;
542545
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1088,11 +1088,29 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
10881088
assertThat(container.getContainers()).
10891089
doesNotContain(childContainer1);
10901090

1091-
container.getContainers().forEach(containerForEach -> containerForEach.stop());
1091+
KafkaMessageListenerContainer<Integer, String> childContainer0SecRun = container.getContainers().get(0);
1092+
KafkaMessageListenerContainer<Integer, String> childContainer1SecRun = container.getContainers().get(1);
1093+
1094+
childContainer0SecRun.stopAbnormally(() -> {
1095+
});
1096+
1097+
childContainer1SecRun.stop();
1098+
10921099
assertThat(container.getContainers()).isNotEmpty();
10931100
container.stop();
10941101
assertThat(concurrentContainerSecondStopLatch.await(30, TimeUnit.SECONDS)).isTrue();
10951102

1103+
events.stream().forEach(event -> {
1104+
if (event.getContainer(MessageListenerContainer.class).equals(childContainer0SecRun)
1105+
&& event instanceof ConsumerStoppedEvent) {
1106+
assertThat(((ConsumerStoppedEvent) event).getReason()).isEqualTo(ConsumerStoppedEvent.Reason.ABNORMAL);
1107+
}
1108+
else if (event.getContainer(MessageListenerContainer.class).equals(childContainer1SecRun)
1109+
&& event instanceof ConsumerStoppedEvent) {
1110+
assertThat(((ConsumerStoppedEvent) event).getReason()).isEqualTo(ConsumerStoppedEvent.Reason.NORMAL);
1111+
}
1112+
});
1113+
10961114
this.logger.info("Stop containerStartStop");
10971115
}
10981116

0 commit comments

Comments
 (0)