Skip to content

Commit 928aef5

Browse files
author
Loïc GREFFIER
authored
Disposed Single when deploying connector (#206)
1 parent 11ca5d7 commit 928aef5

File tree

1 file changed

+13
-13
lines changed

1 file changed

+13
-13
lines changed

api/src/main/java/com/michelin/ns4kafka/services/executors/ConnectorAsyncExecutor.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.michelin.ns4kafka.services.executors;
22

3-
import com.michelin.ns4kafka.models.connector.Connector;
43
import com.michelin.ns4kafka.models.ObjectMeta;
4+
import com.michelin.ns4kafka.models.connector.Connector;
55
import com.michelin.ns4kafka.repositories.ConnectorRepository;
66
import com.michelin.ns4kafka.services.connect.KafkaConnectClientProxy;
77
import com.michelin.ns4kafka.services.connect.client.KafkaConnectClient;
@@ -11,6 +11,7 @@
1111
import io.micronaut.http.client.exceptions.HttpClientResponseException;
1212
import io.micronaut.http.client.exceptions.ReadTimeoutException;
1313
import io.reactivex.Single;
14+
import io.reactivex.internal.observers.ConsumerSingleObserver;
1415
import lombok.extern.slf4j.Slf4j;
1516

1617
import javax.inject.Inject;
@@ -68,7 +69,7 @@ private void synchronizeConnectCluster(String connectCluster) {
6869
connectCluster);
6970

7071
collectBrokerConnectors(connectCluster)
71-
.subscribe(brokerConnectors -> {
72+
.subscribe(new ConsumerSingleObserver<>(brokerConnectors -> {
7273
List<Connector> ns4kafkaConnectors = collectNs4KafkaConnectors(connectCluster);
7374

7475
List<Connector> toCreate = ns4kafkaConnectors.stream()
@@ -100,20 +101,19 @@ private void synchronizeConnectCluster(String connectCluster) {
100101
},
101102
error -> {
102103
if (error instanceof HttpClientResponseException) {
103-
log.error("Invalid Http response {} during connectors synchronization for Kafka cluster {} and Connect cluster {}",
104-
((HttpClientResponseException) error).getStatus(),
105-
kafkaAsyncExecutorConfig.getName(),
106-
connectCluster);
104+
log.error("Invalid HTTP response {} ({}) during connectors synchronization for Kafka cluster {} and Connect cluster {}",
105+
((HttpClientResponseException) error).getStatus(), ((HttpClientResponseException) error).getResponse().getStatus(),
106+
kafkaAsyncExecutorConfig.getName(), connectCluster);
107107
} else if (error instanceof ReadTimeoutException) {
108-
log.error("ReadTimeoutException during connectors synchronization for Kafka cluster {} and Connect cluster {}",
108+
log.error("Read timeout during connectors synchronization for Kafka cluster {} and Connect cluster {}",
109109
kafkaAsyncExecutorConfig.getName(),
110110
connectCluster);
111111
} else {
112112
log.error("Exception during connectors synchronization for Kafka cluster {} and Connect cluster {} : {}",
113113
kafkaAsyncExecutorConfig.getName(),
114114
connectCluster, error);
115115
}
116-
});
116+
}));
117117
}
118118

119119
/**
@@ -194,10 +194,10 @@ private boolean connectorsAreSame(Connector expected, Connector actual) {
194194
private void deployConnector(Connector connector) {
195195
kafkaConnectClient.createOrUpdate(KafkaConnectClientProxy.PROXY_SECRET, kafkaAsyncExecutorConfig.getName(),
196196
connector.getSpec().getConnectCluster(), connector.getMetadata().getName(),
197-
ConnectorSpecs.builder().config(connector.getSpec().getConfig()).build())
198-
.subscribe(httpResponse -> log.info("Success deploying Connector [{}] on Kafka [{}] Connect [{}]",
199-
connector.getMetadata().getName(), kafkaAsyncExecutorConfig.getName(), connector.getSpec().getConnectCluster()),
200-
httpError -> log.error(String.format("Error deploying Connector [%s] on Kafka [%s] Connect [%s]",
201-
connector.getMetadata().getName(), kafkaAsyncExecutorConfig.getName(), connector.getSpec().getConnectCluster())));
197+
ConnectorSpecs.builder().config(connector.getSpec().getConfig()).build())
198+
.subscribe(new ConsumerSingleObserver<>(httpResponse -> log.info("Success deploying Connector [{}] on Kafka [{}] Connect [{}]",
199+
connector.getMetadata().getName(), kafkaAsyncExecutorConfig.getName(), connector.getSpec().getConnectCluster()),
200+
httpError -> log.error(String.format("Error deploying Connector [%s] on Kafka [%s] Connect [%s]",
201+
connector.getMetadata().getName(), kafkaAsyncExecutorConfig.getName(), connector.getSpec().getConnectCluster()))));
202202
}
203203
}

0 commit comments

Comments
 (0)