Skip to content

Commit 97fd971

Browse files
committed
Update code sample with multiple processing exception handler implementations
1 parent 93688eb commit 97fd971

File tree

13 files changed

+268
-120
lines changed

13 files changed

+268
-120
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.michelin.kafka.error.handling.dsl;
2+
3+
/**
 * Thrown when a delivery record carries invalid business data,
 * e.g. a negative number of tires.
 */
public class InvalidDeliveryException extends RuntimeException {

    /**
     * Creates the exception with a human-readable reason.
     *
     * @param message description of why the delivery is invalid
     */
    public InvalidDeliveryException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a reason and the underlying cause,
     * preserving the original stack trace for diagnostics.
     *
     * @param message description of why the delivery is invalid
     * @param cause the exception that triggered this one
     */
    public InvalidDeliveryException(String message, Throwable cause) {
        super(message, cause);
    }
}

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/KafkaStreamsApp.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
2424

2525
import com.google.gson.Gson;
26+
import com.michelin.kafka.error.handling.dsl.handler.ExceptionTypeProcessingHandler;
2627
import java.util.Optional;
2728
import java.util.Properties;
2829
import org.apache.kafka.common.serialization.Serdes;
2930
import org.apache.kafka.streams.KafkaStreams;
3031
import org.apache.kafka.streams.StreamsBuilder;
32+
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
3133
import org.apache.kafka.streams.kstream.Consumed;
3234
import org.apache.kafka.streams.kstream.Produced;
3335

@@ -40,13 +42,13 @@ public static void main(String[] args) {
4042
properties.put(
4143
BOOTSTRAP_SERVERS_CONFIG,
4244
Optional.ofNullable(System.getenv("BOOTSTRAP_SERVERS")).orElse("localhost:9092"));
43-
properties.put(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProcessingExceptionHandler.class.getName());
45+
properties.put(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ExceptionTypeProcessingHandler.class.getName());
4446

4547
StreamsBuilder streamsBuilder = new StreamsBuilder();
4648
buildTopology(streamsBuilder);
4749

4850
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
49-
51+
kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
5052
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
5153

5254
kafkaStreams.start();
@@ -55,7 +57,13 @@ public static void main(String[] args) {
5557
public static void buildTopology(StreamsBuilder streamsBuilder) {
5658
streamsBuilder.stream("delivery_booked_topic", Consumed.with(Serdes.String(), Serdes.String()))
5759
.mapValues(KafkaStreamsApp::parseFromJson) // JsonSyntaxException
58-
.filter((key, value) -> value.getNumberOfTires() >= 10) // NullPointerException
60+
.filter((key, value) -> {
61+
if (value.getNumberOfTires() < 0) {
62+
throw new InvalidDeliveryException("Number of tires cannot be negative");
63+
}
64+
65+
return value.getNumberOfTires() >= 10;
66+
}) // InvalidDeliveryException or NullPointerException
5967
.mapValues(KafkaStreamsApp::parseToJson)
6068
.to("filtered_delivery_booked_dsl_topic", Produced.with(Serdes.String(), Serdes.String()));
6169
}

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/CustomProcessingExceptionHandler.java renamed to dsl/src/main/java/com/michelin/kafka/error/handling/dsl/handler/ExceptionTypeProcessingHandler.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package com.michelin.kafka.error.handling.dsl;
19+
package com.michelin.kafka.error.handling.dsl.handler;
2020

2121
import com.google.gson.JsonSyntaxException;
22+
import com.michelin.kafka.error.handling.dsl.InvalidDeliveryException;
2223
import java.util.Map;
24+
import org.apache.kafka.common.errors.NetworkException;
2325
import org.apache.kafka.streams.errors.ErrorHandlerContext;
2426
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
2527
import org.apache.kafka.streams.processor.api.Record;
2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
2830

29-
public class CustomProcessingExceptionHandler implements ProcessingExceptionHandler {
30-
private static final Logger log = LoggerFactory.getLogger(CustomProcessingExceptionHandler.class);
31+
public class ExceptionTypeProcessingHandler implements ProcessingExceptionHandler {
32+
private static final Logger log = LoggerFactory.getLogger(ExceptionTypeProcessingHandler.class);
3133

3234
@Override
3335
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
@@ -41,15 +43,18 @@ record != null ? record.key() : null,
4143
record != null ? record.value() : null,
4244
exception);
4345

44-
return isContinuableException(exception) ? ProcessingHandlerResponse.CONTINUE : ProcessingHandlerResponse.FAIL;
46+
if (exception instanceof JsonSyntaxException)
47+
return ProcessingHandlerResponse.FAIL;
48+
if (exception instanceof InvalidDeliveryException)
49+
return ProcessingHandlerResponse.CONTINUE;
50+
if (exception instanceof NetworkException)
51+
return ProcessingHandlerResponse.FAIL;
52+
53+
return ProcessingHandlerResponse.CONTINUE;
4554
}
4655

4756
@Override
4857
public void configure(Map<String, ?> map) {
4958
// Do nothing
5059
}
51-
52-
private boolean isContinuableException(Exception exception) {
53-
return exception instanceof JsonSyntaxException || exception instanceof NullPointerException;
54-
}
5560
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.michelin.kafka.error.handling.dsl.handler;
2+
3+
import java.util.Map;
4+
import org.apache.kafka.streams.errors.ErrorHandlerContext;
5+
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
6+
import org.apache.kafka.streams.processor.api.Record;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
public class ProcessorIdProcessingHandler implements ProcessingExceptionHandler {
11+
private static final Logger log = LoggerFactory.getLogger(ProcessorIdProcessingHandler.class);
12+
13+
@Override
14+
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
15+
log.warn(
16+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
17+
context.processorNodeId(),
18+
context.topic(),
19+
context.partition(),
20+
context.offset(),
21+
record != null ? record.key() : null,
22+
record != null ? record.value() : null,
23+
exception);
24+
25+
if (context.processorNodeId().equals("MAP_PROCESSOR"))
26+
return ProcessingHandlerResponse.FAIL;
27+
if (context.processorNodeId().equals("FILTER_PROCESSOR"))
28+
return ProcessingHandlerResponse.CONTINUE;
29+
if (context.processorNodeId().equals("SELECT_KEY_PROCESSOR"))
30+
return ProcessingHandlerResponse.FAIL;
31+
32+
return ProcessingHandlerResponse.CONTINUE;
33+
34+
}
35+
36+
@Override
37+
public void configure(Map<String, ?> map) {
38+
39+
}
40+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.michelin.kafka.error.handling.dsl.handler;
2+
3+
import com.michelin.kafka.error.handling.dsl.DeliveryBooked;
4+
import java.util.Map;
5+
import org.apache.kafka.streams.errors.ErrorHandlerContext;
6+
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
7+
import org.apache.kafka.streams.processor.api.Record;
8+
9+
public class RecordTypeProcessingHandler implements ProcessingExceptionHandler {
10+
@Override
11+
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
12+
if (record.value() instanceof DeliveryBooked deliveryBooked) {
13+
return deliveryBooked.getNumberOfTires() == null ? ProcessingHandlerResponse.CONTINUE : ProcessingHandlerResponse.FAIL;
14+
}
15+
16+
return ProcessingHandlerResponse.CONTINUE;
17+
18+
}
19+
20+
@Override
21+
public void configure(Map<String, ?> map) {
22+
23+
}
24+
}

dsl/src/test/java/com/michelin/kafka/error/handling/dsl/KafkaStreamsAppTest.java

Lines changed: 35 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
2626
import static org.junit.jupiter.api.Assertions.assertEquals;
2727

28+
import com.michelin.kafka.error.handling.dsl.handler.ExceptionTypeProcessingHandler;
2829
import java.util.List;
2930
import java.util.Properties;
3031
import org.apache.kafka.common.MetricName;
@@ -45,12 +46,12 @@ class KafkaStreamsAppTest {
4546
private TestOutputTopic<String, String> outputTopic;
4647

4748
@BeforeEach
48-
public void setUp() {
49+
void setUp() {
4950
Properties properties = new Properties();
5051
properties.setProperty(APPLICATION_ID_CONFIG, "processing-error-handling-dsl-app-test");
5152
properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
5253
properties.setProperty(
53-
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProcessingExceptionHandler.class.getName());
54+
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ExceptionTypeProcessingHandler.class.getName());
5455

5556
StreamsBuilder streamsBuilder = new StreamsBuilder();
5657
KafkaStreamsApp.buildTopology(streamsBuilder);
@@ -68,60 +69,50 @@ void tearDown() {
6869
}
6970

7071
@Test
71-
void shouldHandleExceptionsAndContinueProcessing() {
72+
void shouldContinueOnInvalidDeliveryAndNullPointerExceptions() {
7273
inputTopic.pipeInput(
73-
"DEL12345",
74-
"""
75-
{
76-
"deliveryId": "DEL12345",
77-
"truckId": "TRK56789",
78-
"numberOfTires": 18,
79-
"destination": "Bordeaux"
80-
}
81-
""");
74+
"DEL12345",
75+
"""
76+
{
77+
"deliveryId": "DEL12345",
78+
"truckId": "TRK56789",
79+
"numberOfTires": 18,
80+
"destination": "Bordeaux"
81+
}
82+
""");
83+
84+
// "numberOfTires" is negative. This will throw an InvalidDeliveryException
85+
inputTopic.pipeInput(
86+
"DEL73148",
87+
"""
88+
{
89+
"deliveryId": "DEL67145",
90+
"truckId": "TRK34567",
91+
"numberOfTires": -1,
92+
"destination": "Marseille"
93+
}
94+
""");
8295

8396
// "numberOfTires" is missing. This will throw a NullPointerException
8497
inputTopic.pipeInput(
85-
"DEL73148",
86-
"""
87-
{
88-
"deliveryId": "DEL73148",
89-
"truckId": "TRK48612",
90-
"destination": "Lyon"
91-
}
92-
""");
98+
"DEL73148",
99+
"""
100+
{
101+
"deliveryId": "DEL73148",
102+
"truckId": "TRK48612",
103+
"destination": "Lyon"
104+
}
105+
""");
93106

94-
inputTopic.pipeInput(
95-
"DEL67891",
96-
"""
97-
{
98-
"deliveryId": "DEL67891",
99-
"truckId": "TRK12345",
100-
"numberOfTires": 7,
101-
"destination": "Paris"
102-
}
103-
""");
104-
105-
// Json syntax error. This will throw a JsonSyntaxException
106-
inputTopic.pipeInput(
107-
"DEL67891",
108-
"""
109-
{
110-
"deliveryId": ,
111-
"truckId": "TRK12345",
112-
"numberOfTires": 7,
113-
"destination": "Paris"
114-
}
115-
""");
116107

117108
List<KeyValue<String, String>> results = outputTopic.readKeyValuesToList();
118109

119110
assertEquals("DEL12345", results.getFirst().key);
120111

121112
assertEquals(2.0, testDriver.metrics().get(droppedRecordsTotalMetric()).metricValue());
122113
assertEquals(
123-
0.06666666666666667,
124-
testDriver.metrics().get(droppedRecordsRateMetric()).metricValue());
114+
0.06666666666666667,
115+
testDriver.metrics().get(droppedRecordsRateMetric()).metricValue());
125116
}
126117

127118
private MetricName droppedRecordsTotalMetric() {

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/CustomProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ public void init(ProcessorContext<String, String> context) {
6060
public void process(Record<String, String> record) {
6161
DeliveryBooked value = parseFromJson(record.value());
6262

63+
if (value.getNumberOfTires() < 0) {
64+
throw new InvalidDeliveryException("Number of tires cannot be negative");
65+
}
66+
6367
if (value.getNumberOfTires() >= 10) {
6468
context().forward(record.withValue(parseToJson(value)));
6569
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.michelin.kafka.error.handling.papi;
2+
3+
/**
 * Thrown when a delivery record carries invalid business data,
 * e.g. a negative number of tires.
 */
public class InvalidDeliveryException extends RuntimeException {

    /**
     * Creates the exception with a human-readable reason.
     *
     * @param message description of why the delivery is invalid
     */
    public InvalidDeliveryException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a reason and the underlying cause,
     * preserving the original stack trace for diagnostics.
     *
     * @param message description of why the delivery is invalid
     * @param cause the exception that triggered this one
     */
    public InvalidDeliveryException(String message, Throwable cause) {
        super(message, cause);
    }
}

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/KafkaStreamsApp.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
2323
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
2424

25+
import com.michelin.kafka.error.handling.papi.handler.ExceptionTypeProcessingHandler;
2526
import java.util.Optional;
2627
import java.util.Properties;
2728
import org.apache.kafka.common.serialization.Serdes;
2829
import org.apache.kafka.streams.KafkaStreams;
2930
import org.apache.kafka.streams.StreamsBuilder;
31+
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
3032
import org.apache.kafka.streams.kstream.Consumed;
3133
import org.apache.kafka.streams.kstream.Produced;
3234

@@ -38,13 +40,13 @@ public static void main(String[] args) {
3840
properties.put(
3941
BOOTSTRAP_SERVERS_CONFIG,
4042
Optional.ofNullable(System.getenv("BOOTSTRAP_SERVERS")).orElse("localhost:9092"));
41-
properties.put(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProcessingExceptionHandler.class.getName());
43+
properties.put(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ExceptionTypeProcessingHandler.class.getName());
4244

4345
StreamsBuilder streamsBuilder = new StreamsBuilder();
4446
buildTopology(streamsBuilder);
4547

4648
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
47-
49+
kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
4850
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
4951

5052
kafkaStreams.start();

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/CustomProcessingExceptionHandler.java renamed to processor-api/src/main/java/com/michelin/kafka/error/handling/papi/handler/ExceptionTypeProcessingHandler.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,42 +16,46 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package com.michelin.kafka.error.handling.papi;
19+
package com.michelin.kafka.error.handling.papi.handler;
2020

2121
import com.google.gson.JsonSyntaxException;
22+
import com.michelin.kafka.error.handling.papi.InvalidDeliveryException;
23+
import com.michelin.kafka.error.handling.papi.KaboomException;
2224
import java.util.Map;
25+
import org.apache.kafka.common.errors.NetworkException;
2326
import org.apache.kafka.streams.errors.ErrorHandlerContext;
2427
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
2528
import org.apache.kafka.streams.processor.api.Record;
2629
import org.slf4j.Logger;
2730
import org.slf4j.LoggerFactory;
2831

29-
public class CustomProcessingExceptionHandler implements ProcessingExceptionHandler {
30-
private static final Logger log = LoggerFactory.getLogger(CustomProcessingExceptionHandler.class);
32+
public class ExceptionTypeProcessingHandler implements ProcessingExceptionHandler {
33+
private static final Logger log = LoggerFactory.getLogger(ExceptionTypeProcessingHandler.class);
3134

3235
@Override
3336
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
3437
log.warn(
35-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
36-
context.processorNodeId(),
37-
context.topic(),
38-
context.partition(),
39-
context.offset(),
40-
record != null ? record.key() : null,
41-
record != null ? record.value() : null,
42-
exception);
38+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
39+
context.processorNodeId(),
40+
context.topic(),
41+
context.partition(),
42+
context.offset(),
43+
record != null ? record.key() : null,
44+
record != null ? record.value() : null,
45+
exception);
4346

44-
return isContinuableException(exception) ? ProcessingHandlerResponse.CONTINUE : ProcessingHandlerResponse.FAIL;
47+
if (exception instanceof JsonSyntaxException)
48+
return ProcessingHandlerResponse.FAIL;
49+
if (exception instanceof InvalidDeliveryException)
50+
return ProcessingHandlerResponse.CONTINUE;
51+
if (exception instanceof NetworkException)
52+
return ProcessingHandlerResponse.FAIL;
53+
54+
return ProcessingHandlerResponse.CONTINUE;
4555
}
4656

4757
@Override
4858
public void configure(Map<String, ?> map) {
4959
// Do nothing
5060
}
51-
52-
private boolean isContinuableException(Exception exception) {
53-
return exception instanceof JsonSyntaxException
54-
|| exception instanceof NullPointerException
55-
|| exception instanceof KaboomException;
56-
}
5761
}

0 commit comments

Comments (0)