Skip to content

Commit 66cd4d2

Browse files
committed
Improve tests
1 parent ff1e240 commit 66cd4d2

File tree

4 files changed

+68
-46
lines changed

4 files changed

+68
-46
lines changed

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/handler/RecordTypeProcessingHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?
4444
}
4545

4646
if (message != null && message.value() instanceof DeliveryBooked deliveryBooked) {
47-
return deliveryBooked.getNumberOfTires() == null
47+
return deliveryBooked.getNumberOfTires() == null || deliveryBooked.getNumberOfTires() < 0
4848
? ProcessingHandlerResponse.CONTINUE
4949
: ProcessingHandlerResponse.FAIL;
5050
}

dsl/src/test/java/com/michelin/kafka/error/handling/dsl/KafkaStreamsAppTest.java

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
import static org.junit.jupiter.api.Assertions.assertEquals;
2727

2828
import com.michelin.kafka.error.handling.dsl.handler.ExceptionTypeProcessingHandler;
29+
import com.michelin.kafka.error.handling.dsl.handler.ProcessorIdProcessingHandler;
30+
import com.michelin.kafka.error.handling.dsl.handler.RecordTypeProcessingHandler;
2931
import java.util.List;
3032
import java.util.Properties;
33+
import java.util.stream.Stream;
3134
import org.apache.kafka.common.MetricName;
3235
import org.apache.kafka.common.serialization.StringDeserializer;
3336
import org.apache.kafka.common.serialization.StringSerializer;
@@ -37,39 +40,31 @@
3740
import org.apache.kafka.streams.TestOutputTopic;
3841
import org.apache.kafka.streams.TopologyTestDriver;
3942
import org.junit.jupiter.api.AfterEach;
40-
import org.junit.jupiter.api.BeforeEach;
41-
import org.junit.jupiter.api.Test;
43+
import org.junit.jupiter.params.ParameterizedTest;
44+
import org.junit.jupiter.params.provider.MethodSource;
4245

4346
class KafkaStreamsAppTest {
4447
private TopologyTestDriver testDriver;
4548
private TestInputTopic<String, String> inputTopic;
4649
private TestOutputTopic<String, String> outputTopic;
4750

48-
@BeforeEach
49-
void setUp() {
50-
Properties properties = new Properties();
51-
properties.setProperty(APPLICATION_ID_CONFIG, "processing-error-handling-dsl-app-test");
52-
properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
53-
properties.setProperty(
54-
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ExceptionTypeProcessingHandler.class.getName());
55-
56-
StreamsBuilder streamsBuilder = new StreamsBuilder();
57-
KafkaStreamsApp.buildTopology(streamsBuilder);
58-
testDriver = new TopologyTestDriver(streamsBuilder.build(), properties);
59-
60-
inputTopic =
61-
testDriver.createInputTopic("delivery_booked_topic", new StringSerializer(), new StringSerializer());
62-
outputTopic = testDriver.createOutputTopic(
63-
"filtered_delivery_booked_dsl_topic", new StringDeserializer(), new StringDeserializer());
64-
}
65-
6651
@AfterEach
6752
void tearDown() {
6853
testDriver.close();
6954
}
7055

71-
@Test
72-
void shouldContinueOnInvalidDeliveryAndNullPointerExceptions() {
56+
static Stream<String> provideProcessingExceptionHandlerClassName() {
57+
return Stream.of(
58+
ExceptionTypeProcessingHandler.class.getName(),
59+
ProcessorIdProcessingHandler.class.getName(),
60+
RecordTypeProcessingHandler.class.getName());
61+
}
62+
63+
@ParameterizedTest
64+
@MethodSource("provideProcessingExceptionHandlerClassName")
65+
void shouldContinueOnInvalidDeliveryAndNullPointerExceptions(String processingExceptionHandlerClassName) {
66+
instantiateTopologyTestDriver(processingExceptionHandlerClassName);
67+
7368
inputTopic.pipeInput(
7469
"DEL12345",
7570
"""
@@ -114,6 +109,22 @@ void shouldContinueOnInvalidDeliveryAndNullPointerExceptions() {
114109
testDriver.metrics().get(droppedRecordsRateMetric()).metricValue());
115110
}
116111

112+
void instantiateTopologyTestDriver(String processingExceptionHandlerClassName) {
113+
Properties properties = new Properties();
114+
properties.setProperty(APPLICATION_ID_CONFIG, "processing-error-handling-dsl-app-test");
115+
properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
116+
properties.setProperty(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandlerClassName);
117+
118+
StreamsBuilder streamsBuilder = new StreamsBuilder();
119+
KafkaStreamsApp.buildTopology(streamsBuilder);
120+
testDriver = new TopologyTestDriver(streamsBuilder.build(), properties);
121+
122+
inputTopic =
123+
testDriver.createInputTopic("delivery_booked_topic", new StringSerializer(), new StringSerializer());
124+
outputTopic = testDriver.createOutputTopic(
125+
"filtered_delivery_booked_dsl_topic", new StringDeserializer(), new StringDeserializer());
126+
}
127+
117128
private MetricName droppedRecordsTotalMetric() {
118129
return createMetric("dropped-records-total", "The total number of dropped records");
119130
}

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/handler/RecordTypeProcessingHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?
4444
}
4545

4646
if (message != null && message.value() instanceof DeliveryBooked deliveryBooked) {
47-
return deliveryBooked.getNumberOfTires() == null
47+
return deliveryBooked.getNumberOfTires() == null || deliveryBooked.getNumberOfTires() < 0
4848
? ProcessingHandlerResponse.CONTINUE
4949
: ProcessingHandlerResponse.FAIL;
5050
}

processor-api/src/test/java/com/michelin/kafka/error/handling/papi/KafkaStreamsAppTest.java

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import static org.junit.jupiter.api.Assertions.assertEquals;
2727

2828
import com.michelin.kafka.error.handling.papi.handler.ExceptionTypeProcessingHandler;
29+
import com.michelin.kafka.error.handling.papi.handler.ProcessorIdProcessingHandler;
30+
import com.michelin.kafka.error.handling.papi.handler.RecordTypeProcessingHandler;
2931
import java.time.Duration;
3032
import java.util.List;
3133
import java.util.Properties;
34+
import java.util.stream.Stream;
3235
import org.apache.kafka.common.MetricName;
3336
import org.apache.kafka.common.serialization.StringDeserializer;
3437
import org.apache.kafka.common.serialization.StringSerializer;
@@ -38,39 +41,31 @@
3841
import org.apache.kafka.streams.TestOutputTopic;
3942
import org.apache.kafka.streams.TopologyTestDriver;
4043
import org.junit.jupiter.api.AfterEach;
41-
import org.junit.jupiter.api.BeforeEach;
42-
import org.junit.jupiter.api.Test;
44+
import org.junit.jupiter.params.ParameterizedTest;
45+
import org.junit.jupiter.params.provider.MethodSource;
4346

4447
class KafkaStreamsAppTest {
4548
private TopologyTestDriver testDriver;
4649
private TestInputTopic<String, String> inputTopic;
4750
private TestOutputTopic<String, String> outputTopic;
4851

49-
@BeforeEach
50-
void setUp() {
51-
Properties properties = new Properties();
52-
properties.setProperty(APPLICATION_ID_CONFIG, "processing-error-handling-papi-app");
53-
properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
54-
properties.setProperty(
55-
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ExceptionTypeProcessingHandler.class.getName());
56-
57-
StreamsBuilder streamsBuilder = new StreamsBuilder();
58-
KafkaStreamsApp.buildTopology(streamsBuilder);
59-
testDriver = new TopologyTestDriver(streamsBuilder.build(), properties);
60-
61-
inputTopic =
62-
testDriver.createInputTopic("delivery_booked_topic", new StringSerializer(), new StringSerializer());
63-
outputTopic = testDriver.createOutputTopic(
64-
"filtered_delivery_booked_papi_topic", new StringDeserializer(), new StringDeserializer());
65-
}
66-
6752
@AfterEach
6853
void tearDown() {
6954
testDriver.close();
7055
}
7156

72-
@Test
73-
void shouldContinueOnInvalidDeliveryAndNullPointerExceptions() {
57+
static Stream<String> provideProcessingExceptionHandlerClassName() {
58+
return Stream.of(
59+
ExceptionTypeProcessingHandler.class.getName(),
60+
ProcessorIdProcessingHandler.class.getName(),
61+
RecordTypeProcessingHandler.class.getName());
62+
}
63+
64+
@ParameterizedTest
65+
@MethodSource("provideProcessingExceptionHandlerClassName")
66+
void shouldContinueOnInvalidDeliveryAndNullPointerExceptions(String processingExceptionHandlerClassName) {
67+
instantiateTopologyTestDriver(processingExceptionHandlerClassName);
68+
7469
inputTopic.pipeInput(
7570
"DEL12345",
7671
"""
@@ -117,6 +112,22 @@ void shouldContinueOnInvalidDeliveryAndNullPointerExceptions() {
117112
testDriver.metrics().get(droppedRecordsRateMetric()).metricValue());
118113
}
119114

115+
void instantiateTopologyTestDriver(String processingExceptionHandlerClassName) {
116+
Properties properties = new Properties();
117+
properties.setProperty(APPLICATION_ID_CONFIG, "processing-error-handling-papi-app");
118+
properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
119+
properties.setProperty(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandlerClassName);
120+
121+
StreamsBuilder streamsBuilder = new StreamsBuilder();
122+
KafkaStreamsApp.buildTopology(streamsBuilder);
123+
testDriver = new TopologyTestDriver(streamsBuilder.build(), properties);
124+
125+
inputTopic =
126+
testDriver.createInputTopic("delivery_booked_topic", new StringSerializer(), new StringSerializer());
127+
outputTopic = testDriver.createOutputTopic(
128+
"filtered_delivery_booked_papi_topic", new StringDeserializer(), new StringDeserializer());
129+
}
130+
120131
private MetricName droppedRecordsTotalMetric() {
121132
return createMetric("dropped-records-total", "The total number of dropped records");
122133
}

0 commit comments

Comments
 (0)