Skip to content

Commit 4937bee

Browse files
committed
Improve record type processing handler
1 parent 66cd4d2 commit 4937bee

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/CustomProcessor.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import org.apache.kafka.streams.processor.api.ProcessorContext;
4545
import org.apache.kafka.streams.processor.api.Record;
4646

47-
public class CustomProcessor extends ContextualProcessor<String, String, String, String> {
47+
public class CustomProcessor extends ContextualProcessor<String, DeliveryBooked, String, String> {
4848
private static final Gson gson = new Gson();
4949

5050
@Override
@@ -57,22 +57,16 @@ public void init(ProcessorContext<String, String> context) {
5757
}
5858

5959
@Override
60-
public void process(Record<String, String> message) {
61-
DeliveryBooked value = parseFromJson(message.value());
62-
63-
if (value.getNumberOfTires() < 0) {
60+
public void process(Record<String, DeliveryBooked> message) {
61+
if (message.value().getNumberOfTires() < 0) {
6462
throw new InvalidDeliveryException("Number of tires cannot be negative");
6563
}
6664

67-
if (value.getNumberOfTires() >= 10) {
68-
context().forward(message.withValue(parseToJson(value)));
65+
if (message.value().getNumberOfTires() >= 10) {
66+
context().forward(message.withValue(parseToJson(message.value())));
6967
}
7068
}
7169

72-
private static DeliveryBooked parseFromJson(String value) {
73-
return gson.fromJson(value, DeliveryBooked.class);
74-
}
75-
7670
private static String parseToJson(DeliveryBooked value) {
7771
return gson.toJson(value);
7872
}

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/KafkaStreamsApp.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
2323
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
2424

25+
import com.google.gson.Gson;
2526
import com.michelin.kafka.error.handling.papi.handler.ExceptionTypeProcessingHandler;
2627
import java.util.Optional;
2728
import java.util.Properties;
@@ -34,6 +35,8 @@
3435

3536
/** Kafka Streams application. */
3637
public class KafkaStreamsApp {
38+
private static final Gson gson = new Gson();
39+
3740
public static void main(String[] args) {
3841
Properties properties = new Properties();
3942
properties.put(APPLICATION_ID_CONFIG, "processing-error-handling-papi-app");
@@ -56,7 +59,12 @@ public static void main(String[] args) {
5659

5760
public static void buildTopology(StreamsBuilder streamsBuilder) {
5861
streamsBuilder.stream("delivery_booked_topic", Consumed.with(Serdes.String(), Serdes.String()))
62+
.mapValues(KafkaStreamsApp::parseFromJson)
5963
.process(CustomProcessor::new)
6064
.to("filtered_delivery_booked_papi_topic", Produced.with(Serdes.String(), Serdes.String()));
6165
}
66+
67+
private static DeliveryBooked parseFromJson(String value) {
68+
return gson.fromJson(value, DeliveryBooked.class);
69+
}
6270
}

0 commit comments

Comments
 (0)