Skip to content

Commit 1b08608

Browse files
author
Loïc GREFFIER
authored
Migrated connectors API to reactive (#198)
* Start using reactive on connector API * Start using reactive on connector API * Update unit tests with reactive * FIx unit tests
1 parent 2bd0b35 commit 1b08608

File tree

7 files changed

+612
-488
lines changed

7 files changed

+612
-488
lines changed

api/src/main/java/com/michelin/ns4kafka/controllers/ConnectController.java

Lines changed: 121 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
import com.michelin.ns4kafka.services.KafkaConnectService;
77
import io.micronaut.http.HttpResponse;
88
import io.micronaut.http.HttpStatus;
9+
import io.micronaut.http.MutableHttpResponse;
910
import io.micronaut.http.annotation.*;
1011
import io.micronaut.scheduling.TaskExecutors;
1112
import io.micronaut.scheduling.annotation.ExecuteOn;
13+
import io.reactivex.Single;
1214
import io.swagger.v3.oas.annotations.tags.Tag;
1315

1416
import javax.inject.Inject;
@@ -17,19 +19,23 @@
1719
import java.util.Date;
1820
import java.util.List;
1921
import java.util.Optional;
20-
import java.util.concurrent.ExecutionException;
21-
import java.util.concurrent.TimeoutException;
2222
import java.util.stream.Collectors;
2323

2424
@Tag(name = "Connects")
2525
@Controller(value = "/api/namespaces/{namespace}/connects")
2626
@ExecuteOn(TaskExecutors.IO)
2727
public class ConnectController extends NamespacedResourceController {
2828
/**
29-
* The connector service
29+
* Message threw when namespace is not owner of the given connector
30+
*/
31+
private final String NAMESPACE_NOT_OWNER = "Namespace not owner of this connector %s.";
32+
33+
/**
34+
* Connector service
3035
*/
3136
@Inject
3237
KafkaConnectService kafkaConnectService;
38+
3339
/**
3440
* Get all the connectors by namespace
3541
* @param namespace The namespace
@@ -60,23 +66,22 @@ public Optional<Connector> getConnector(String namespace, String connector) {
6066
*/
6167
@Status(HttpStatus.NO_CONTENT)
6268
@Delete("/{connector}{?dryrun}")
63-
public HttpResponse<Void> deleteConnector(String namespace, String connector, @QueryValue(defaultValue = "false") boolean dryrun) {
69+
public Single<HttpResponse<Void>> deleteConnector(String namespace, String connector, @QueryValue(defaultValue = "false") boolean dryrun) {
6470
Namespace ns = getNamespace(namespace);
6571

6672
// Validate ownership
6773
if (!kafkaConnectService.isNamespaceOwnerOfConnect(ns, connector)) {
68-
throw new ResourceValidationException(List.of("Invalid value " + connector +
69-
" for name: Namespace not OWNER of this connector"), "Connector", connector);
74+
return Single.error(new ResourceValidationException(List.of(String.format(NAMESPACE_NOT_OWNER, connector)),
75+
"Connector", connector));
7076
}
7177

7278
Optional<Connector> optionalConnector = kafkaConnectService.findByName(ns, connector);
73-
7479
if (optionalConnector.isEmpty()) {
75-
return HttpResponse.notFound();
80+
return Single.just(HttpResponse.notFound());
7681
}
7782

7883
if (dryrun) {
79-
return HttpResponse.noContent();
84+
return Single.just(HttpResponse.noContent());
8085
}
8186

8287
Connector connectorToDelete = optionalConnector.get();
@@ -86,9 +91,9 @@ public HttpResponse<Void> deleteConnector(String namespace, String connector, @Q
8691
connectorToDelete.getSpec(),
8792
null);
8893

89-
kafkaConnectService.delete(ns, optionalConnector.get());
90-
91-
return HttpResponse.noContent();
94+
return kafkaConnectService
95+
.delete(ns, optionalConnector.get())
96+
.map(httpResponse -> HttpResponse.noContent());
9297
}
9398

9499
/**
@@ -99,13 +104,13 @@ public HttpResponse<Void> deleteConnector(String namespace, String connector, @Q
99104
* @return The created schema
100105
*/
101106
@Post("{?dryrun}")
102-
public HttpResponse<Connector> apply(String namespace, @Valid @Body Connector connector, @QueryValue(defaultValue = "false") boolean dryrun) {
107+
public Single<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connector connector, @QueryValue(defaultValue = "false") boolean dryrun) {
103108
Namespace ns = getNamespace(namespace);
104109

105110
// Validate ownership
106111
if (!kafkaConnectService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName())) {
107-
throw new ResourceValidationException(List.of("Invalid value " + connector.getMetadata().getName() +
108-
" for name: Namespace not OWNER of this connector"), connector.getKind(), connector.getMetadata().getName());
112+
return Single.error(new ResourceValidationException(List.of(String.format(NAMESPACE_NOT_OWNER, connector.getMetadata().getName())),
113+
connector.getKind(), connector.getMetadata().getName()));
109114
}
110115

111116
// Set / Override name in spec.config.name, required for several Kafka Connect API calls
@@ -119,97 +124,107 @@ public HttpResponse<Connector> apply(String namespace, @Valid @Body Connector co
119124
connector.getSpec().getConfig().put("name", connector.getMetadata().getName());
120125

121126
// Validate locally
122-
List<String> validationErrors = kafkaConnectService.validateLocally(ns, connector);
123-
if (!validationErrors.isEmpty()) {
124-
throw new ResourceValidationException(validationErrors, connector.getKind(), connector.getMetadata().getName());
125-
}
126-
127-
// Validate against connect rest API /validate
128-
validationErrors = kafkaConnectService.validateRemotely(ns, connector);
129-
if (!validationErrors.isEmpty()) {
130-
throw new ResourceValidationException(validationErrors, connector.getKind(), connector.getMetadata().getName());
131-
}
132-
133-
// Augment with server side fields
134-
connector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
135-
connector.getMetadata().setCluster(ns.getMetadata().getCluster());
136-
connector.getMetadata().setNamespace(ns.getMetadata().getName());
137-
connector.setStatus(Connector.ConnectorStatus.builder()
138-
.state(Connector.TaskState.UNASSIGNED)
139-
.build());
140-
141-
Optional<Connector> existingConnector = kafkaConnectService.findByName(ns, connector.getMetadata().getName());
142-
if (existingConnector.isPresent() && existingConnector.get().equals(connector)) {
143-
return formatHttpResponse(existingConnector.get(), ApplyStatus.unchanged);
144-
}
145-
146-
ApplyStatus status = existingConnector.isPresent() ? ApplyStatus.changed : ApplyStatus.created;
147-
if (dryrun) {
148-
return formatHttpResponse(connector, status);
149-
}
150-
151-
sendEventLog(connector.getKind(),
152-
connector.getMetadata(),
153-
status,
154-
existingConnector.<Object>map(Connector::getSpec).orElse(null),
155-
connector.getSpec());
156-
157-
return formatHttpResponse(kafkaConnectService.createOrUpdate(connector), status);
127+
return kafkaConnectService.validateLocally(ns, connector)
128+
.flatMap(validationErrors -> {
129+
if (!validationErrors.isEmpty()) {
130+
return Single.error(new ResourceValidationException(validationErrors, connector.getKind(), connector.getMetadata().getName()));
131+
}
132+
133+
// Validate against connect rest API /validate
134+
return kafkaConnectService.validateRemotely(ns, connector)
135+
.flatMap(remoteValidationErrors -> {
136+
if (!remoteValidationErrors.isEmpty()) {
137+
return Single.error(new ResourceValidationException(remoteValidationErrors, connector.getKind(), connector.getMetadata().getName()));
138+
}
139+
140+
// Augment with server side fields
141+
connector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
142+
connector.getMetadata().setCluster(ns.getMetadata().getCluster());
143+
connector.getMetadata().setNamespace(ns.getMetadata().getName());
144+
connector.setStatus(Connector.ConnectorStatus.builder()
145+
.state(Connector.TaskState.UNASSIGNED)
146+
.build());
147+
148+
Optional<Connector> existingConnector = kafkaConnectService.findByName(ns, connector.getMetadata().getName());
149+
if (existingConnector.isPresent() && existingConnector.get().equals(connector)) {
150+
return Single.just(formatHttpResponse(existingConnector.get(), ApplyStatus.unchanged));
151+
}
152+
153+
ApplyStatus status = existingConnector.isPresent() ? ApplyStatus.changed : ApplyStatus.created;
154+
if (dryrun) {
155+
return Single.just(formatHttpResponse(connector, status));
156+
}
157+
158+
sendEventLog(connector.getKind(),
159+
connector.getMetadata(),
160+
status,
161+
existingConnector.<Object>map(Connector::getSpec).orElse(null),
162+
connector.getSpec());
163+
164+
return Single.just(formatHttpResponse(kafkaConnectService.createOrUpdate(connector), status));
165+
});
166+
});
158167
}
159168

160169
/**
161170
* Change the state of a connector
162171
* @param namespace The namespace
163172
* @param connector The connector to update the state
164173
* @param changeConnectorState The state to set
165-
* @return
174+
* @return The change connector state response
166175
*/
167176
@Post("/{connector}/change-state")
168-
public HttpResponse<ChangeConnectorState> changeState(String namespace, String connector, @Body @Valid ChangeConnectorState changeConnectorState) {
177+
public Single<MutableHttpResponse<ChangeConnectorState>> changeState(String namespace, String connector, @Body @Valid ChangeConnectorState changeConnectorState) {
169178
Namespace ns = getNamespace(namespace);
170179

171180
if (!kafkaConnectService.isNamespaceOwnerOfConnect(ns, connector)) {
172-
throw new ResourceValidationException(List.of("Invalid value " + connector +
173-
" for name: Namespace not OWNER of this connector"), "Connector", connector);
181+
return Single.error(new ResourceValidationException(List.of(String.format(NAMESPACE_NOT_OWNER, connector)),
182+
"Connector", connector));
174183
}
175184

176185
Optional<Connector> optionalConnector = kafkaConnectService.findByName(ns, connector);
177186

178187
if (optionalConnector.isEmpty()) {
179-
return HttpResponse.notFound();
188+
return Single.just(HttpResponse.notFound());
180189
}
181190

182-
HttpResponse response;
183-
try {
184-
switch (changeConnectorState.getSpec().getAction()) {
185-
case restart:
186-
response = kafkaConnectService.restart(ns, optionalConnector.get());
187-
break;
188-
case pause:
189-
response = kafkaConnectService.pause(ns, optionalConnector.get());
190-
break;
191-
case resume:
192-
response = kafkaConnectService.resume(ns, optionalConnector.get());
193-
break;
194-
default:
195-
throw new IllegalStateException("Unspecified Action "+changeConnectorState.getSpec().getAction());
196-
}
197-
198-
changeConnectorState.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
199-
.success(true)
200-
.code(response.status())
201-
.build());
202-
} catch (Exception e) {
203-
changeConnectorState.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
204-
.success(false)
205-
.code(HttpStatus.INTERNAL_SERVER_ERROR)
206-
.errorMessage(e.getMessage())
207-
.build());
191+
Single<HttpResponse<Void>> response;
192+
switch (changeConnectorState.getSpec().getAction()) {
193+
case restart:
194+
response = kafkaConnectService.restart(ns, optionalConnector.get());
195+
break;
196+
case pause:
197+
response = kafkaConnectService.pause(ns, optionalConnector.get());
198+
break;
199+
case resume:
200+
response = kafkaConnectService.resume(ns, optionalConnector.get());
201+
break;
202+
default:
203+
return Single.error(new IllegalStateException("Unspecified action " + changeConnectorState.getSpec().getAction()));
208204
}
209205

210-
changeConnectorState.setMetadata(optionalConnector.get().getMetadata());
211-
changeConnectorState.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
212-
return HttpResponse.ok(changeConnectorState);
206+
return response
207+
.doOnEvent((httpResponse, httpError) -> {
208+
if (httpResponse != null) {
209+
changeConnectorState.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
210+
.success(true)
211+
.code(httpResponse.status())
212+
.build());
213+
}
214+
215+
if (httpError != null) {
216+
changeConnectorState.setStatus(ChangeConnectorState.ChangeConnectorStateStatus.builder()
217+
.success(false)
218+
.code(HttpStatus.INTERNAL_SERVER_ERROR)
219+
.errorMessage(httpError.getMessage())
220+
.build());
221+
}
222+
223+
changeConnectorState.setMetadata(optionalConnector.get().getMetadata());
224+
changeConnectorState.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
225+
})
226+
.map(httpResponse -> HttpResponse.ok(changeConnectorState))
227+
.onErrorReturnItem(HttpResponse.ok(changeConnectorState));
213228
}
214229

215230
/**
@@ -219,26 +234,27 @@ public HttpResponse<ChangeConnectorState> changeState(String namespace, String c
219234
* @return The list of imported connectors
220235
*/
221236
@Post("/_/import{?dryrun}")
222-
public List<Connector> importResources(String namespace, @QueryValue(defaultValue = "false") boolean dryrun) {
237+
public Single<List<Connector>> importResources(String namespace, @QueryValue(defaultValue = "false") boolean dryrun) {
223238
Namespace ns = getNamespace(namespace);
224-
List<Connector> unsynchronizedConnectors = kafkaConnectService.listUnsynchronizedConnectors(ns);
225-
226-
unsynchronizedConnectors.forEach(connector -> {
227-
connector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
228-
connector.getMetadata().setCluster(ns.getMetadata().getCluster());
229-
connector.getMetadata().setNamespace(ns.getMetadata().getName());
230-
});
231-
232-
if (dryrun) {
233-
return unsynchronizedConnectors;
234-
}
235-
236-
return unsynchronizedConnectors
237-
.stream()
238-
.map(connector -> {
239-
sendEventLog(connector.getKind(), connector.getMetadata(), ApplyStatus.created, null, connector.getSpec());
240-
return kafkaConnectService.createOrUpdate(connector);
241-
})
242-
.collect(Collectors.toList());
239+
return kafkaConnectService.listUnsynchronizedConnectors(ns)
240+
.map(unsynchronizedConnectors -> {
241+
unsynchronizedConnectors.forEach(connector -> {
242+
connector.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
243+
connector.getMetadata().setCluster(ns.getMetadata().getCluster());
244+
connector.getMetadata().setNamespace(ns.getMetadata().getName());
245+
});
246+
247+
if (dryrun) {
248+
return unsynchronizedConnectors;
249+
}
250+
251+
return unsynchronizedConnectors
252+
.stream()
253+
.map(connector -> {
254+
sendEventLog(connector.getKind(), connector.getMetadata(), ApplyStatus.created, null, connector.getSpec());
255+
return kafkaConnectService.createOrUpdate(connector);
256+
})
257+
.collect(Collectors.toList());
258+
});
243259
}
244260
}

0 commit comments

Comments
 (0)