Skip to content

Commit 76e7322

Browse files
author
Loïc GREFFIER
authored
Optimized schema apply (#201)
* Optimized schema apply * Optimized schema apply * Update unit tests
1 parent cd2f8b5 commit 76e7322

File tree

2 files changed

+19
-35
lines changed

2 files changed

+19
-35
lines changed

api/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -100,31 +100,22 @@ public Single<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema
100100
.defaultIfEmpty(Optional.empty())
101101
.flatMapSingle(latestSubjectOptional -> schemaService
102102
.register(ns, schema)
103-
.flatMap(id -> schemaService
104-
.getLatestSubject(ns, schema.getMetadata().getName())
105-
.map(Optional::of)
106-
.defaultIfEmpty(Optional.empty())
107-
.flatMapSingle(registeredSchemaOptional -> {
108-
if (registeredSchemaOptional.isEmpty()) {
109-
return Single.error(new Exception(String.format("Cannot register schema %s", schema.getMetadata().getName())));
110-
}
111-
112-
Schema registeredSchema = registeredSchemaOptional.get();
113-
ApplyStatus status;
114-
115-
if (latestSubjectOptional.isEmpty()) {
116-
status = ApplyStatus.created;
117-
sendEventLog(schema.getKind(), registeredSchema.getMetadata(), status, null, registeredSchema.getSpec());
118-
} else if (registeredSchema.getSpec().getVersion() > latestSubjectOptional.get().getSpec().getVersion()) {
119-
status = ApplyStatus.changed;
120-
sendEventLog(schema.getKind(), registeredSchema.getMetadata(), status,
121-
latestSubjectOptional.get().getSpec(), registeredSchema.getSpec());
122-
} else {
123-
status = ApplyStatus.unchanged;
124-
}
125-
126-
return Single.just(formatHttpResponse(schema, status));
127-
})));
103+
.map(id -> {
104+
ApplyStatus status;
105+
106+
if (latestSubjectOptional.isEmpty()) {
107+
status = ApplyStatus.created;
108+
sendEventLog(schema.getKind(), schema.getMetadata(), status, null, schema.getSpec());
109+
} else if (!id.equals(latestSubjectOptional.get().getSpec().getId())) {
110+
status = ApplyStatus.changed;
111+
sendEventLog(schema.getKind(), schema.getMetadata(), status, latestSubjectOptional.get().getSpec(),
112+
schema.getSpec());
113+
} else {
114+
status = ApplyStatus.unchanged;
115+
}
116+
117+
return formatHttpResponse(schema, status);
118+
}));
128119
});
129120
}
130121

api/src/test/java/com/michelin/ns4kafka/controllers/SchemaControllerTest.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,7 @@ void applyCreated() {
6969
when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace));
7070
when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true);
7171
when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Single.just(List.of()));
72-
when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName()))
73-
.thenReturn(Maybe.empty())
74-
.thenReturn(Maybe.just(schema));
72+
when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Maybe.empty());
7573
when(schemaService.register(namespace, schema)).thenReturn(Single.just(1));
7674
when(securityService.username()).thenReturn(Optional.of("test-user"));
7775
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
@@ -92,15 +90,12 @@ void applyCreated() {
9290
void applyChanged() {
9391
Namespace namespace = buildNamespace();
9492
Schema schema = buildSchema();
95-
Schema schemaV2 = buildSchema();
96-
schemaV2.getSpec().setVersion(2);
9793

9894
when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace));
9995
when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true);
10096
when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Single.just(List.of()));
10197
when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName()))
102-
.thenReturn(Maybe.just(schema))
103-
.thenReturn(Maybe.just(schemaV2));
98+
.thenReturn(Maybe.just(schema));
10499
when(schemaService.register(namespace, schema)).thenReturn(Single.just(2));
105100
when(securityService.username()).thenReturn(Optional.of("test-user"));
106101
when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false);
@@ -125,9 +120,7 @@ void applyUnchanged() {
125120
when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace));
126121
when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true);
127122
when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Single.just(List.of()));
128-
when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName()))
129-
.thenReturn(Maybe.just(schema))
130-
.thenReturn(Maybe.just(schema));
123+
when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Maybe.just(schema));
131124
when(schemaService.register(namespace, schema)).thenReturn(Single.just(1));
132125

133126
schemaController.apply("myNamespace", schema, false)

0 commit comments

Comments
 (0)