Skip to content

Commit 2be771d

Browse files
committed
Fix sonar issues
1 parent ae48f4b commit 2be771d

File tree

10 files changed

+89
-76
lines changed

10 files changed

+89
-76
lines changed

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/KafkaStreamsApp.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,13 @@ public static void main(String[] args) {
4747
StreamsBuilder streamsBuilder = new StreamsBuilder();
4848
buildTopology(streamsBuilder);
4949

50-
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
51-
kafkaStreams.setUncaughtExceptionHandler(
52-
exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
53-
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
50+
try (KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties)) {
51+
kafkaStreams.setUncaughtExceptionHandler(
52+
exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
53+
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
5454

55-
kafkaStreams.start();
55+
kafkaStreams.start();
56+
}
5657
}
5758

5859
public static void buildTopology(StreamsBuilder streamsBuilder) {

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/handler/ExceptionTypeProcessingHandler.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,18 @@ public class ExceptionTypeProcessingHandler implements ProcessingExceptionHandle
3232
private static final Logger log = LoggerFactory.getLogger(ExceptionTypeProcessingHandler.class);
3333

3434
@Override
35-
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
36-
log.warn(
37-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
38-
context.processorNodeId(),
39-
context.topic(),
40-
context.partition(),
41-
context.offset(),
42-
record != null ? record.key() : null,
43-
record != null ? record.value() : null,
44-
exception);
35+
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> message, Exception exception) {
36+
if (log.isWarnEnabled()) {
37+
log.warn(
38+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
39+
context.processorNodeId(),
40+
context.topic(),
41+
context.partition(),
42+
context.offset(),
43+
message != null ? message.key() : null,
44+
message != null ? message.value() : null,
45+
exception);
46+
}
4547

4648
if (exception instanceof JsonSyntaxException) return ProcessingHandlerResponse.FAIL;
4749
if (exception instanceof InvalidDeliveryException) return ProcessingHandlerResponse.CONTINUE;

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/handler/ProcessorIdProcessingHandler.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,18 @@ public class ProcessorIdProcessingHandler implements ProcessingExceptionHandler
2929
private static final Logger log = LoggerFactory.getLogger(ProcessorIdProcessingHandler.class);
3030

3131
@Override
32-
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
33-
log.warn(
34-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
35-
context.processorNodeId(),
36-
context.topic(),
37-
context.partition(),
38-
context.offset(),
39-
record != null ? record.key() : null,
40-
record != null ? record.value() : null,
41-
exception);
32+
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> message, Exception exception) {
33+
if (log.isWarnEnabled()) {
34+
log.warn(
35+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
36+
context.processorNodeId(),
37+
context.topic(),
38+
context.partition(),
39+
context.offset(),
40+
message != null ? message.key() : null,
41+
message != null ? message.value() : null,
42+
exception);
43+
}
4244

4345
if (context.processorNodeId().equals("MAP_PROCESSOR")) return ProcessingHandlerResponse.FAIL;
4446
if (context.processorNodeId().equals("FILTER_PROCESSOR")) return ProcessingHandlerResponse.CONTINUE;

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/handler/RecordTypeProcessingHandler.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,20 @@ public class RecordTypeProcessingHandler implements ProcessingExceptionHandler {
3030
private static final Logger log = LoggerFactory.getLogger(RecordTypeProcessingHandler.class);
3131

3232
@Override
33-
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
34-
log.warn(
35-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
36-
context.processorNodeId(),
37-
context.topic(),
38-
context.partition(),
39-
context.offset(),
40-
record != null ? record.key() : null,
41-
record != null ? record.value() : null,
42-
exception);
33+
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> message, Exception exception) {
34+
if (log.isWarnEnabled()) {
35+
log.warn(
36+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
37+
context.processorNodeId(),
38+
context.topic(),
39+
context.partition(),
40+
context.offset(),
41+
message != null ? message.key() : null,
42+
message != null ? message.value() : null,
43+
exception);
44+
}
4345

44-
if (record != null && record.value() instanceof DeliveryBooked deliveryBooked) {
46+
if (message != null && message.value() instanceof DeliveryBooked deliveryBooked) {
4547
return deliveryBooked.getNumberOfTires() == null
4648
? ProcessingHandlerResponse.CONTINUE
4749
: ProcessingHandlerResponse.FAIL;

pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@
7878
<build>
7979
<finalName>${project.artifactId}</finalName>
8080
<plugins>
81-
8281
<plugin>
8382
<groupId>com.diffplug.spotless</groupId>
8483
<artifactId>spotless-maven-plugin</artifactId>

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/CustomProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,15 @@ public void init(ProcessorContext<String, String> context) {
5757
}
5858

5959
@Override
60-
public void process(Record<String, String> record) {
61-
DeliveryBooked value = parseFromJson(record.value());
60+
public void process(Record<String, String> message) {
61+
DeliveryBooked value = parseFromJson(message.value());
6262

6363
if (value.getNumberOfTires() < 0) {
6464
throw new InvalidDeliveryException("Number of tires cannot be negative");
6565
}
6666

6767
if (value.getNumberOfTires() >= 10) {
68-
context().forward(record.withValue(parseToJson(value)));
68+
context().forward(message.withValue(parseToJson(value)));
6969
}
7070
}
7171

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/KafkaStreamsApp.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,13 @@ public static void main(String[] args) {
4545
StreamsBuilder streamsBuilder = new StreamsBuilder();
4646
buildTopology(streamsBuilder);
4747

48-
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
49-
kafkaStreams.setUncaughtExceptionHandler(
50-
exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
51-
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
48+
try (KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties)) {
49+
kafkaStreams.setUncaughtExceptionHandler(
50+
exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
51+
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
5252

53-
kafkaStreams.start();
53+
kafkaStreams.start();
54+
}
5455
}
5556

5657
public static void buildTopology(StreamsBuilder streamsBuilder) {

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/handler/ExceptionTypeProcessingHandler.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,18 @@ public class ExceptionTypeProcessingHandler implements ProcessingExceptionHandle
3232
private static final Logger log = LoggerFactory.getLogger(ExceptionTypeProcessingHandler.class);
3333

3434
@Override
35-
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
36-
log.warn(
37-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
38-
context.processorNodeId(),
39-
context.topic(),
40-
context.partition(),
41-
context.offset(),
42-
record != null ? record.key() : null,
43-
record != null ? record.value() : null,
44-
exception);
35+
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> message, Exception exception) {
36+
if (log.isWarnEnabled()) {
37+
log.warn(
38+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
39+
context.processorNodeId(),
40+
context.topic(),
41+
context.partition(),
42+
context.offset(),
43+
message != null ? message.key() : null,
44+
message != null ? message.value() : null,
45+
exception);
46+
}
4547

4648
if (exception instanceof JsonSyntaxException) return ProcessingHandlerResponse.FAIL;
4749
if (exception instanceof InvalidDeliveryException) return ProcessingHandlerResponse.CONTINUE;

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/handler/ProcessorIdProcessingHandler.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,18 @@ public class ProcessorIdProcessingHandler implements ProcessingExceptionHandler
2929
private static final Logger log = LoggerFactory.getLogger(ProcessorIdProcessingHandler.class);
3030

3131
@Override
32-
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
33-
log.warn(
34-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
35-
context.processorNodeId(),
36-
context.topic(),
37-
context.partition(),
38-
context.offset(),
39-
record != null ? record.key() : null,
40-
record != null ? record.value() : null,
41-
exception);
32+
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> message, Exception exception) {
33+
if (log.isWarnEnabled()) {
34+
log.warn(
35+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
36+
context.processorNodeId(),
37+
context.topic(),
38+
context.partition(),
39+
context.offset(),
40+
message != null ? message.key() : null,
41+
message != null ? message.value() : null,
42+
exception);
43+
}
4244

4345
if (context.processorNodeId().equals("MAP_PROCESSOR")) return ProcessingHandlerResponse.FAIL;
4446
if (context.processorNodeId().equals("FILTER_PROCESSOR")) return ProcessingHandlerResponse.CONTINUE;

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/handler/RecordTypeProcessingHandler.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,20 @@ public class RecordTypeProcessingHandler implements ProcessingExceptionHandler {
3030
private static final Logger log = LoggerFactory.getLogger(RecordTypeProcessingHandler.class);
3131

3232
@Override
33-
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
34-
log.warn(
35-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
36-
context.processorNodeId(),
37-
context.topic(),
38-
context.partition(),
39-
context.offset(),
40-
record != null ? record.key() : null,
41-
record != null ? record.value() : null,
42-
exception);
33+
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> message, Exception exception) {
34+
if (log.isWarnEnabled()) {
35+
log.warn(
36+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
37+
context.processorNodeId(),
38+
context.topic(),
39+
context.partition(),
40+
context.offset(),
41+
message != null ? message.key() : null,
42+
message != null ? message.value() : null,
43+
exception);
44+
}
4345

44-
if (record != null && record.value() instanceof DeliveryBooked deliveryBooked) {
46+
if (message != null && message.value() instanceof DeliveryBooked deliveryBooked) {
4547
return deliveryBooked.getNumberOfTires() == null
4648
? ProcessingHandlerResponse.CONTINUE
4749
: ProcessingHandlerResponse.FAIL;

0 commit comments

Comments
 (0)