Skip to content

Commit a91a9f8

Browse files
authored
log and return empty mono when WebClient request fails sending zipkin trace data (#2170)
* log and return empty mono when WebClient request fails sending zipkin trace data * updated tests and RestTemplateSender to handle exceptions * removed unused imports * added the option to send in a custom doOnError function * removed unused loggers * added the option to send in a custom wrapper function which can customize the reactive flow * updated javadoc * updated javadoc
1 parent 2ab357a commit a91a9f8

File tree

2 files changed

+48
-6
lines changed

2 files changed

+48
-6
lines changed

spring-cloud-sleuth-zipkin/src/main/java/org/springframework/cloud/sleuth/zipkin2/WebClientSender.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
import java.net.URI;
2020
import java.time.Duration;
21+
import java.util.function.Function;
2122

23+
import reactor.core.publisher.Mono;
2224
import zipkin2.Span;
2325
import zipkin2.codec.BytesEncoder;
2426
import zipkin2.reporter.Sender;
2527

2628
import org.springframework.http.MediaType;
29+
import org.springframework.http.ResponseEntity;
2730
import org.springframework.web.reactive.function.client.WebClient;
2831

2932
/**
@@ -47,7 +50,7 @@ public class WebClientSender extends HttpSender {
4750
*/
4851
@Deprecated
4952
public WebClientSender(WebClient webClient, String baseUrl, String apiPath, BytesEncoder<Span> encoder) {
50-
this(webClient, baseUrl, apiPath, encoder, DEFAULT_CHECK_TIMEOUT);
53+
this(null, webClient, baseUrl, apiPath, encoder, DEFAULT_CHECK_TIMEOUT);
5154
}
5255

5356
/**
@@ -60,13 +63,35 @@ public WebClientSender(WebClient webClient, String baseUrl, String apiPath, Byte
6063
*/
6164
public WebClientSender(WebClient webClient, String baseUrl, String apiPath, BytesEncoder<Span> encoder,
6265
long checkTimeout) {
63-
super((url, mediaType, bytes) -> post(url, mediaType, bytes, webClient, checkTimeout), baseUrl, apiPath,
64-
encoder);
66+
super((url, mediaType, bytes) -> post(null, url, mediaType, bytes, webClient, checkTimeout).block(), baseUrl,
67+
apiPath, encoder);
6568
}
6669

67-
private static void post(String url, MediaType mediaType, byte[] json, WebClient webClient, long checkTimeout) {
68-
webClient.post().uri(URI.create(url)).accept(mediaType).contentType(mediaType).bodyValue(json).retrieve()
69-
.toBodilessEntity().timeout(Duration.ofMillis(checkTimeout)).block();
70+
/**
71+
* Creates a new instance of {@link WebClientSender}.
72+
* @param webClient web client
73+
* @param wrapperFunction function that will be run on onErrorResume. Send in null to
74+
* get default behavior.
75+
* @param baseUrl base url
76+
* @param apiPath api path
77+
* @param encoder encoder
78+
* @param checkTimeout check timeout
79+
*/
80+
public WebClientSender(Function<Mono<ResponseEntity<Void>>, Mono<ResponseEntity<Void>>> wrapperFunction,
81+
WebClient webClient, String baseUrl, String apiPath, BytesEncoder<Span> encoder, long checkTimeout) {
82+
super((url, mediaType, bytes) -> post(wrapperFunction, url, mediaType, bytes, webClient, checkTimeout).block(),
83+
baseUrl, apiPath, encoder);
84+
}
85+
86+
private static Mono<ResponseEntity<Void>> post(
87+
Function<Mono<ResponseEntity<Void>>, Mono<ResponseEntity<Void>>> wrapperFunction, String url,
88+
MediaType mediaType, byte[] json, WebClient webClient, long checkTimeout) {
89+
if (wrapperFunction == null) {
90+
wrapperFunction = (response) -> response;
91+
}
92+
93+
return wrapperFunction.apply(webClient.post().uri(URI.create(url)).accept(mediaType).contentType(mediaType)
94+
.bodyValue(json).retrieve().toBodilessEntity().timeout(Duration.ofMillis(checkTimeout)));
7095
}
7196

7297
@Override

spring-cloud-sleuth-zipkin/src/test/java/org/springframework/cloud/sleuth/zipkin2/WebClientSenderTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,17 @@
1616

1717
package org.springframework.cloud.sleuth.zipkin2;
1818

19+
import java.io.IOException;
20+
21+
import org.junit.jupiter.api.Test;
22+
import reactor.core.publisher.Mono;
23+
import zipkin2.CheckResult;
1924
import zipkin2.reporter.Sender;
2025

2126
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
2227
import org.springframework.web.reactive.function.client.WebClient;
2328

29+
import static org.assertj.core.api.Assertions.assertThat;
2430
import static zipkin2.codec.SpanBytesEncoder.JSON_V2;
2531
import static zipkin2.codec.SpanBytesEncoder.PROTO3;
2632

@@ -59,4 +65,15 @@ String expectedToStringWithNonEmptyApiPath(String mockedApiPath) {
5965
return "WebClientSender{" + this.endpoint + mockedApiPath + "}";
6066
}
6167

68+
@Test
69+
void customFunctionToResumeAfterError() throws IOException {
70+
WebClientSender sender = new WebClientSender((response) -> response.onErrorResume((error) -> Mono.empty()),
71+
WebClient.builder().clientConnector(new ReactorClientHttpConnector()).build(), this.endpoint, "",
72+
PROTO3, DEFAULT_CHECK_TIMEOUT);
73+
74+
this.server.shutdown();
75+
CheckResult result = sender.check();
76+
assertThat(result.ok()).isTrue();
77+
}
78+
6279
}

0 commit comments

Comments
 (0)