Skip to content

Commit 1f2443c

Browse files
author
Loïc GREFFIER
authored
Improved logs on schemas timeout and CLI format (#207)
* Improved logs on schemas timeout and CLI format * Fix unit tests * Fix unit tests
1 parent 928aef5 commit 1f2443c

File tree

7 files changed

+38
-58
lines changed

7 files changed

+38
-58
lines changed

api/src/main/java/com/michelin/ns4kafka/controllers/ExceptionHandlerController.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.micronaut.http.annotation.Error;
1212
import io.micronaut.security.authentication.AuthenticationException;
1313
import io.micronaut.security.authentication.AuthorizationException;
14+
import lombok.extern.slf4j.Slf4j;
1415

1516
import javax.validation.ConstraintViolation;
1617
import javax.validation.ConstraintViolationException;
@@ -20,6 +21,7 @@
2021
import java.util.List;
2122
import java.util.stream.Collectors;
2223

24+
@Slf4j
2325
@Controller("/errors")
2426
public class ExceptionHandlerController {
2527
@Error(global = true)
@@ -122,19 +124,22 @@ public HttpResponse<Status> error(HttpRequest<?> request, AuthorizationException
122124

123125
}
124126

125-
// if we don't know the exception
126127
@Error(global = true)
127128
public HttpResponse<Status> error(HttpRequest<?> request, Exception exception) {
128-
var status = Status.builder()
129+
log.error("An error occurred on API endpoint {} {}: {}", request.getMethodName(), request.getUri(), exception.getMessage());
130+
131+
Status status = Status.builder()
129132
.status(StatusPhase.Failed)
130133
.message("Internal server error")
131134
.reason(StatusReason.InternalError)
132135
.details(StatusDetails.builder()
133-
.causes(List.of(exception.toString()))
136+
.causes(List.of(exception.getMessage() != null ? exception.getMessage() : exception.toString()))
134137
.build())
135138
.code(HttpStatus.INTERNAL_SERVER_ERROR.getCode())
136139
.build();
137-
return HttpResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
140+
141+
return HttpResponse
142+
.status(HttpStatus.INTERNAL_SERVER_ERROR)
138143
.body(status);
139144
}
140145
}

api/src/main/java/com/michelin/ns4kafka/services/SchemaService.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import com.michelin.ns4kafka.services.schema.client.entities.SchemaCompatibilityResponse;
1111
import com.michelin.ns4kafka.services.schema.client.entities.SchemaRequest;
1212
import com.michelin.ns4kafka.services.schema.client.entities.SchemaResponse;
13-
import io.micronaut.http.client.exceptions.HttpClientResponseException;
1413
import io.reactivex.Maybe;
1514
import io.reactivex.Single;
1615
import lombok.extern.slf4j.Slf4j;
@@ -165,16 +164,19 @@ public Single<List<String>> validateSchemaCompatibility(String cluster, Schema s
165164
.schema(schema.getSpec().getSchema())
166165
.references(schema.getSpec().getReferences())
167166
.build())
168-
.flatMap(schemaCompatibilityCheckSuccess -> {
169-
if (!schemaCompatibilityCheckSuccess.isCompatible()) {
170-
return Maybe.just(schemaCompatibilityCheckSuccess.messages());
171-
} else {
172-
return Maybe.just(List.<String>of());
167+
.map(Optional::of)
168+
.defaultIfEmpty(Optional.empty())
169+
.flatMapSingle(schemaCompatibilityCheckOptional -> {
170+
if (schemaCompatibilityCheckOptional.isEmpty()) {
171+
return Single.just(List.of());
173172
}
174-
},
175-
schemaCompatibilityCheckError -> Maybe.just(List.of("An error occurred during the schema validation (status code: " + ((HttpClientResponseException) schemaCompatibilityCheckError).getStatus() + ")")),
176-
() -> Maybe.just(List.<String>of()))
177-
.flatMapSingle(Single::just);
173+
174+
if (!schemaCompatibilityCheckOptional.get().isCompatible()) {
175+
return Single.just(schemaCompatibilityCheckOptional.get().messages());
176+
}
177+
178+
return Single.just(List.of());
179+
});
178180
}
179181

180182
/**

api/src/main/java/com/michelin/ns4kafka/services/executors/TopicAsyncExecutor.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,9 @@ public void synchronizeTopics(){
8686
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
8787

8888
if(log.isDebugEnabled()){
89-
log.debug("Topics to create : "+ toCreate.stream().map(t -> t.getMetadata().getName()).collect(Collectors.joining(", ")));
90-
//TODO reenable
91-
// LOG.debug("Topics to delete : "+String.join(", ", toDelete.stream().map(t -> t.getMetadata().getName()).collect(Collectors.toList())));
92-
log.debug("Topics to delete : "+toDelete.size());
93-
log.debug("Topic configs to update : "+toUpdate.size());
89+
log.debug("Topics to create: " + toCreate.stream().map(t -> t.getMetadata().getName()).collect(Collectors.joining(", ")));
90+
log.debug("Topics to delete: " + toDelete.size());
91+
log.debug("Topics to update: " + toUpdate.size());
9492
for (Map.Entry<ConfigResource,Collection<AlterConfigOp>> e : toUpdate.entrySet()) {
9593
for (AlterConfigOp op : e.getValue()) {
9694
log.debug(e.getKey().name()+" "+op.opType().toString()+" " +op.configEntry().name()+"("+op.configEntry().value()+")");

api/src/main/java/com/michelin/ns4kafka/services/schema/KafkaSchemaRegistryClientProxy.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import io.micronaut.http.MutableHttpResponse;
1111
import io.micronaut.http.annotation.Filter;
1212
import io.micronaut.http.client.ProxyHttpClient;
13-
import io.micronaut.http.filter.HttpServerFilter;
1413
import io.micronaut.http.filter.OncePerRequestHttpServerFilter;
1514
import io.micronaut.http.filter.ServerFilterChain;
1615
import org.reactivestreams.Publisher;
@@ -24,7 +23,7 @@
2423
@Filter(KafkaSchemaRegistryClientProxy.SCHEMA_REGISTRY_PREFIX + "/**")
2524
public class KafkaSchemaRegistryClientProxy extends OncePerRequestHttpServerFilter {
2625
/**
27-
* Schema registry prefix used to filter request to schema registries. It'll be replace by
26+
* Schema registry prefix used to filter request to schema registries. It'll be replaced by
2827
* the schema registry URL of the
2928
*/
3029
public static final String SCHEMA_REGISTRY_PREFIX = "/schema-registry-proxy";
@@ -94,7 +93,7 @@ public Publisher<MutableHttpResponse<?>> doFilterOnce(HttpRequest<?> request, Se
9493
return Publishers.just(new ResourceValidationException(List.of("Kafka Cluster [" + kafkaCluster + "] has no schema registry"),null,null));
9594
}
9695

97-
return this.client.proxy(mutateSchemaRegistryRequest(request, config.get()));
96+
return client.proxy(mutateSchemaRegistryRequest(request, config.get()));
9897
}
9998

10099
/**

api/src/test/java/com/michelin/ns4kafka/controllers/ExceptionHandlerControllerTest.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,25 @@
11
package com.michelin.ns4kafka.controllers;
22

3-
import org.junit.jupiter.api.Assertions;
4-
import org.junit.jupiter.api.Disabled;
5-
import org.junit.jupiter.api.Test;
6-
73
import io.micronaut.http.HttpMethod;
84
import io.micronaut.http.HttpRequest;
95
import io.micronaut.http.HttpStatus;
106
import io.micronaut.security.authentication.AuthenticationException;
117
import io.micronaut.security.authentication.AuthorizationException;
128
import io.micronaut.security.authentication.DefaultAuthentication;
13-
import javax.validation.ConstraintViolationException;
9+
import org.junit.jupiter.api.Assertions;
10+
import org.junit.jupiter.api.Test;
1411

12+
import javax.validation.ConstraintViolationException;
1513
import java.util.List;
1614
import java.util.Map;
1715
import java.util.Set;
1816

19-
public class ExceptionHandlerControllerTest {
17+
class ExceptionHandlerControllerTest {
2018

2119
ExceptionHandlerController exceptionHandlerController = new ExceptionHandlerController();
2220

2321
@Test
24-
void ressourceValidationError() {
22+
void resourceValidationError() {
2523
var response = exceptionHandlerController.error(HttpRequest.create(HttpMethod.POST, "local")
2624
,new ResourceValidationException(List.of("Error1", "Error2"),"Topic", "Name"));
2725
var status = response.body();
@@ -36,15 +34,13 @@ void ressourceValidationError() {
3634
}
3735

3836
@Test
39-
@Disabled
4037
void constraintViolationError() {
4138
var response = exceptionHandlerController.error(HttpRequest.create(HttpMethod.POST, "local")
4239
,new ConstraintViolationException(Set.of()));
4340
var status = response.body();
4441

4542
Assertions.assertEquals(HttpStatus.UNPROCESSABLE_ENTITY, response.getStatus());
4643
Assertions.assertEquals(HttpStatus.UNPROCESSABLE_ENTITY.getCode(), status.getCode());
47-
//Assertions.assertEquals("Error1", status.getDetails().getCauses().get(0));
4844
}
4945

5046
@Test

api/src/test/java/com/michelin/ns4kafka/services/SchemaServiceTest.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import com.michelin.ns4kafka.services.schema.client.entities.SchemaCompatibilityCheckResponse;
1010
import com.michelin.ns4kafka.services.schema.client.entities.SchemaCompatibilityResponse;
1111
import com.michelin.ns4kafka.services.schema.client.entities.SchemaResponse;
12-
import io.micronaut.http.HttpResponse;
13-
import io.micronaut.http.client.exceptions.HttpClientResponseException;
1412
import io.reactivex.Maybe;
1513
import io.reactivex.Single;
1614
import org.junit.jupiter.api.Assertions;
@@ -247,24 +245,6 @@ void validateSchemaCompatibility404NotFound() {
247245
.assertValue(List::isEmpty);
248246
}
249247

250-
/**
251-
* Test the schema compatibility validation when the Schema Registry throws an exception
252-
*/
253-
@Test
254-
void validateSchemaCompatibilityThrowsException() {
255-
Namespace namespace = buildNamespace();
256-
Schema schema = buildSchema();
257-
258-
when(kafkaSchemaRegistryClient.validateSchemaCompatibility(any(), any(), any(), any()))
259-
.thenReturn(Maybe.error(new HttpClientResponseException("Error", HttpResponse.notFound())));
260-
261-
schemaService.validateSchemaCompatibility(namespace.getMetadata().getCluster(), schema)
262-
.test()
263-
.assertValue(validationErrors -> validationErrors.size() == 1L)
264-
.assertValue(validationErrors -> validationErrors.get(0)
265-
.equals("An error occurred during the schema validation (status code: NOT_FOUND)"));
266-
}
267-
268248
/**
269249
* Test the schema compatibility update when reset to default is asked
270250
*/

cli/src/main/java/com/michelin/ns4kafka/cli/services/FormatService.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@
2626

2727
@Singleton
2828
public class FormatService {
29-
30-
@Inject
31-
public KafkactlConfig kafkactlConfig;
32-
3329
private static final String YAML = "yaml";
3430
private static final String TABLE = "table";
3531
private final List<String> defaults = List.of(
@@ -38,6 +34,9 @@ public class FormatService {
3834
"AGE:/metadata/creationTimestamp%AGO"
3935
);
4036

37+
@Inject
38+
public KafkactlConfig kafkactlConfig;
39+
4140
public void displayList(String kind, List<Resource> resources, String output) {
4241
if (output.equals(TABLE)) {
4342
printTable(kind, resources);
@@ -60,11 +59,12 @@ public void displayError(HttpClientResponseException e, String kind, String name
6059
Optional<Status> statusOptional = e.getResponse().getBody(Status.class);
6160
if (statusOptional.isPresent() && statusOptional.get().getDetails() != null && !statusOptional.get().getDetails().getCauses().isEmpty()) {
6261
Status status = statusOptional.get();
63-
String causes = String.join("\n - ", status.getDetails().getCauses());
62+
String causes = status.getDetails().getCauses().size() > 1 ?
63+
"\n - " + String.join("\n - ", status.getDetails().getCauses()) : status.getDetails().getCauses().get(0);
6464

65-
System.out.printf("Failed %s/%s: %s for causes: %n - %s%n", kind, name, status.getMessage(), causes);
65+
System.out.printf("Failed %s/%s %s for causes: %s", kind, name, status.getMessage(), causes);
6666
} else {
67-
System.out.printf("Failed %s/%s: %s%n", kind, name, e.getMessage());
67+
System.out.printf("Failed %s/%s %s%n", kind, name, e.getMessage());
6868
}
6969
}
7070

0 commit comments

Comments
 (0)