Skip to content

BE: Impl a default timeout for http requests #799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {

public RetryingKafkaConnectClient(ClustersProperties.ConnectCluster config,
@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
DataSize maxBuffSize) {
super(new RetryingApiClient(config, truststoreConfig, maxBuffSize));
DataSize maxBuffSize,
Duration responseTimeout) {
super(new RetryingApiClient(config, truststoreConfig, maxBuffSize, responseTimeout));
}

private static Retry conflictCodeRetry() {
Expand Down Expand Up @@ -318,14 +319,16 @@ private static class RetryingApiClient extends ApiClient {

public RetryingApiClient(ClustersProperties.ConnectCluster config,
ClustersProperties.TruststoreConfig truststoreConfig,
DataSize maxBuffSize) {
super(buildWebClient(maxBuffSize, config, truststoreConfig), null, null);
DataSize maxBuffSize,
Duration responseTimeout) {
super(buildWebClient(maxBuffSize, responseTimeout, config, truststoreConfig), null, null);
setBasePath(config.getAddress());
setUsername(config.getUsername());
setPassword(config.getPassword());
}

public static WebClient buildWebClient(DataSize maxBuffSize,
Duration responseTimeout,
ClustersProperties.ConnectCluster config,
ClustersProperties.TruststoreConfig truststoreConfig) {
return new WebClientConfigurator()
Expand All @@ -341,6 +344,7 @@ public static WebClient buildWebClient(DataSize maxBuffSize,
config.getPassword()
)
.configureBufferSize(maxBuffSize)
.configureResponseTimeout(responseTimeout)
.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public static class PollingProperties {
Integer pollTimeoutMs;
Integer maxPageSize;
Integer defaultPageSize;
Integer responseTimeoutMs;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
public class WebclientProperties {

String maxInMemoryBufferSize;
Integer responseTimeoutMs;

@PostConstruct
public void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.kafbat.ui.util.KafkaServicesValidation;
import io.kafbat.ui.util.ReactiveFailover;
import io.kafbat.ui.util.WebClientConfigurator;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -37,13 +38,18 @@
public class KafkaClusterFactory {

private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
private static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofSeconds(20);

private final DataSize webClientMaxBuffSize;
private final Duration responseTimeout;

public KafkaClusterFactory(WebclientProperties webclientProperties) {
this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
.map(DataSize::parse)
.orElse(DEFAULT_WEBCLIENT_BUFFER);
this.responseTimeout = Optional.ofNullable(webclientProperties.getResponseTimeoutMs())
.map(Duration::ofMillis)
.orElse(DEFAULT_RESPONSE_TIMEOUT);
}

public KafkaCluster create(ClustersProperties properties,
Expand Down Expand Up @@ -147,7 +153,8 @@ private ReactiveFailover<KafkaConnectClientApi> connectClient(ClustersProperties
url -> new RetryingKafkaConnectClient(
connectCluster.toBuilder().address(url).build(),
cluster.getSsl(),
webClientMaxBuffSize
webClientMaxBuffSize,
responseTimeout
),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No alive connect instances available",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.time.Duration;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.net.ssl.KeyManagerFactory;
Expand Down Expand Up @@ -144,6 +145,11 @@ public WebClientConfigurator configureCodecs(Consumer<ClientCodecConfigurer> con
return this;
}

public WebClientConfigurator configureResponseTimeout(Duration responseTimeout) {
httpClient = httpClient.responseTimeout(responseTimeout);
return this;
}

public WebClient build() {
return builder.clientConnector(new ReactorClientHttpConnector(httpClient)).build();
}
Expand Down
3 changes: 3 additions & 0 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4245,6 +4245,9 @@ components:
maxInMemoryBufferSize:
type: string
description: "examples: 20, 12KB, 5MB"
responseTimeoutMs:
type: integer
description: "general response timeout in milliseconds for all http requests"
kafka:
type: object
properties:
Expand Down
Loading