Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,10 @@ private Mono<List<String>> validateReferences(Namespace ns, Schema schema) {
*/
public Mono<Integer> register(Namespace namespace, Schema schema) {
return schemaRegistryClient
.register(namespace.getMetadata().getCluster(),
schema.getMetadata().getName(), SchemaRequest.builder()
.register(
namespace.getMetadata().getCluster(),
schema.getMetadata().getName(),
SchemaRequest.builder()
.schemaType(String.valueOf(schema.getSpec().getSchemaType()))
.schema(schema.getSpec().getSchema())
.references(schema.getSpec().getReferences())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -71,9 +73,14 @@ public Flux<String> getSubjects(String kafkaCluster) {
*/
public Mono<SchemaResponse> getSubject(String kafkaCluster, String subject, String version) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);
String encodedVersion = URLEncoder.encode(version, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + version)))
URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + VERSIONS + encodedVersion)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaResponse.class))
.onErrorResume(HttpClientResponseException.class,
ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex));
Expand All @@ -88,15 +95,18 @@ public Mono<SchemaResponse> getSubject(String kafkaCluster, String subject, Stri
*/
public Flux<SchemaResponse> getAllSubjectVersions(String kafkaCluster, String subject) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions")))
URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + "/versions")))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Flux.from(httpClient.retrieve(request, Integer[].class))
.flatMap(ids -> Flux.fromIterable(Arrays.asList(ids))
.flatMap(id -> {
HttpRequest<?> requestVersion = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + id)))
HttpRequest<?> requestVersion = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + VERSIONS + id)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return httpClient.retrieve(requestVersion, SchemaResponse.class);
Expand All @@ -115,10 +125,13 @@ public Flux<SchemaResponse> getAllSubjectVersions(String kafkaCluster, String su
*/
public Mono<SchemaResponse> register(String kafkaCluster, String subject, SchemaRequest body) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request =
HttpRequest.POST(URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions")),
body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.POST(
URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + "/versions")), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaResponse.class));
}

Expand All @@ -132,9 +145,13 @@ public Mono<SchemaResponse> register(String kafkaCluster, String subject, Schema
*/
public Mono<Integer[]> deleteSubject(String kafkaCluster, String subject, boolean hardDelete) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

MutableHttpRequest<?> request = HttpRequest.DELETE(
URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "?permanent=" + hardDelete)))
URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + "?permanent=" + hardDelete)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, Integer[].class));
}

Expand All @@ -147,13 +164,16 @@ public Mono<Integer[]> deleteSubject(String kafkaCluster, String subject, boolea
* @param hardDelete Should the subject be hard deleted or not
* @return The version of the deleted subject
*/
public Mono<Integer> deleteSubjectVersion(String kafkaCluster, String subject, String version,
boolean hardDelete) {
public Mono<Integer> deleteSubjectVersion(String kafkaCluster, String subject, String version, boolean hardDelete) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);
String encodedVersion = URLEncoder.encode(version, StandardCharsets.UTF_8);

MutableHttpRequest<?> request = HttpRequest.DELETE(
URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + VERSIONS + version
+ "?permanent=" + hardDelete)))
URI.create(StringUtils.prependUri(config.getUrl(),
SUBJECTS + encodedSubject + VERSIONS + encodedVersion + "?permanent=" + hardDelete)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, Integer.class));
}

Expand All @@ -165,14 +185,17 @@ public Mono<Integer> deleteSubjectVersion(String kafkaCluster, String subject, S
* @param body The request
* @return The schema compatibility validation
*/
public Mono<SchemaCompatibilityCheckResponse> validateSchemaCompatibility(String kafkaCluster, String subject,
public Mono<SchemaCompatibilityCheckResponse> validateSchemaCompatibility(String kafkaCluster,
String subject,
SchemaRequest body) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(
StringUtils.prependUri(config.getUrl(), "/compatibility/subjects/" + subject
+ "/versions?verbose=true")),
body)
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.POST(
URI.create(StringUtils.prependUri(config.getUrl(),
"/compatibility/subjects/" + encodedSubject + "/versions?verbose=true")), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaCompatibilityCheckResponse.class))
.onErrorResume(HttpClientResponseException.class,
ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex));
Expand All @@ -186,12 +209,17 @@ public Mono<SchemaCompatibilityCheckResponse> validateSchemaCompatibility(String
* @param body The schema compatibility request
* @return The schema compatibility update
*/
public Mono<SchemaCompatibilityResponse> updateSubjectCompatibility(String kafkaCluster, String subject,
public Mono<SchemaCompatibilityResponse> updateSubjectCompatibility(String kafkaCluster,
String subject,
SchemaCompatibilityRequest body) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request =
HttpRequest.PUT(URI.create(StringUtils.prependUri(config.getUrl(), CONFIG + subject)), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
CONFIG + encodedSubject)), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class));
}

Expand All @@ -204,8 +232,13 @@ public Mono<SchemaCompatibilityResponse> updateSubjectCompatibility(String kafka
*/
public Mono<SchemaCompatibilityResponse> getCurrentCompatibilityBySubject(String kafkaCluster, String subject) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.GET(URI.create(StringUtils.prependUri(config.getUrl(), CONFIG + subject)))
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(),
CONFIG + encodedSubject)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class))
.onErrorResume(HttpClientResponseException.class,
ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex));
Expand All @@ -220,9 +253,13 @@ public Mono<SchemaCompatibilityResponse> getCurrentCompatibilityBySubject(String
*/
public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(String kafkaCluster, String subject) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
MutableHttpRequest<?> request =
HttpRequest.DELETE(URI.create(StringUtils.prependUri(config.getUrl(), CONFIG + subject)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());
String encodedSubject = URLEncoder.encode(subject, StandardCharsets.UTF_8);

MutableHttpRequest<?> request = HttpRequest.DELETE(
URI.create(StringUtils.prependUri(config.getUrl(),
CONFIG + encodedSubject)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, SchemaCompatibilityResponse.class));
}

Expand All @@ -235,11 +272,12 @@ public Mono<SchemaCompatibilityResponse> deleteCurrentCompatibilityBySubject(Str
*/
public Mono<List<TagTopicInfo>> associateTags(String kafkaCluster, List<TagTopicInfo> tagSpecs) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.POST(URI.create(StringUtils.prependUri(
config.getUrl(),

HttpRequest<?> request = HttpRequest.POST(
URI.create(StringUtils.prependUri(config.getUrl(),
"/catalog/v1/entity/tags")), tagSpecs)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, Argument.listOf(TagTopicInfo.class)));
}

Expand All @@ -252,9 +290,12 @@ public Mono<List<TagTopicInfo>> associateTags(String kafkaCluster, List<TagTopic
*/
public Mono<List<TagInfo>> createTags(String kafkaCluster, List<TagInfo> tags) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest.POST(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/types/tagdefs")), tags)

HttpRequest<?> request = HttpRequest.POST(
URI.create(StringUtils.prependUri(config.getUrl(),
"/catalog/v1/types/tagdefs")), tags)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, Argument.listOf(TagInfo.class)));
}

Expand All @@ -268,11 +309,12 @@ public Mono<List<TagInfo>> createTags(String kafkaCluster, List<TagInfo> tags) {
*/
public Mono<HttpResponse<Void>> dissociateTag(String kafkaCluster, String entityName, String tagName) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.DELETE(URI.create(StringUtils.prependUri(
config.getUrl(),

HttpRequest<?> request = HttpRequest.DELETE(
URI.create(StringUtils.prependUri(config.getUrl(),
"/catalog/v1/entity/type/kafka_topic/name/" + entityName + "/tags/" + tagName)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.exchange(request, Void.class));
}

Expand All @@ -284,11 +326,12 @@ public Mono<HttpResponse<Void>> dissociateTag(String kafkaCluster, String entity
*/
public Mono<TopicListResponse> getTopicWithCatalogInfo(String kafkaCluster, int limit, int offset) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.GET(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/search/basic?type=kafka_topic&limit="
+ limit + "&offset=" + offset)))

HttpRequest<?> request = HttpRequest.GET(
URI.create(StringUtils.prependUri(config.getUrl(),
"/catalog/v1/search/basic?type=kafka_topic&limit=" + limit + "&offset=" + offset)))
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.retrieve(request, TopicListResponse.class));
}

Expand All @@ -302,10 +345,12 @@ public Mono<TopicListResponse> getTopicWithCatalogInfo(String kafkaCluster, int
public Mono<HttpResponse<TopicDescriptionUpdateResponse>> updateDescription(String kafkaCluster,
TopicDescriptionUpdateBody body) {
ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster);
HttpRequest<?> request = HttpRequest
.PUT(URI.create(StringUtils.prependUri(
config.getUrl(), "/catalog/v1/entity")), body)

HttpRequest<?> request = HttpRequest.PUT(
URI.create(StringUtils.prependUri(config.getUrl(),
"/catalog/v1/entity")), body)
.basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword());

return Mono.from(httpClient.exchange(request, TopicDescriptionUpdateResponse.class));
}

Expand Down