Skip to content

Commit f4c2fda

Browse files
author
Loïc GREFFIER
authored
Performed retries on apply/delete timeout (#208)
* Perform retry when ns4kafka timeout on schema registry/kconnect * Perform retry when ns4kafka timeout on schema registry/kconnect * Update condition * Update config
1 parent 57795fd commit f4c2fda

File tree

5 files changed

+52
-3
lines changed

5 files changed

+52
-3
lines changed

cli/src/main/java/com/michelin/ns4kafka/cli/client/NamespacedResourceClient.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package com.michelin.ns4kafka.cli.client;
22

3+
import com.michelin.ns4kafka.cli.client.predicates.RetryTimeoutPredicate;
34
import com.michelin.ns4kafka.cli.models.Resource;
45
import com.michelin.ns4kafka.cli.models.SchemaCompatibility;
56
import io.micronaut.http.HttpResponse;
67
import io.micronaut.http.annotation.*;
78
import io.micronaut.http.client.annotation.Client;
9+
import io.micronaut.retry.annotation.Retryable;
810

911
import java.util.List;
1012
import java.util.Map;
@@ -21,6 +23,10 @@ public interface NamespacedResourceClient {
2123
* @return The delete response
2224
*/
2325
@Delete("{namespace}/{kind}/{resourceName}{?dryrun}")
26+
@Retryable(delay = "${kafkactl.retry.delete.delay}",
27+
attempts = "${kafkactl.retry.delete.attempt}",
28+
multiplier = "${kafkactl.retry.delete.multiplier}",
29+
predicate = RetryTimeoutPredicate.class)
2430
HttpResponse<Void> delete(
2531
String namespace,
2632
String kind,
@@ -38,11 +44,15 @@ HttpResponse<Void> delete(
3844
* @return The resource
3945
*/
4046
@Post("{namespace}/{kind}{?dryrun}")
47+
@Retryable(delay = "${kafkactl.retry.apply.delay}",
48+
attempts = "${kafkactl.retry.apply.attempt}",
49+
multiplier = "${kafkactl.retry.apply.multiplier}",
50+
predicate = RetryTimeoutPredicate.class)
4151
HttpResponse<Resource> apply(
4252
String namespace,
4353
String kind,
4454
@Header("Authorization") String token,
45-
@Body Resource json,
55+
@Body Resource resource,
4656
@QueryValue boolean dryrun);
4757

4858
/**
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.michelin.ns4kafka.cli.client.predicates;
2+
3+
import com.michelin.ns4kafka.cli.models.Status;
4+
import io.micronaut.http.client.exceptions.HttpClientResponseException;
5+
import io.micronaut.retry.annotation.RetryPredicate;
6+
import io.netty.handler.codec.http.HttpResponseStatus;
7+
8+
import java.util.Optional;
9+
10+
public class RetryTimeoutPredicate implements RetryPredicate {
11+
/**
12+
* Detect when Ns4Kafka return a timeout exception (e.g. when deploying schemas, connectors)
13+
* @param throwable the input argument
14+
* @return true if a retry is necessary, false otherwise
15+
*/
16+
@Override
17+
public boolean test(Throwable throwable) {
18+
if (throwable instanceof HttpClientResponseException) {
19+
Optional<Status> statusOptional = ((HttpClientResponseException) throwable).getResponse().getBody(Status.class);
20+
if (statusOptional.isPresent() && statusOptional.get().getDetails() != null
21+
&& !statusOptional.get().getDetails().getCauses().isEmpty()
22+
&& HttpResponseStatus.INTERNAL_SERVER_ERROR.code() == statusOptional.get().getCode()
23+
&& statusOptional.get().getDetails().getCauses().stream().anyMatch(cause -> cause.contains("Read Timeout"))) {
24+
System.out.println("Read timeout... retrying...");
25+
return true;
26+
}
27+
}
28+
return false;
29+
}
30+
}

cli/src/main/java/com/michelin/ns4kafka/cli/services/ResourceService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public HttpResponse<Resource> apply(ApiResource apiResource, String namespace, R
9090
public boolean delete(ApiResource apiResource, String namespace, String resource, boolean dryRun) {
9191
try {
9292
if (apiResource.isNamespaced()) {
93-
HttpResponse response = namespacedClient.delete(namespace, apiResource.getPath(), resource, loginService.getAuthorization(), dryRun);
93+
HttpResponse<Void> response = namespacedClient.delete(namespace, apiResource.getPath(), resource, loginService.getAuthorization(), dryRun);
9494
if(response.getStatus() != HttpStatus.NO_CONTENT){
9595
throw new HttpClientResponseException("Resource not Found", response);
9696
}

cli/src/main/resources/application.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,15 @@ kafkactl:
8484
- "NAMESPACE:/spec/namespace"
8585
- "API:/spec/api"
8686
- "TOKEN:/spec/token"
87+
retry:
88+
apply:
89+
delay: "2s"
90+
attempt: "10"
91+
multiplier: "1.0"
92+
delete:
93+
delay: "2s"
94+
attempt: "10"
95+
multiplier: "1.0"
8796
contexts:
8897
-
8998

cli/src/main/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@
1313
<!-- Use application.yml to configure loggers-->
1414
<appender-ref ref="STDOUT" />
1515
</root>
16-
</configuration>
16+
</configuration>

0 commit comments

Comments
 (0)