Skip to content

Commit 3bf6efc

Browse files
author
Loïc GREFFIER
authored
Migrated schemas "get all" to reactive (#196)
* Migrate schemas get all to reactive * Retrieve all information on get all * Fix some Sonar * Fix unit tests
1 parent b0173f9 commit 3bf6efc

19 files changed

+555
-166
lines changed

api/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class SchemaController extends NamespacedResourceController {
3535
* @return A list of schemas
3636
*/
3737
@Get
38-
public List<Schema> list(String namespace) {
38+
public Single<List<Schema>> list(String namespace) {
3939
Namespace ns = getNamespace(namespace);
4040
return schemaService.findAllForNamespace(ns);
4141
}

api/src/main/java/com/michelin/ns4kafka/services/SchemaService.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,21 @@ public class SchemaService {
3737

3838
/**
3939
* Get all the schemas by namespace
40-
*
4140
* @param namespace The namespace
4241
* @return A list of schemas
4342
*/
44-
public List<Schema> findAllForNamespace(Namespace namespace) {
43+
public Single<List<Schema>> findAllForNamespace(Namespace namespace) {
4544
List<AccessControlEntry> acls = accessControlEntryService.findAllGrantedToNamespace(namespace).stream()
4645
.filter(acl -> acl.getSpec().getPermission() == AccessControlEntry.Permission.OWNER)
4746
.filter(acl -> acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.TOPIC)
4847
.collect(Collectors.toList());
4948

5049
return kafkaSchemaRegistryClient
5150
.getSubjects(KafkaSchemaRegistryClientProxy.PROXY_SECRET, namespace.getMetadata().getCluster())
52-
.stream()
51+
.toObservable()
52+
.flatMapIterable(subjects -> subjects)
5353
.filter(subject -> {
54-
String underlyingTopicName = subject.replaceAll("(-key|-value)$","");
54+
String underlyingTopicName = subject.replaceAll("(-key|-value)$", "");
5555

5656
return acls.stream().anyMatch(accessControlEntry -> {
5757
switch (accessControlEntry.getSpec().getResourcePatternType()) {
@@ -64,14 +64,28 @@ public List<Schema> findAllForNamespace(Namespace namespace) {
6464
return false;
6565
});
6666
})
67-
.map(namespacedSubject -> Schema.builder()
68-
.metadata(ObjectMeta.builder()
69-
.cluster(namespace.getMetadata().getCluster())
70-
.namespace(namespace.getMetadata().getName())
71-
.name(namespacedSubject)
72-
.build())
73-
.build())
74-
.collect(Collectors.toList());
67+
.flatMapMaybe(subject -> getLatestSubject(namespace, subject)
68+
.map(Optional::of)
69+
.defaultIfEmpty(Optional.empty())
70+
.map(schemaOptional -> {
71+
Schema schema = Schema.builder()
72+
.metadata(ObjectMeta.builder()
73+
.cluster(namespace.getMetadata().getCluster())
74+
.namespace(namespace.getMetadata().getName())
75+
.name(subject)
76+
.build())
77+
.build();
78+
79+
schemaOptional.ifPresent(value -> schema.setSpec(Schema.SchemaSpec.builder()
80+
.id(value.getSpec().getId())
81+
.version(value.getSpec().getVersion())
82+
.compatibility(value.getSpec().getCompatibility())
83+
.schemaType(value.getSpec().getSchemaType())
84+
.build()));
85+
86+
return schema;
87+
}))
88+
.toList();
7589
}
7690

7791
/**

api/src/main/java/com/michelin/ns4kafka/services/schema/client/KafkaSchemaRegistryClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
@Client(value = KafkaSchemaRegistryClientProxy.SCHEMA_REGISTRY_PREFIX)
1616
public interface KafkaSchemaRegistryClient {
1717
@Get("/subjects")
18-
List<String> getSubjects(@Header(value = KafkaSchemaRegistryClientProxy.PROXY_HEADER_SECRET) String secret,
18+
Single<List<String>> getSubjects(@Header(value = KafkaSchemaRegistryClientProxy.PROXY_HEADER_SECRET) String secret,
1919
@Header(value = KafkaSchemaRegistryClientProxy.PROXY_HEADER_KAFKA_CLUSTER) String cluster);
2020

2121
@Get("/subjects/{subject}/versions/latest")

api/src/test/java/com/michelin/ns4kafka/controllers/SchemaControllerTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import io.micronaut.security.utils.SecurityService;
1313
import io.reactivex.Maybe;
1414
import io.reactivex.Single;
15-
import org.junit.jupiter.api.Assertions;
1615
import org.junit.jupiter.api.Test;
1716
import org.junit.jupiter.api.extension.ExtendWith;
1817
import org.mockito.InjectMocks;
@@ -230,12 +229,14 @@ void list() {
230229
Schema schema = buildSchema();
231230

232231
when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace));
233-
when(schemaService.findAllForNamespace(namespace)).thenReturn(List.of(schema));
234-
235-
List<Schema> response = schemaController.list("myNamespace");
236-
237-
Assertions.assertEquals(1L, response.size());
238-
Assertions.assertEquals("prefix.subject-value", response.get(0).getMetadata().getName());
232+
when(schemaService.findAllForNamespace(namespace)).thenReturn(Single.just(List.of(schema)));
233+
234+
schemaController.list("myNamespace")
235+
.test()
236+
.assertValue(schemas -> schemas.size() == 1)
237+
.assertValue(schemas -> schemas.get(0).getMetadata().getName().equals("prefix.subject-value"))
238+
.assertValue(schemas -> schemas.get(0).getSpec().getId() == 1)
239+
.assertValue(schemas -> schemas.get(0).getSpec().getVersion() == 1);
239240
}
240241

241242
/**

api/src/test/java/com/michelin/ns4kafka/services/SchemaServiceTest.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,12 @@ class SchemaServiceTest {
5454
void getAllByNamespace() {
5555
Namespace namespace = buildNamespace();
5656
List<String> subjectsResponse = Arrays.asList("prefix.schema-one", "prefix2.schema-two", "prefix2.schema-three");
57+
SchemaCompatibilityResponse compatibilityResponse = buildCompatibilityResponse();
5758

58-
when(kafkaSchemaRegistryClient.getSubjects(KafkaSchemaRegistryClientProxy.PROXY_SECRET, namespace.getMetadata().getCluster())).thenReturn(subjectsResponse);
59+
when(kafkaSchemaRegistryClient.getSubjects(KafkaSchemaRegistryClientProxy.PROXY_SECRET, namespace.getMetadata().getCluster())).thenReturn(Single.just(subjectsResponse));
60+
when(kafkaSchemaRegistryClient.getLatestSubject(KafkaSchemaRegistryClientProxy.PROXY_SECRET, namespace.getMetadata().getCluster(), "prefix.schema-one")).thenReturn(Maybe.just(buildSchemaResponse("prefix.schema-one")));
61+
when(kafkaSchemaRegistryClient.getLatestSubject(KafkaSchemaRegistryClientProxy.PROXY_SECRET, namespace.getMetadata().getCluster(), "prefix2.schema-two")).thenReturn(Maybe.just(buildSchemaResponse("prefix2.schema-two")));
62+
when(kafkaSchemaRegistryClient.getCurrentCompatibilityBySubject(any(), any(), any())).thenReturn(Maybe.just(compatibilityResponse));
5963
Mockito.when(accessControlEntryService.findAllGrantedToNamespace(namespace))
6064
.thenReturn(List.of(
6165
AccessControlEntry.builder()
@@ -96,11 +100,12 @@ void getAllByNamespace() {
96100
.build()
97101
));
98102

99-
List<Schema> retrievedSchemas = schemaService.findAllForNamespace(namespace);
100-
Assertions.assertEquals(2L, retrievedSchemas.size());
101-
Assertions.assertEquals("prefix.schema-one", retrievedSchemas.get(0).getMetadata().getName());
102-
Assertions.assertEquals("prefix2.schema-two", retrievedSchemas.get(1).getMetadata().getName());
103-
Assertions.assertTrue(retrievedSchemas.stream().noneMatch(schema -> schema.getMetadata().getName().equals("prefix2.schema-three")));
103+
schemaService.findAllForNamespace(namespace)
104+
.test()
105+
.assertValue(schemas -> schemas.size() == 2)
106+
.assertValue(schemas -> schemas.get(0).getMetadata().getName().equals("prefix.schema-one"))
107+
.assertValue(schemas -> schemas.get(1).getMetadata().getName().equals("prefix2.schema-two"))
108+
.assertValue(schemas -> schemas.stream().noneMatch(schema -> schema.getMetadata().getName().equals("prefix2.schema-three")));
104109
}
105110

106111
/**
@@ -110,10 +115,11 @@ void getAllByNamespace() {
110115
void getAllByNamespaceEmptyResponse() {
111116
Namespace namespace = buildNamespace();
112117

113-
when(kafkaSchemaRegistryClient.getSubjects(KafkaSchemaRegistryClientProxy.PROXY_SECRET, namespace.getMetadata().getCluster())).thenReturn(List.of());
118+
when(kafkaSchemaRegistryClient.getSubjects(KafkaSchemaRegistryClientProxy.PROXY_SECRET, namespace.getMetadata().getCluster())).thenReturn(Single.just(List.of()));
114119

115-
List<Schema> retrievedSchemas = schemaService.findAllForNamespace(namespace);
116-
Assertions.assertTrue(retrievedSchemas.isEmpty());
120+
schemaService.findAllForNamespace(namespace)
121+
.test()
122+
.assertValue(List::isEmpty);
117123
}
118124

119125
/**

cli/src/main/java/com/michelin/ns4kafka/cli/ApiResourcesSubcommand.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,45 @@
99

1010
@CommandLine.Command(name = "api-resources", description = "Print the supported API resources on the server")
1111
public class ApiResourcesSubcommand implements Callable<Integer> {
12+
/**
13+
* API resources service
14+
*/
1215
@Inject
1316
public ApiResourcesService apiResourcesService;
17+
18+
/**
19+
* Login service
20+
*/
1421
@Inject
1522
public LoginService loginService;
1623

24+
/**
25+
* Current command
26+
*/
1727
@CommandLine.Spec
1828
CommandLine.Model.CommandSpec commandSpec;
1929

30+
/**
31+
* Run the "api-resources" command
32+
* @return The command return code
33+
*/
2034
@Override
2135
public Integer call() {
22-
// 1. Authent
2336
boolean authenticated = loginService.doAuthenticate();
2437
if (!authenticated) {
2538
throw new CommandLine.ParameterException(commandSpec.commandLine(), "Login failed");
2639
}
2740

2841
CommandLine.Help.TextTable tt = CommandLine.Help.TextTable.forColumns(
2942
CommandLine.Help.defaultColorScheme(CommandLine.Help.Ansi.AUTO),
30-
new CommandLine.Help.Column[]
31-
{
32-
new CommandLine.Help.Column(30, 2, CommandLine.Help.Column.Overflow.SPAN),
33-
new CommandLine.Help.Column(30, 2, CommandLine.Help.Column.Overflow.SPAN),
34-
new CommandLine.Help.Column(30, 2, CommandLine.Help.Column.Overflow.SPAN)
35-
});
43+
new CommandLine.Help.Column(30, 2, CommandLine.Help.Column.Overflow.SPAN),
44+
new CommandLine.Help.Column(30, 2, CommandLine.Help.Column.Overflow.SPAN),
45+
new CommandLine.Help.Column(30, 2, CommandLine.Help.Column.Overflow.SPAN));
3646
tt.addRowValues("KIND", "NAMES", "NAMESPACED");
47+
3748
apiResourcesService.getListResourceDefinition().forEach(rd ->
38-
tt.addRowValues(rd.getKind(), String.join(",", rd.getNames()), String.valueOf(rd.isNamespaced()))
39-
);
49+
tt.addRowValues(rd.getKind(), String.join(",", rd.getNames()), String.valueOf(rd.isNamespaced())));
50+
4051
System.out.println(tt);
4152
return 0;
4253
}

cli/src/main/java/com/michelin/ns4kafka/cli/ApplySubcommand.java

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,31 +23,71 @@
2323

2424
@Command(name = "apply", description = "Create or update a resource")
2525
public class ApplySubcommand implements Callable<Integer> {
26-
26+
/**
27+
* Login service
28+
*/
2729
@Inject
2830
public LoginService loginService;
31+
32+
/**
33+
* API resources service
34+
*/
2935
@Inject
3036
public ApiResourcesService apiResourcesService;
37+
38+
/**
39+
* File service
40+
*/
3141
@Inject
3242
public FileService fileService;
43+
44+
/**
45+
* Resource service
46+
*/
3347
@Inject
3448
public ResourceService resourceService;
3549

50+
/**
51+
* Kafkactl configuration
52+
*/
3653
@Inject
3754
public KafkactlConfig kafkactlConfig;
3855

56+
/**
57+
* Kafkactl command
58+
*/
3959
@CommandLine.ParentCommand
4060
public KafkactlCommand kafkactlCommand;
61+
62+
/**
63+
* YAML file or directory containing YAML resources to apply
64+
*/
4165
@Option(names = {"-f", "--file"}, description = "YAML File or Directory containing YAML resources")
4266
public Optional<File> file;
67+
68+
/**
69+
* Enable recursive search of file
70+
*/
4371
@Option(names = {"-R", "--recursive"}, description = "Enable recursive search of file")
4472
public boolean recursive;
73+
74+
/**
75+
* Does not persist resources. Validate only
76+
*/
4577
@Option(names = {"--dry-run"}, description = "Does not persist resources. Validate only")
4678
public boolean dryRun;
4779

80+
/**
81+
* Current command
82+
*/
4883
@CommandLine.Spec
4984
public CommandLine.Model.CommandSpec commandSpec;
5085

86+
/**
87+
* Run the "apply" command
88+
* @return The command return code
89+
* @throws Exception Any exception during the run
90+
*/
5191
@Override
5292
public Integer call() throws Exception {
5393
if (dryRun) {
@@ -59,37 +99,35 @@ public Integer call() throws Exception {
5999
throw new CommandLine.ParameterException(commandSpec.commandLine(), "Login failed");
60100
}
61101

62-
// 0. Check STDIN and -f
63-
boolean hasStdin = System.in.available() > 0;
64102
// If we have none or both stdin and File set, we stop
103+
boolean hasStdin = System.in.available() > 0;
65104
if (hasStdin == file.isPresent()) {
66105
throw new CommandLine.ParameterException(commandSpec.commandLine(), "Required one of -f or stdin");
67106
}
68107

69108
List<Resource> resources;
70-
71109
if (file.isPresent()) {
72-
// 1. list all files to process
110+
// List all files to process
73111
List<File> yamlFiles = fileService.computeYamlFileList(file.get(), recursive);
74112
if (yamlFiles.isEmpty()) {
75113
throw new CommandLine.ParameterException(commandSpec.commandLine(), "Could not find yaml/yml files in " + file.get().getName());
76114
}
77-
// 2 load each files
115+
// Load each files
78116
resources = fileService.parseResourceListFromFiles(yamlFiles);
79117
} else {
80118
Scanner scanner = new Scanner(System.in);
81119
scanner.useDelimiter("\\Z");
82-
// 2 load STDIN
83120
resources = fileService.parseResourceListFromString(scanner.next());
84121
}
85122

86-
// 3. validate resource types from resources
123+
// Validate resource types from resources
87124
List<Resource> invalidResources = apiResourcesService.validateResourceTypes(resources);
88125
if (!invalidResources.isEmpty()) {
89-
String invalid = String.join(", ", invalidResources.stream().map(Resource::getKind).distinct().collect(Collectors.toList()));
126+
String invalid = invalidResources.stream().map(Resource::getKind).distinct().collect(Collectors.joining(", "));
90127
throw new CommandLine.ParameterException(commandSpec.commandLine(), "The server doesn't have resource type [" + invalid + "]");
91128
}
92-
// 4. validate namespace mismatch
129+
130+
// Validate namespace mismatch
93131
String namespace = kafkactlCommand.optionalNamespace.orElse(kafkactlConfig.getCurrentNamespace());
94132
List<Resource> nsMismatch = resources.stream()
95133
.filter(resource -> resource.getMetadata().getNamespace() != null && !resource.getMetadata().getNamespace().equals(namespace))
@@ -98,9 +136,10 @@ public Integer call() throws Exception {
98136
String invalid = String.join(", ", nsMismatch.stream().map(resource -> resource.getKind() + "/" + resource.getMetadata().getName()).distinct().collect(Collectors.toList()));
99137
throw new CommandLine.ParameterException(commandSpec.commandLine(), "Namespace mismatch between kafkactl and yaml document [" + invalid + "]");
100138
}
139+
101140
List<ApiResource> apiResources = apiResourcesService.getListResourceDefinition();
102141

103-
// 5. load schema content
142+
// Load schema content
104143
resources.stream()
105144
.filter(resource -> resource.getKind().equals("Schema") && resource.getSpec().get("schemaFile") != null && StringUtils.isNotEmpty(resource.getSpec().get("schemaFile").toString()))
106145
.forEach(resource -> {
@@ -112,22 +151,25 @@ public Integer call() throws Exception {
112151
}
113152
});
114153

115-
// 6. process each document individually, return 0 when all succeed
154+
// Process each document individually, return 0 when all succeed
116155
int errors = resources.stream()
117156
.map(resource -> {
118157
ApiResource apiResource = apiResources.stream()
119158
.filter(apiRes -> apiRes.getKind().equals(resource.getKind()))
120159
.findFirst()
121-
.orElseThrow(); // already validated
160+
.orElseThrow();
161+
122162
HttpResponse<Resource> response = resourceService.apply(apiResource, namespace, resource, dryRun);
123163
if (response == null) {
124164
return null;
125165
}
166+
126167
Resource merged = response.body();
127168
String resourceState = "";
128169
if (response.header("X-Ns4kafka-Result") != null) {
129170
resourceState = " (" +response.header("X-Ns4kafka-Result") + ")";
130171
}
172+
131173
System.out.println(CommandLine.Help.Ansi.AUTO.string("@|bold,green Success |@") + merged.getKind() + "/" + merged.getMetadata().getName() + resourceState);
132174

133175
return merged;

0 commit comments

Comments
 (0)