Skip to content

Commit 033f757

Browse files
committed
Java format
1 parent c211bec commit 033f757

File tree

12 files changed

+186
-88
lines changed

12 files changed

+186
-88
lines changed

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/InvalidDeliveryException.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
119
package com.michelin.kafka.error.handling.dsl;
220

321
public class InvalidDeliveryException extends RuntimeException {

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/KafkaStreamsApp.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public static void main(String[] args) {
4848
buildTopology(streamsBuilder);
4949

5050
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
51-
kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
51+
kafkaStreams.setUncaughtExceptionHandler(
52+
exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
5253
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
5354

5455
kafkaStreams.start();

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/handler/ExceptionTypeProcessingHandler.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,9 @@ record != null ? record.key() : null,
4343
record != null ? record.value() : null,
4444
exception);
4545

46-
if (exception instanceof JsonSyntaxException)
47-
return ProcessingHandlerResponse.FAIL;
48-
if (exception instanceof InvalidDeliveryException)
49-
return ProcessingHandlerResponse.CONTINUE;
50-
if (exception instanceof NetworkException)
51-
return ProcessingHandlerResponse.FAIL;
46+
if (exception instanceof JsonSyntaxException) return ProcessingHandlerResponse.FAIL;
47+
if (exception instanceof InvalidDeliveryException) return ProcessingHandlerResponse.CONTINUE;
48+
if (exception instanceof NetworkException) return ProcessingHandlerResponse.FAIL;
5249

5350
return ProcessingHandlerResponse.CONTINUE;
5451
}

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/handler/ProcessorIdProcessingHandler.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
119
package com.michelin.kafka.error.handling.dsl.handler;
220

321
import java.util.Map;
@@ -13,24 +31,20 @@ public class ProcessorIdProcessingHandler implements ProcessingExceptionHandler
1331
@Override
1432
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
1533
log.warn(
16-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
17-
context.processorNodeId(),
18-
context.topic(),
19-
context.partition(),
20-
context.offset(),
21-
record != null ? record.key() : null,
22-
record != null ? record.value() : null,
23-
exception);
34+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
35+
context.processorNodeId(),
36+
context.topic(),
37+
context.partition(),
38+
context.offset(),
39+
record != null ? record.key() : null,
40+
record != null ? record.value() : null,
41+
exception);
2442

25-
if (context.processorNodeId().equals("MAP_PROCESSOR"))
26-
return ProcessingHandlerResponse.FAIL;
27-
if (context.processorNodeId().equals("FILTER_PROCESSOR"))
28-
return ProcessingHandlerResponse.CONTINUE;
29-
if (context.processorNodeId().equals("SELECT_KEY_PROCESSOR"))
30-
return ProcessingHandlerResponse.FAIL;
43+
if (context.processorNodeId().equals("MAP_PROCESSOR")) return ProcessingHandlerResponse.FAIL;
44+
if (context.processorNodeId().equals("FILTER_PROCESSOR")) return ProcessingHandlerResponse.CONTINUE;
45+
if (context.processorNodeId().equals("SELECT_KEY_PROCESSOR")) return ProcessingHandlerResponse.FAIL;
3146

3247
return ProcessingHandlerResponse.CONTINUE;
33-
3448
}
3549

3650
@Override

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/handler/RecordTypeProcessingHandler.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
119
package com.michelin.kafka.error.handling.dsl.handler;
220

321
import com.michelin.kafka.error.handling.dsl.DeliveryBooked;
@@ -14,17 +32,19 @@ public class RecordTypeProcessingHandler implements ProcessingExceptionHandler {
1432
@Override
1533
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
1634
log.warn(
17-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
18-
context.processorNodeId(),
19-
context.topic(),
20-
context.partition(),
21-
context.offset(),
22-
record != null ? record.key() : null,
23-
record != null ? record.value() : null,
24-
exception);
35+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
36+
context.processorNodeId(),
37+
context.topic(),
38+
context.partition(),
39+
context.offset(),
40+
record != null ? record.key() : null,
41+
record != null ? record.value() : null,
42+
exception);
2543

2644
if (record != null && record.value() instanceof DeliveryBooked deliveryBooked) {
27-
return deliveryBooked.getNumberOfTires() == null ? ProcessingHandlerResponse.CONTINUE : ProcessingHandlerResponse.FAIL;
45+
return deliveryBooked.getNumberOfTires() == null
46+
? ProcessingHandlerResponse.CONTINUE
47+
: ProcessingHandlerResponse.FAIL;
2848
}
2949

3050
return ProcessingHandlerResponse.CONTINUE;

dsl/src/test/java/com/michelin/kafka/error/handling/dsl/KafkaStreamsAppTest.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ void tearDown() {
7171
@Test
7272
void shouldContinueOnInvalidDeliveryAndNullPointerExceptions() {
7373
inputTopic.pipeInput(
74-
"DEL12345",
75-
"""
74+
"DEL12345",
75+
"""
7676
{
7777
"deliveryId": "DEL12345",
7878
"truckId": "TRK56789",
@@ -83,8 +83,8 @@ void shouldContinueOnInvalidDeliveryAndNullPointerExceptions() {
8383

8484
// "numberOfTires" is negative. This will throw an InvalidDeliveryException
8585
inputTopic.pipeInput(
86-
"DEL73148",
87-
"""
86+
"DEL73148",
87+
"""
8888
{
8989
"deliveryId": "DEL67145",
9090
"truckId": "TRK34567",
@@ -95,24 +95,23 @@ void shouldContinueOnInvalidDeliveryAndNullPointerExceptions() {
9595

9696
// "numberOfTires" is missing. This will throw a NullPointerException
9797
inputTopic.pipeInput(
98-
"DEL73148",
99-
"""
98+
"DEL73148",
99+
"""
100100
{
101101
"deliveryId": "DEL73148",
102102
"truckId": "TRK48612",
103103
"destination": "Lyon"
104104
}
105105
""");
106106

107-
108107
List<KeyValue<String, String>> results = outputTopic.readKeyValuesToList();
109108

110109
assertEquals("DEL12345", results.getFirst().key);
111110

112111
assertEquals(2.0, testDriver.metrics().get(droppedRecordsTotalMetric()).metricValue());
113112
assertEquals(
114-
0.06666666666666667,
115-
testDriver.metrics().get(droppedRecordsRateMetric()).metricValue());
113+
0.06666666666666667,
114+
testDriver.metrics().get(droppedRecordsRateMetric()).metricValue());
116115
}
117116

118117
private MetricName droppedRecordsTotalMetric() {

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/InvalidDeliveryException.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
119
package com.michelin.kafka.error.handling.papi;
220

321
public class InvalidDeliveryException extends RuntimeException {

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/KafkaStreamsApp.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public static void main(String[] args) {
4646
buildTopology(streamsBuilder);
4747

4848
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
49-
kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
49+
kafkaStreams.setUncaughtExceptionHandler(
50+
exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
5051
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
5152

5253
kafkaStreams.start();

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/handler/ExceptionTypeProcessingHandler.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.google.gson.JsonSyntaxException;
2222
import com.michelin.kafka.error.handling.papi.InvalidDeliveryException;
23-
import com.michelin.kafka.error.handling.papi.KaboomException;
2423
import java.util.Map;
2524
import org.apache.kafka.common.errors.NetworkException;
2625
import org.apache.kafka.streams.errors.ErrorHandlerContext;
@@ -35,21 +34,18 @@ public class ExceptionTypeProcessingHandler implements ProcessingExceptionHandle
3534
@Override
3635
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
3736
log.warn(
38-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
39-
context.processorNodeId(),
40-
context.topic(),
41-
context.partition(),
42-
context.offset(),
43-
record != null ? record.key() : null,
44-
record != null ? record.value() : null,
45-
exception);
37+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
38+
context.processorNodeId(),
39+
context.topic(),
40+
context.partition(),
41+
context.offset(),
42+
record != null ? record.key() : null,
43+
record != null ? record.value() : null,
44+
exception);
4645

47-
if (exception instanceof JsonSyntaxException)
48-
return ProcessingHandlerResponse.FAIL;
49-
if (exception instanceof InvalidDeliveryException)
50-
return ProcessingHandlerResponse.CONTINUE;
51-
if (exception instanceof NetworkException)
52-
return ProcessingHandlerResponse.FAIL;
46+
if (exception instanceof JsonSyntaxException) return ProcessingHandlerResponse.FAIL;
47+
if (exception instanceof InvalidDeliveryException) return ProcessingHandlerResponse.CONTINUE;
48+
if (exception instanceof NetworkException) return ProcessingHandlerResponse.FAIL;
5349

5450
return ProcessingHandlerResponse.CONTINUE;
5551
}

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/handler/ProcessorIdProcessingHandler.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
119
package com.michelin.kafka.error.handling.papi.handler;
220

321
import java.util.Map;
@@ -13,24 +31,20 @@ public class ProcessorIdProcessingHandler implements ProcessingExceptionHandler
1331
@Override
1432
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
1533
log.warn(
16-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
17-
context.processorNodeId(),
18-
context.topic(),
19-
context.partition(),
20-
context.offset(),
21-
record != null ? record.key() : null,
22-
record != null ? record.value() : null,
23-
exception);
34+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
35+
context.processorNodeId(),
36+
context.topic(),
37+
context.partition(),
38+
context.offset(),
39+
record != null ? record.key() : null,
40+
record != null ? record.value() : null,
41+
exception);
2442

25-
if (context.processorNodeId().equals("MAP_PROCESSOR"))
26-
return ProcessingHandlerResponse.FAIL;
27-
if (context.processorNodeId().equals("FILTER_PROCESSOR"))
28-
return ProcessingHandlerResponse.CONTINUE;
29-
if (context.processorNodeId().equals("SELECT_KEY_PROCESSOR"))
30-
return ProcessingHandlerResponse.FAIL;
43+
if (context.processorNodeId().equals("MAP_PROCESSOR")) return ProcessingHandlerResponse.FAIL;
44+
if (context.processorNodeId().equals("FILTER_PROCESSOR")) return ProcessingHandlerResponse.CONTINUE;
45+
if (context.processorNodeId().equals("SELECT_KEY_PROCESSOR")) return ProcessingHandlerResponse.FAIL;
3146

3247
return ProcessingHandlerResponse.CONTINUE;
33-
3448
}
3549

3650
@Override

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/handler/RecordTypeProcessingHandler.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
119
package com.michelin.kafka.error.handling.papi.handler;
220

321
import com.michelin.kafka.error.handling.papi.DeliveryBooked;
@@ -14,17 +32,19 @@ public class RecordTypeProcessingHandler implements ProcessingExceptionHandler {
1432
@Override
1533
public ProcessingHandlerResponse handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception) {
1634
log.warn(
17-
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
18-
context.processorNodeId(),
19-
context.topic(),
20-
context.partition(),
21-
context.offset(),
22-
record != null ? record.key() : null,
23-
record != null ? record.value() : null,
24-
exception);
35+
"Exception caught for processorNodeId = {}, topic = {}, partition = {}, offset = {}, key = {}, value = {}",
36+
context.processorNodeId(),
37+
context.topic(),
38+
context.partition(),
39+
context.offset(),
40+
record != null ? record.key() : null,
41+
record != null ? record.value() : null,
42+
exception);
2543

2644
if (record != null && record.value() instanceof DeliveryBooked deliveryBooked) {
27-
return deliveryBooked.getNumberOfTires() == null ? ProcessingHandlerResponse.CONTINUE : ProcessingHandlerResponse.FAIL;
45+
return deliveryBooked.getNumberOfTires() == null
46+
? ProcessingHandlerResponse.CONTINUE
47+
: ProcessingHandlerResponse.FAIL;
2848
}
2949

3050
return ProcessingHandlerResponse.CONTINUE;

0 commit comments

Comments
 (0)