Skip to content

Commit c211bec

Browse files
committed
Update processor Id and record type handlers
1 parent 97fd971 commit c211bec

File tree

4 files changed

+34
-8
lines changed

4 files changed

+34
-8
lines changed

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/handler/ProcessorIdProcessingHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ record != null ? record.value() : null,
3535

3636
@Override
3737
public void configure(Map<String, ?> map) {
38-
38+
// Do nothing
3939
}
4040
}

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/handler/RecordTypeProcessingHandler.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,33 @@
55
import org.apache.kafka.streams.errors.ErrorHandlerContext;
66
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
77
import org.apache.kafka.streams.processor.api.Record;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
810

911
public class RecordTypeProcessingHandler implements ProcessingExceptionHandler {
12+
private static final Logger log = LoggerFactory.getLogger(RecordTypeProcessingHandler.class);
13+
1014
@Override
1115
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
12-
if (record.value() instanceof DeliveryBooked deliveryBooked) {
16+
log.warn(
17+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
18+
context.processorNodeId(),
19+
context.topic(),
20+
context.partition(),
21+
context.offset(),
22+
record != null ? record.key() : null,
23+
record != null ? record.value() : null,
24+
exception);
25+
26+
if (record != null && record.value() instanceof DeliveryBooked deliveryBooked) {
1327
return deliveryBooked.getNumberOfTires() == null ? ProcessingHandlerResponse.CONTINUE : ProcessingHandlerResponse.FAIL;
1428
}
1529

1630
return ProcessingHandlerResponse.CONTINUE;
17-
1831
}
1932

2033
@Override
2134
public void configure(Map<String, ?> map) {
22-
35+
// Do nothing
2336
}
2437
}

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/handler/ProcessorIdProcessingHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ record != null ? record.value() : null,
3535

3636
@Override
3737
public void configure(Map<String, ?> map) {
38-
38+
// Do nothing
3939
}
4040
}

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/handler/RecordTypeProcessingHandler.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,33 @@
55
import org.apache.kafka.streams.errors.ErrorHandlerContext;
66
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
77
import org.apache.kafka.streams.processor.api.Record;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
810

911
public class RecordTypeProcessingHandler implements ProcessingExceptionHandler {
12+
private static final Logger log = LoggerFactory.getLogger(RecordTypeProcessingHandler.class);
13+
1014
@Override
1115
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
12-
if (record.value() instanceof DeliveryBooked deliveryBooked) {
16+
log.warn(
17+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
18+
context.processorNodeId(),
19+
context.topic(),
20+
context.partition(),
21+
context.offset(),
22+
record != null ? record.key() : null,
23+
record != null ? record.value() : null,
24+
exception);
25+
26+
if (record != null && record.value() instanceof DeliveryBooked deliveryBooked) {
1327
return deliveryBooked.getNumberOfTires() == null ? ProcessingHandlerResponse.CONTINUE : ProcessingHandlerResponse.FAIL;
1428
}
1529

1630
return ProcessingHandlerResponse.CONTINUE;
17-
1831
}
1932

2033
@Override
2134
public void configure(Map<String, ?> map) {
22-
35+
// Do nothing
2336
}
2437
}

0 commit comments

Comments
 (0)