Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.michelin.ns4kafka.model.Namespace;
import com.michelin.ns4kafka.model.schema.Schema;
import com.michelin.ns4kafka.model.schema.SchemaCompatibilityState;
import com.michelin.ns4kafka.model.schema.SchemaList;
import com.michelin.ns4kafka.service.SchemaService;
import com.michelin.ns4kafka.util.enumation.ApplyStatus;
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
Expand Down Expand Up @@ -52,8 +51,16 @@ public class SchemaController extends NamespacedResourceController {
* @return A list of schemas
*/
@Get
public Flux<SchemaList> list(String namespace, @QueryValue(defaultValue = "*") String name) {
return schemaService.findByWildcardName(getNamespace(namespace), name);
public Flux<Schema> list(String namespace, @QueryValue(defaultValue = "*") String name) {
Namespace ns = getNamespace(namespace);
return schemaService.findByWildcardName(ns, name)
.collectList()
.flatMapMany(schemas -> schemas.size() == 1
? Flux.fromIterable(schemas
.stream()
.map(schema -> schemaService.getSubjectLatestVersion(ns, schema.getMetadata().getName()))
.toList()).flatMap(schema -> schema)
: Flux.fromIterable(schemas));
}

/**
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/com/michelin/ns4kafka/model/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.michelin.ns4kafka.model.MetadataResource;
import io.micronaut.core.annotation.Introspected;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
Expand All @@ -24,7 +23,6 @@
@EqualsAndHashCode(callSuper = true)
public class Schema extends MetadataResource {
@Valid
@NotNull
private SchemaSpec spec;

/**
Expand Down
28 changes: 0 additions & 28 deletions src/main/java/com/michelin/ns4kafka/model/schema/SchemaList.java

This file was deleted.

72 changes: 41 additions & 31 deletions src/main/java/com/michelin/ns4kafka/service/SchemaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.michelin.ns4kafka.model.Metadata;
import com.michelin.ns4kafka.model.Namespace;
import com.michelin.ns4kafka.model.schema.Schema;
import com.michelin.ns4kafka.model.schema.SchemaList;
import com.michelin.ns4kafka.service.client.schema.SchemaRegistryClient;
import com.michelin.ns4kafka.service.client.schema.entities.SchemaCompatibilityRequest;
import com.michelin.ns4kafka.service.client.schema.entities.SchemaCompatibilityResponse;
Expand Down Expand Up @@ -48,7 +47,7 @@ public class SchemaService {
* @param namespace The namespace
* @return A list of schemas
*/
public Flux<SchemaList> findAllForNamespace(Namespace namespace) {
public Flux<Schema> findAllForNamespace(Namespace namespace) {
List<AccessControlEntry> acls = aclService
.findResourceOwnerGrantedToNamespace(namespace, AccessControlEntry.ResourceType.TOPIC);
return schemaRegistryClient
Expand All @@ -57,7 +56,7 @@ public Flux<SchemaList> findAllForNamespace(Namespace namespace) {
String underlyingTopicName = subject.replaceAll("-(key|value)$", "");
return aclService.isResourceCoveredByAcls(acls, underlyingTopicName);
})
.map(subject -> SchemaList.builder()
.map(subject -> Schema.builder()
.metadata(Metadata.builder()
.cluster(namespace.getMetadata().getCluster())
.namespace(namespace.getMetadata().getName())
Expand All @@ -73,11 +72,11 @@ public Flux<SchemaList> findAllForNamespace(Namespace namespace) {
* @param name The name filter
* @return A list of schemas
*/
public Flux<SchemaList> findByWildcardName(Namespace namespace, String name) {
public Flux<Schema> findByWildcardName(Namespace namespace, String name) {
List<String> nameFilterPatterns = RegexUtils.convertWildcardStringsToRegex(List.of(name));
return findAllForNamespace(namespace)
.filter(schemaList -> RegexUtils
.isResourceCoveredByRegex(schemaList.getMetadata().getName(), nameFilterPatterns));
.filter(schema -> RegexUtils
.isResourceCoveredByRegex(schema.getMetadata().getName(), nameFilterPatterns));
}

/**
Expand Down Expand Up @@ -107,41 +106,52 @@ public Flux<Schema> getAllSubjectVersions(Namespace namespace, String subject) {
);
}

/**
* Build the schema spec from the SchemaResponse.
*
* @param namespace The namespace
* @param subjectOptional The subject object from Http response
* @return A Subject
*/
public Mono<Schema> buildSchemaSpec(Namespace namespace, SchemaResponse subjectOptional) {
return schemaRegistryClient
.getCurrentCompatibilityBySubject(namespace.getMetadata().getCluster(), subjectOptional.subject())
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.map(currentCompatibilityOptional -> {
Schema.Compatibility compatibility = currentCompatibilityOptional.isPresent()
? currentCompatibilityOptional.get().compatibilityLevel() : Schema.Compatibility.GLOBAL;

return Schema.builder()
.metadata(Metadata.builder()
.cluster(namespace.getMetadata().getCluster())
.namespace(namespace.getMetadata().getName())
.name(subjectOptional.subject())
.build())
.spec(Schema.SchemaSpec.builder()
.id(subjectOptional.id())
.version(subjectOptional.version())
.compatibility(compatibility)
.schema(subjectOptional.schema())
.schemaType(subjectOptional.schemaType() == null ? Schema.SchemaType.AVRO :
Schema.SchemaType.valueOf(subjectOptional.schemaType()))
.build())
.build();
});
}

/**
* Get a subject by its name and version.
*
* @param namespace The namespace
* @param subject The subject
* @param version The version
* @return A Subject
* @return A subject
*/
public Mono<Schema> getSubjectByVersion(Namespace namespace, String subject, String version) {
return schemaRegistryClient
.getSubject(namespace.getMetadata().getCluster(), subject, version)
.flatMap(latestSubjectOptional -> schemaRegistryClient
.getCurrentCompatibilityBySubject(namespace.getMetadata().getCluster(), subject)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.map(currentCompatibilityOptional -> {
Schema.Compatibility compatibility = currentCompatibilityOptional.isPresent()
? currentCompatibilityOptional.get().compatibilityLevel() : Schema.Compatibility.GLOBAL;

return Schema.builder()
.metadata(Metadata.builder()
.cluster(namespace.getMetadata().getCluster())
.namespace(namespace.getMetadata().getName())
.name(latestSubjectOptional.subject())
.build())
.spec(Schema.SchemaSpec.builder()
.id(latestSubjectOptional.id())
.version(latestSubjectOptional.version())
.compatibility(compatibility)
.schema(latestSubjectOptional.schema())
.schemaType(latestSubjectOptional.schemaType() == null ? Schema.SchemaType.AVRO :
Schema.SchemaType.valueOf(latestSubjectOptional.schemaType()))
.build())
.build();
}));
.flatMap(subjectOptional -> buildSchemaSpec(namespace, subjectOptional));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public enum Kind {
ROLE_BINDING("RoleBinding"),
SCHEMA("Schema"),
SCHEMA_COMPATIBILITY_STATE("SchemaCompatibilityState"),
SCHEMA_LIST("SchemaList"),
STATUS("Status"),
TOPIC("Topic"),
VAULT_RESPONSE("VaultResponse");
Expand Down
Loading