From 83e59acab156ea60715c3f999c31f17cefd5e9a9 Mon Sep 17 00:00:00 2001 From: German Osin Date: Tue, 27 May 2025 00:23:18 +0200 Subject: [PATCH 1/5] Housekeeping. Removed Superbuilders. Fixed deprecations --- api/src/main/{antlr4 => antlr}/ksql/KsqlGrammar.g4 | 0 .../main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java | 4 ++++ 2 files changed, 4 insertions(+) rename api/src/main/{antlr4 => antlr}/ksql/KsqlGrammar.g4 (100%) create mode 100644 api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java diff --git a/api/src/main/antlr4/ksql/KsqlGrammar.g4 b/api/src/main/antlr/ksql/KsqlGrammar.g4 similarity index 100% rename from api/src/main/antlr4/ksql/KsqlGrammar.g4 rename to api/src/main/antlr/ksql/KsqlGrammar.g4 diff --git a/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java b/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java new file mode 100644 index 000000000..6f15511ef --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java @@ -0,0 +1,4 @@ +package io.kafbat.ui.mapper; + +public interface DynamicConfigMapper { +} From e8dbcb2c57a96694a3c4099d55e7c3926e454777 Mon Sep 17 00:00:00 2001 From: German Osin Date: Tue, 27 May 2025 00:24:03 +0200 Subject: [PATCH 2/5] Housekeeping. Removed Superbuilders. 
Fixed deprecations --- api/build.gradle | 9 ++- .../java/io/kafbat/ui/config/McpConfig.java | 1 - .../ApplicationConfigController.java | 29 ++------- .../controller/AuthorizationController.java | 8 +-- .../io/kafbat/ui/mapper/ClusterMapper.java | 4 ++ .../ui/mapper/DescribeLogDirsMapper.java | 21 ++++--- .../kafbat/ui/mapper/DynamicConfigMapper.java | 63 +++++++++++++++++++ .../kafbat/ui/mapper/KafkaConnectMapper.java | 4 ++ .../kafbat/ui/model/InternalLogDirStats.java | 7 ++- .../kafbat/ui/serdes/CustomSerdeLoader.java | 9 ++- .../ui/service/AdminClientServiceImpl.java | 1 - .../io/kafbat/ui/service/BrokerService.java | 3 +- .../ui/service/ReactiveAdminClient.java | 5 +- .../java/io/kafbat/ui/util/ContentUtils.java | 2 - .../ui/util/DynamicConfigOperations.java | 8 ++- .../util/jsonschema/JsonAvroConversion.java | 3 +- .../io/kafbat/ui/AbstractIntegrationTest.java | 4 +- ...exBasedProviderAuthorityExtractorTest.java | 2 - .../ui/container/KafkaConnectContainer.java | 6 +- .../kafbat/ui/container/KsqlDbContainer.java | 6 +- .../ui/container/SchemaRegistryContainer.java | 3 +- .../kafbat/ui/emitter/MessageFiltersTest.java | 5 +- .../kafbat/ui/producer/KafkaTestProducer.java | 4 +- .../AccessControlServiceRbacDisabledTest.java | 5 +- .../AccessControlServiceRbacEnabledTest.java | 10 ++- contract/build.gradle | 6 +- .../main/resources/swagger/kafbat-ui-api.yaml | 49 ++++++++++++++- e2e-tests/build.gradle | 4 +- frontend/build.gradle | 15 ++--- .../ui/serde/api/DeserializeResult.java | 18 +++++- .../kafbat/ui/serde/api/PropertyResolver.java | 5 +- .../io/kafbat/ui/serde/api/RecordHeader.java | 11 ++++ .../io/kafbat/ui/serde/api/RecordHeaders.java | 4 +- .../ui/serde/api/SchemaDescription.java | 4 +- .../java/io/kafbat/ui/serde/api/Serde.java | 33 +++++++++- 35 files changed, 265 insertions(+), 106 deletions(-) diff --git a/api/build.gradle b/api/build.gradle index c8ce5d54c..70af06ed5 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -106,11 +106,14 @@ 
generateGrammarSource { arguments += ["-package", "ksql"] } +tasks.withType(JavaCompile) { + options.compilerArgs << "-Xlint:deprecation" +} + + + sourceSets { main { - antlr { - srcDirs = ["src/main/antlr4"] - } java { srcDirs += generateGrammarSource.outputDirectory } diff --git a/api/src/main/java/io/kafbat/ui/config/McpConfig.java b/api/src/main/java/io/kafbat/ui/config/McpConfig.java index cf4181f73..c2a0ef4eb 100644 --- a/api/src/main/java/io/kafbat/ui/config/McpConfig.java +++ b/api/src/main/java/io/kafbat/ui/config/McpConfig.java @@ -5,7 +5,6 @@ import io.kafbat.ui.service.mcp.McpTool; import io.modelcontextprotocol.server.McpAsyncServer; import io.modelcontextprotocol.server.McpServer; -import io.modelcontextprotocol.server.McpServerFeatures.AsyncPromptSpecification; import io.modelcontextprotocol.server.McpServerFeatures.AsyncToolSpecification; import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider; import io.modelcontextprotocol.spec.McpSchema; diff --git a/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java b/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java index e8d763545..32c7d0432 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java @@ -5,10 +5,9 @@ import io.kafbat.ui.api.ApplicationConfigApi; import io.kafbat.ui.config.ClustersProperties; -import io.kafbat.ui.model.ActionDTO; +import io.kafbat.ui.mapper.DynamicConfigMapper; import io.kafbat.ui.model.AppAuthenticationSettingsDTO; import io.kafbat.ui.model.ApplicationConfigDTO; -import io.kafbat.ui.model.ApplicationConfigPropertiesDTO; import io.kafbat.ui.model.ApplicationConfigValidationDTO; import io.kafbat.ui.model.ApplicationInfoDTO; import io.kafbat.ui.model.ClusterConfigValidationDTO; @@ -20,12 +19,9 @@ import io.kafbat.ui.util.ApplicationRestarter; import io.kafbat.ui.util.DynamicConfigOperations; import 
java.util.Map; -import java.util.Optional; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.mapstruct.Mapper; -import org.mapstruct.factory.Mappers; import org.springframework.http.ResponseEntity; import org.springframework.http.codec.multipart.FilePart; import org.springframework.http.codec.multipart.Part; @@ -41,26 +37,11 @@ @RequiredArgsConstructor public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi { - private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class); - - @Mapper - interface PropertiesMapper { - - DynamicConfigOperations.PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto); - - ApplicationConfigPropertiesDTO toDto(DynamicConfigOperations.PropertiesStructure propertiesStructure); - - default ActionDTO stringToActionDto(String str) { - return Optional.ofNullable(str) - .map(s -> Enum.valueOf(ActionDTO.class, s.toUpperCase())) - .orElseThrow(); - } - } - private final DynamicConfigOperations dynamicConfigOperations; private final ApplicationRestarter restarter; private final KafkaClusterFactory kafkaClusterFactory; private final ApplicationInfoService applicationInfoService; + private final DynamicConfigMapper configMapper; @Override public Mono> getApplicationInfo(ServerWebExchange exchange) { @@ -83,7 +64,7 @@ public Mono> getCurrentConfig(ServerWebExch return validateAccess(context) .then(Mono.fromSupplier(() -> ResponseEntity.ok( new ApplicationConfigDTO() - .properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties())) + .properties(configMapper.toDto(dynamicConfigOperations.getCurrentProperties())) ))) .doOnEach(sig -> audit(context, sig)); } @@ -98,7 +79,7 @@ public Mono> restartWithConfig(Mono rest return validateAccess(context) .then(restartRequestDto) .doOnNext(restartDto -> { - var newConfig = MAPPER.fromDto(restartDto.getConfig().getProperties()); + var newConfig = 
configMapper.fromDto(restartDto.getConfig().getProperties()); dynamicConfigOperations.persist(newConfig); }) .doOnEach(sig -> audit(context, sig)) @@ -132,7 +113,7 @@ public Mono> validateConfig(Mono< return validateAccess(context) .then(configDto) .flatMap(config -> { - DynamicConfigOperations.PropertiesStructure newConfig = MAPPER.fromDto(config.getProperties()); + DynamicConfigOperations.PropertiesStructure newConfig = configMapper.fromDto(config.getProperties()); ClustersProperties clustersProperties = newConfig.getKafka(); return validateClustersConfig(clustersProperties) .map(validations -> new ApplicationConfigValidationDTO().clusters(validations)); diff --git a/api/src/main/java/io/kafbat/ui/controller/AuthorizationController.java b/api/src/main/java/io/kafbat/ui/controller/AuthorizationController.java index 1ac0aeb85..aac1ab6fa 100644 --- a/api/src/main/java/io/kafbat/ui/controller/AuthorizationController.java +++ b/api/src/main/java/io/kafbat/ui/controller/AuthorizationController.java @@ -45,23 +45,22 @@ public Mono> getUserAuthInfo(ServerWebExch .map(SecurityContext::getAuthentication) .map(Principal::getName); - var builder = AuthenticationInfoDTO.builder() + var builder = new AuthenticationInfoDTO() .rbacEnabled(accessControlService.isRbacEnabled()); return userName .zipWith(permissions) .map(data -> (AuthenticationInfoDTO) builder .userInfo(new UserInfoDTO(data.getT1(), data.getT2())) - .build() ) - .switchIfEmpty(Mono.just(builder.build())) + .switchIfEmpty(Mono.just(builder)) .map(ResponseEntity::ok); } private List mapPermissions(List permissions, List clusters) { return permissions .stream() - .map(permission -> (UserPermissionDTO) UserPermissionDTO.builder() + .map(permission -> new UserPermissionDTO() .clusters(clusters) .resource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase())) .value(permission.getValue()) @@ -71,7 +70,6 @@ private List mapPermissions(List permissions, Lis .map(this::mapAction) 
.filter(Objects::nonNull) .toList()) - .build() ) .toList(); } diff --git a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java index 5dfd7c954..f67b4aad3 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java @@ -49,8 +49,10 @@ @Mapper(componentModel = "spring") public interface ClusterMapper { + @Mapping(target = "defaultCluster", ignore = true) ClusterDTO toCluster(InternalClusterState clusterState); + @Mapping(target = "zooKeeperStatus", ignore = true) ClusterStatsDTO toClusterStats(InternalClusterState clusterState); default ClusterMetricsDTO toClusterMetrics(Metrics metrics) { @@ -95,6 +97,8 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) { BrokerDTO toBrokerDto(InternalBroker broker); + @Mapping(target = "keySerde", ignore = true) + @Mapping(target = "valueSerde", ignore = true) TopicDetailsDTO toTopicDetails(InternalTopic topic); @Mapping(target = "isReadOnly", source = "readOnly") diff --git a/api/src/main/java/io/kafbat/ui/mapper/DescribeLogDirsMapper.java b/api/src/main/java/io/kafbat/ui/mapper/DescribeLogDirsMapper.java index bccd3a66b..3831169b1 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/DescribeLogDirsMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/DescribeLogDirsMapper.java @@ -7,6 +7,8 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.clients.admin.ReplicaInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.DescribeLogDirsResponse; @@ -16,7 +18,7 @@ public class DescribeLogDirsMapper { public List toBrokerLogDirsList( - Map> logDirsInfo) { + Map> logDirsInfo) { return logDirsInfo.entrySet().stream().map( mapEntry -> mapEntry.getValue().entrySet().stream() @@ 
-26,13 +28,13 @@ public List toBrokerLogDirsList( } private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName, - DescribeLogDirsResponse.LogDirInfo logDirInfo) { + LogDirDescription logDirInfo) { BrokersLogdirsDTO result = new BrokersLogdirsDTO(); result.setName(dirName); - if (logDirInfo.error != null && logDirInfo.error != Errors.NONE) { - result.setError(logDirInfo.error.message()); + if (logDirInfo.error() != null) { + result.setError(logDirInfo.error().getMessage()); } - var topics = logDirInfo.replicaInfos.entrySet().stream() + var topics = logDirInfo.replicaInfos().entrySet().stream() .collect(Collectors.groupingBy(e -> e.getKey().topic())).entrySet().stream() .map(e -> toTopicLogDirs(broker, e.getKey(), e.getValue())) .toList(); @@ -41,8 +43,7 @@ private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName, } private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name, - List> partitions) { + List> partitions) { BrokerTopicLogdirsDTO topic = new BrokerTopicLogdirsDTO(); topic.setName(name); topic.setPartitions( @@ -54,12 +55,12 @@ private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name, } private BrokerTopicPartitionLogdirDTO topicPartitionLogDir(Integer broker, Integer partition, - DescribeLogDirsResponse.ReplicaInfo replicaInfo) { + ReplicaInfo replicaInfo) { BrokerTopicPartitionLogdirDTO logDir = new BrokerTopicPartitionLogdirDTO(); logDir.setBroker(broker); logDir.setPartition(partition); - logDir.setSize(replicaInfo.size); - logDir.setOffsetLag(replicaInfo.offsetLag); + logDir.setSize(replicaInfo.size()); + logDir.setOffsetLag(replicaInfo.offsetLag()); return logDir; } } diff --git a/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java b/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java index 6f15511ef..850429295 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java @@ -1,4 +1,67 @@ 
package io.kafbat.ui.mapper; +import io.kafbat.ui.model.ActionDTO; +import io.kafbat.ui.model.ApplicationConfigPropertiesAuthOauth2ResourceServerDTO; +import io.kafbat.ui.model.ApplicationConfigPropertiesAuthOauth2ResourceServerJwtDTO; +import io.kafbat.ui.model.ApplicationConfigPropertiesAuthOauth2ResourceServerOpaquetokenDTO; +import io.kafbat.ui.model.ApplicationConfigPropertiesDTO; +import io.kafbat.ui.util.DynamicConfigOperations; +import java.util.Optional; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.springframework.boot.autoconfigure.security.oauth2.resource.OAuth2ResourceServerProperties; +import org.springframework.core.io.FileSystemResource; +import org.springframework.core.io.Resource; + +@Mapper(componentModel = "spring") public interface DynamicConfigMapper { + + @Mapping(target = "rbac.roles[].permissions[].parsedActions", ignore = true) + DynamicConfigOperations.PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto); + + ApplicationConfigPropertiesDTO toDto(DynamicConfigOperations.PropertiesStructure propertiesStructure); + + default String map(Resource resource) { + return resource.getFilename(); + } + + default OAuth2ResourceServerProperties map(ApplicationConfigPropertiesAuthOauth2ResourceServerDTO value) { + if (value != null) { + OAuth2ResourceServerProperties result = new OAuth2ResourceServerProperties(); + if (value.getJwt() != null) { + OAuth2ResourceServerProperties.Jwt jwt = result.getJwt(); + + ApplicationConfigPropertiesAuthOauth2ResourceServerJwtDTO source = value.getJwt(); + Optional.ofNullable(source.getJwsAlgorithms()).ifPresent(jwt::setJwsAlgorithms); + Optional.ofNullable(source.getJwkSetUri()).ifPresent(jwt::setJwkSetUri); + Optional.ofNullable(source.getIssuerUri()).ifPresent(jwt::setIssuerUri); + Optional.ofNullable(source.getPublicKeyLocation()) + .map(this::mapResource) + .ifPresent(jwt::setPublicKeyLocation); + Optional.ofNullable(source.getAudiences()).ifPresent(jwt::setAudiences); + 
Optional.ofNullable(source.getAuthoritiesClaimName()).ifPresent(jwt::setAuthoritiesClaimName); + Optional.ofNullable(source.getAuthoritiesClaimDelimiter()).ifPresent(jwt::setAuthoritiesClaimDelimiter); + Optional.ofNullable(source.getAuthorityPrefix()).ifPresent(jwt::setAuthorityPrefix); + Optional.ofNullable(source.getPrincipalClaimName()).ifPresent(jwt::setPrincipalClaimName); + } + if (value.getOpaquetoken() != null) { + OAuth2ResourceServerProperties.Opaquetoken opaquetoken = result.getOpaquetoken(); + ApplicationConfigPropertiesAuthOauth2ResourceServerOpaquetokenDTO source = value.getOpaquetoken(); + Optional.ofNullable(source.getClientId()).ifPresent(opaquetoken::setClientId); + Optional.ofNullable(source.getClientSecret()).ifPresent(opaquetoken::setClientSecret); + Optional.ofNullable(source.getIntrospectionUri()).ifPresent(opaquetoken::setIntrospectionUri); + } + return result; + } + return null; + } + default Resource mapResource(String filename) { + return new FileSystemResource(filename); + } + + default ActionDTO stringToActionDto(String str) { + return Optional.ofNullable(str) + .map(s -> Enum.valueOf(ActionDTO.class, s.toUpperCase())) + .orElseThrow(); + } } diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java index 80cfeaaee..2bef74b51 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java @@ -14,15 +14,19 @@ import io.kafbat.ui.model.connect.InternalConnectInfo; import java.util.List; import org.mapstruct.Mapper; +import org.mapstruct.Mapping; @Mapper(componentModel = "spring") public interface KafkaConnectMapper { NewConnector toClient(io.kafbat.ui.model.NewConnectorDTO newConnector); + @Mapping(target = "status", ignore = true) + @Mapping(target = "connect", ignore = true) ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector); ConnectorStatusDTO
fromClient(ConnectorStatusConnector connectorStatus); + @Mapping(target = "status", ignore = true) TaskDTO fromClient(ConnectorTask connectorTask); TaskStatusDTO fromClient(io.kafbat.ui.connect.model.TaskStatus taskStatus); diff --git a/api/src/main/java/io/kafbat/ui/model/InternalLogDirStats.java b/api/src/main/java/io/kafbat/ui/model/InternalLogDirStats.java index 64fc56c06..09cc56dcf 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalLogDirStats.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalLogDirStats.java @@ -8,6 +8,7 @@ import java.util.LongSummaryStatistics; import java.util.Map; import lombok.Value; +import org.apache.kafka.clients.admin.LogDirDescription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import reactor.util.function.Tuple2; @@ -36,12 +37,12 @@ public static InternalLogDirStats empty() { return new InternalLogDirStats(Map.of()); } - public InternalLogDirStats(Map> log) { + public InternalLogDirStats(Map> log) { final List> topicPartitions = log.entrySet().stream().flatMap(b -> b.getValue().entrySet().stream().flatMap(topicMap -> - topicMap.getValue().replicaInfos.entrySet().stream() - .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size)) + topicMap.getValue().replicaInfos().entrySet().stream() + .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size())) ) ).toList(); diff --git a/api/src/main/java/io/kafbat/ui/serdes/CustomSerdeLoader.java b/api/src/main/java/io/kafbat/ui/serdes/CustomSerdeLoader.java index 87f453ca8..3b44d0517 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/CustomSerdeLoader.java +++ b/api/src/main/java/io/kafbat/ui/serdes/CustomSerdeLoader.java @@ -87,11 +87,10 @@ private ClassLoader createClassloader(Path location) { // we assume that location's content does not change during serdes creation // so, we can reuse already created classloaders return classloaders.computeIfAbsent(location, l -> - AccessController.doPrivileged( - 
(PrivilegedAction) () -> - new ChildFirstClassloader( - archives.toArray(URL[]::new), - CustomSerdeLoader.class.getClassLoader()))); + new ChildFirstClassloader( + archives.toArray(URL[]::new), + CustomSerdeLoader.class.getClassLoader()) + ); } //--------------------------------------------------------------------------------- diff --git a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java index fd9735b87..1e7ee53fb 100644 --- a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java +++ b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java @@ -8,7 +8,6 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; diff --git a/api/src/main/java/io/kafbat/ui/service/BrokerService.java b/api/src/main/java/io/kafbat/ui/service/BrokerService.java index 198685b93..cc1b1a1a9 100644 --- a/api/src/main/java/io/kafbat/ui/service/BrokerService.java +++ b/api/src/main/java/io/kafbat/ui/service/BrokerService.java @@ -20,6 +20,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.LogDirDescription; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionReplica; import org.apache.kafka.common.errors.InvalidRequestException; @@ -109,7 +110,7 @@ public Mono updateBrokerConfigByName(KafkaCluster cluster, .doOnError(e -> log.error("Unexpected error", e)); } - private Mono>> getClusterLogDirs( + private Mono>> getClusterLogDirs( KafkaCluster cluster, List reqBrokers) { return adminClientService.get(cluster) .flatMap(admin -> { diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java 
b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index 34ca4f688..8bfa8f079 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -55,6 +55,7 @@ import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.LogDirDescription; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; @@ -403,9 +404,9 @@ static Mono> toMonoWithExceptionFilter(Map> v ); } - public Mono>> describeLogDirs( + public Mono>> describeLogDirs( Collection brokerIds) { - return toMono(client.describeLogDirs(brokerIds).all()) + return toMono(client.describeLogDirs(brokerIds).allDescriptions()) .onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of())) .onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of())) .onErrorResume(th -> true, th -> { diff --git a/api/src/main/java/io/kafbat/ui/util/ContentUtils.java b/api/src/main/java/io/kafbat/ui/util/ContentUtils.java index e5d6cce00..22b4496e6 100644 --- a/api/src/main/java/io/kafbat/ui/util/ContentUtils.java +++ b/api/src/main/java/io/kafbat/ui/util/ContentUtils.java @@ -4,8 +4,6 @@ import java.nio.CharBuffer; import java.nio.charset.CharsetDecoder; import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.regex.Pattern; /** * Provides utility methods converting byte data to string representations. 
diff --git a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java index 6c3a99b04..c5dcdd766 100644 --- a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java +++ b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java @@ -30,7 +30,9 @@ import org.springframework.http.codec.multipart.FilePart; import org.springframework.stereotype.Component; import org.yaml.snakeyaml.DumperOptions; +import org.yaml.snakeyaml.LoaderOptions; import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.Constructor; import org.yaml.snakeyaml.introspector.BeanAccess; import org.yaml.snakeyaml.introspector.Property; import org.yaml.snakeyaml.introspector.PropertyUtils; @@ -204,7 +206,11 @@ protected NodeTuple representJavaBeanProperty(Object javaBean, representer.setPropertyUtils(propertyUtils); representer.addClassTag(PropertiesStructure.class, Tag.MAP); //to avoid adding class tag representer.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK); //use indent instead of {} - return new Yaml(representer).dump(props); + return new Yaml( + new Constructor(new LoaderOptions()), + representer, + new DumperOptions() + ).dump(props); } ///--------------------------------------------------------------------- diff --git a/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java b/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java index de23d40cd..db82e43be 100644 --- a/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java +++ b/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import com.google.common.collect.Lists; +import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; import io.confluent.kafka.serializers.AvroData; import io.kafbat.ui.exception.JsonAvroConversionException; import 
java.math.BigDecimal; @@ -224,7 +225,7 @@ public static JsonNode convertAvroToJson(Object obj, Schema avroSchema) { case ENUM -> new TextNode(obj.toString()); case UNION -> { ObjectNode node = MAPPER.createObjectNode(); - int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj); + int unionIdx = AvroSchemaUtils.getGenericData().resolveUnion(avroSchema, obj); Schema selectedType = avroSchema.getTypes().get(unionIdx); node.set( selectUnionTypeFieldName(avroSchema, selectedType, unionIdx), diff --git a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java index 6c6c73113..07ea81c17 100644 --- a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java +++ b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java @@ -25,8 +25,8 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.util.TestSocketUtils; import org.springframework.util.ResourceUtils; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; +import org.testcontainers.kafka.ConfluentKafkaContainer; import org.testcontainers.utility.DockerImageName; @@ -40,7 +40,7 @@ public abstract class AbstractIntegrationTest { private static final String CONFLUENT_PLATFORM_VERSION = "7.8.0"; - public static final KafkaContainer kafka = new KafkaContainer( + public static final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer( DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)) .withNetwork(Network.SHARED); diff --git a/api/src/test/java/io/kafbat/ui/config/RegexBasedProviderAuthorityExtractorTest.java b/api/src/test/java/io/kafbat/ui/config/RegexBasedProviderAuthorityExtractorTest.java index 11eec0ea4..7e6e4283e 100644 --- a/api/src/test/java/io/kafbat/ui/config/RegexBasedProviderAuthorityExtractorTest.java +++ b/api/src/test/java/io/kafbat/ui/config/RegexBasedProviderAuthorityExtractorTest.java @@ -7,8 
+7,6 @@ import static org.mockito.Mockito.when; import static org.springframework.security.oauth2.client.registration.ClientRegistration.withRegistrationId; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; import io.kafbat.ui.config.auth.OAuthProperties; import io.kafbat.ui.model.rbac.Role; diff --git a/api/src/test/java/io/kafbat/ui/container/KafkaConnectContainer.java b/api/src/test/java/io/kafbat/ui/container/KafkaConnectContainer.java index 42d2e2816..ad3de3c6c 100644 --- a/api/src/test/java/io/kafbat/ui/container/KafkaConnectContainer.java +++ b/api/src/test/java/io/kafbat/ui/container/KafkaConnectContainer.java @@ -2,9 +2,9 @@ import java.time.Duration; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.kafka.ConfluentKafkaContainer; public class KafkaConnectContainer extends GenericContainer { private static final int CONNECT_PORT = 8083; @@ -18,8 +18,8 @@ public KafkaConnectContainer(String version) { } - public KafkaConnectContainer withKafka(KafkaContainer kafka) { - String bootstrapServers = kafka.getNetworkAliases().get(0) + ":9092"; + public KafkaConnectContainer withKafka(ConfluentKafkaContainer kafka) { + String bootstrapServers = kafka.getNetworkAliases().getFirst() + ":9092"; return withKafka(kafka.getNetwork(), bootstrapServers); } diff --git a/api/src/test/java/io/kafbat/ui/container/KsqlDbContainer.java b/api/src/test/java/io/kafbat/ui/container/KsqlDbContainer.java index 09e8aad32..1cb61e107 100644 --- a/api/src/test/java/io/kafbat/ui/container/KsqlDbContainer.java +++ b/api/src/test/java/io/kafbat/ui/container/KsqlDbContainer.java @@ -2,9 +2,9 @@ import java.time.Duration; import org.testcontainers.containers.GenericContainer; -import 
org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.kafka.ConfluentKafkaContainer; import org.testcontainers.utility.DockerImageName; public class KsqlDbContainer extends GenericContainer { @@ -20,9 +20,9 @@ public KsqlDbContainer(DockerImageName imageName) { .withStartupTimeout(Duration.ofMinutes(5)); } - public KsqlDbContainer withKafka(KafkaContainer kafka) { + public KsqlDbContainer withKafka(ConfluentKafkaContainer kafka) { dependsOn(kafka); - String bootstrapServers = kafka.getNetworkAliases().get(0) + ":9092"; + String bootstrapServers = kafka.getNetworkAliases().getFirst() + ":9092"; return withKafka(kafka.getNetwork(), bootstrapServers); } diff --git a/api/src/test/java/io/kafbat/ui/container/SchemaRegistryContainer.java b/api/src/test/java/io/kafbat/ui/container/SchemaRegistryContainer.java index 052e5307a..1f67c6f79 100644 --- a/api/src/test/java/io/kafbat/ui/container/SchemaRegistryContainer.java +++ b/api/src/test/java/io/kafbat/ui/container/SchemaRegistryContainer.java @@ -5,6 +5,7 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; +import org.testcontainers.kafka.ConfluentKafkaContainer; public class SchemaRegistryContainer extends GenericContainer { private static final int SCHEMA_PORT = 8081; @@ -14,7 +15,7 @@ public SchemaRegistryContainer(String version) { withExposedPorts(8081); } - public SchemaRegistryContainer withKafka(KafkaContainer kafka) { + public SchemaRegistryContainer withKafka(ConfluentKafkaContainer kafka) { String bootstrapServers = kafka.getNetworkAliases().get(0) + ":9092"; return withKafka(kafka.getNetwork(), bootstrapServers); } diff --git a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java index af9b83418..33414ef05 100644 
--- a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java +++ b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java @@ -224,10 +224,9 @@ void testBase64DecodingWorks() { } private TopicMessageDTO msg() { - return TopicMessageDTO.builder() + return new TopicMessageDTO() .partition(1) .offset(-1L) - .timestamp(OffsetDateTime.now()) - .build(); + .timestamp(OffsetDateTime.now()); } } diff --git a/api/src/test/java/io/kafbat/ui/producer/KafkaTestProducer.java b/api/src/test/java/io/kafbat/ui/producer/KafkaTestProducer.java index 03b49bce2..e412d3662 100644 --- a/api/src/test/java/io/kafbat/ui/producer/KafkaTestProducer.java +++ b/api/src/test/java/io/kafbat/ui/producer/KafkaTestProducer.java @@ -7,7 +7,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; -import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.kafka.ConfluentKafkaContainer; public class KafkaTestProducer implements AutoCloseable { private final KafkaProducer producer; @@ -16,7 +16,7 @@ private KafkaTestProducer(KafkaProducer producer) { this.producer = producer; } - public static KafkaTestProducer forKafka(KafkaContainer kafkaContainer) { + public static KafkaTestProducer forKafka(ConfluentKafkaContainer kafkaContainer) { return new KafkaTestProducer<>(new KafkaProducer<>(Map.of( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), ProducerConfig.CLIENT_ID_CONFIG, "KafkaTestProducer", diff --git a/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacDisabledTest.java b/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacDisabledTest.java index 199771a78..0a4920b99 100644 --- a/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacDisabledTest.java +++ b/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacDisabledTest.java @@ -152,9 +152,8 @@ 
void isConnectAccessible() { void isConnectAccessibleDto() { withSecurityContext(() -> { when(user.groups()).thenReturn(List.of(DEV_ROLE)); - ConnectDTO connectDto = ConnectDTO.builder() - .name(CONNECT_NAME) - .build(); + ConnectDTO connectDto = new ConnectDTO() + .name(CONNECT_NAME); Mono consumerGroupAccessibleMono = accessControlService.isConnectAccessible(connectDto, PROD_CLUSTER); StepVerifier.create(consumerGroupAccessibleMono) diff --git a/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacEnabledTest.java b/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacEnabledTest.java index b7e307b51..2c24cb5d4 100644 --- a/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacEnabledTest.java +++ b/api/src/test/java/io/kafbat/ui/service/rbac/AccessControlServiceRbacEnabledTest.java @@ -262,9 +262,8 @@ void isConnectAccessible_notAccessible() { void isConnectAccessible_connectDto() { withSecurityContext(() -> { when(user.groups()).thenReturn(List.of(DEV_ROLE)); - ConnectDTO connectDto = ConnectDTO.builder() - .name(CONNECT_NAME) - .build(); + ConnectDTO connectDto = new ConnectDTO() + .name(CONNECT_NAME); Mono consumerGroupAccessibleMono = accessControlService.isConnectAccessible(connectDto, DEV_CLUSTER); StepVerifier.create(consumerGroupAccessibleMono) @@ -278,9 +277,8 @@ void isConnectAccessible_connectDto() { void isConnectAccessible_connectDto_notAccessible() { withSecurityContext(() -> { when(user.groups()).thenReturn(List.of(DEV_ROLE)); - ConnectDTO connectDto = ConnectDTO.builder() - .name("SOME OTHER CONNECT") - .build(); + ConnectDTO connectDto = new ConnectDTO() + .name("SOME OTHER CONNECT"); Mono consumerGroupAccessibleMono = accessControlService.isConnectAccessible(connectDto, DEV_CLUSTER); StepVerifier.create(consumerGroupAccessibleMono) diff --git a/contract/build.gradle b/contract/build.gradle index 6aba087a9..70f62a04d 100644 --- a/contract/build.gradle +++ b/contract/build.gradle @@ -9,6 +9,10 @@ 
plugins { def specDir = project.layout.projectDirectory.dir("src/main/resources/swagger/") def targetDir = project.layout.buildDirectory.dir("generated").get() +tasks.withType(JavaCompile).configureEach { + options.compilerArgs << "-Xlint:deprecation" +} + dependencies { implementation libs.spring.starter.webflux implementation libs.spring.starter.validation @@ -54,7 +58,7 @@ tasks.register('generateBackendApi', GenerateTask) { generateConstructorWithAllArgs : "false", generatedConstructorWithRequiredArgs: "false", additionalModelTypeAnnotations : """ - @lombok.experimental.SuperBuilder + @lombok.AllArgsConstructor @lombok.NoArgsConstructor """] diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 3c9aa703c..158330062 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -4201,6 +4201,43 @@ components: type: object additionalProperties: type: string + resourceServer: + type: object + properties: + jwt: + type: object + properties: + jwkSetUri: + type: string + jwsAlgorithms: + type: array + items: + type: string + issuerUri: + type: string + publicKeyLocation: + type: string + audiences: + type: array + items: + type: string + authorityPrefix: + type: string + authoritiesClaimDelimiter: + type: string + authoritiesClaimName: + type: string + principalClaimName: + type: string + opaquetoken: + type: object + properties: + clientId: + type: string + clientSecret: + type: string + introspectionUri: + type: string rbac: type: object properties: @@ -4226,6 +4263,8 @@ components: type: string value: type: string + regex: + type: string permissions: type: array items: @@ -4260,6 +4299,8 @@ components: type: integer defaultPageSize: type: integer + responseTimeoutMs: + type: integer adminClientTimeout: type: integer internalTopicPrefix: @@ -4355,10 +4396,14 @@ components: properties: type: object additionalProperties: 
true + consumerProperties: + type: object + additionalProperties: true + producerProperties: + type: object + additionalProperties: true readOnly: type: boolean - disableLogDirsCollection: - type: boolean serde: type: array items: diff --git a/e2e-tests/build.gradle b/e2e-tests/build.gradle index da26ba83d..ec9379b71 100644 --- a/e2e-tests/build.gradle +++ b/e2e-tests/build.gradle @@ -1,5 +1,4 @@ import java.nio.file.Files -import java.nio.file.Paths plugins { id 'java' @@ -44,8 +43,7 @@ test { useTestNG() { useDefaultListeners = true - def suitePath = Paths.get(project.rootDir.toString(), - "e2e-tests/src/test/resources/" + suiteName + ".xml" as String) + def suitePath = project.layout.projectDirectory.dir("src/test/resources/" + suiteName + ".xml").asFile.toPath() if (!Files.exists(suitePath)) { throw new GradleException("Suite [" + suitePath.toAbsolutePath() + "] doesn't exist") } diff --git a/frontend/build.gradle b/frontend/build.gradle index 5375853fa..dc982ad73 100644 --- a/frontend/build.gradle +++ b/frontend/build.gradle @@ -9,16 +9,11 @@ node { nodeProjectDir = project.layout.projectDirectory } -tasks.register("clean") { - group = "build" - description = "Cleans frontend build" - - doFirst { - delete( - project.layout.buildDirectory, - project.layout.projectDirectory.dir("src/generated-sources") - ) - } +tasks.register('clean', Delete) { + group = 'build' + description = 'Cleans frontend build' + delete layout.buildDirectory + delete layout.projectDirectory.dir('src/generated-sources') } tasks.named("pnpmInstall") { diff --git a/serde-api/src/main/java/io/kafbat/ui/serde/api/DeserializeResult.java b/serde-api/src/main/java/io/kafbat/ui/serde/api/DeserializeResult.java index 402690914..8ae6b5202 100644 --- a/serde-api/src/main/java/io/kafbat/ui/serde/api/DeserializeResult.java +++ b/serde-api/src/main/java/io/kafbat/ui/serde/api/DeserializeResult.java @@ -9,8 +9,19 @@ */ public final class DeserializeResult { + /** + * Type of deserialized result. 
+ */ public enum Type { - STRING, JSON + /** + * Content is the string. Will be shown as is. + */ + STRING, + /** + * Content is the json object. Will be parsed by Jackson object mapper. + */ + JSON + ; } // nullable @@ -19,6 +30,7 @@ public enum Type { private final Map additionalProperties; /** + * Constructor for {@code DeserializeResult}. * @param result string representation of deserialized binary data * @param type type of string - can it be converted to json or not * @param additionalProperties additional information about deserialized value (will be shown in UI) @@ -30,6 +42,7 @@ public DeserializeResult(String result, Type type, Map additiona } /** + * Getters for result. * @return string representation of deserialized binary data, can be null */ public String getResult() { @@ -37,8 +50,8 @@ public String getResult() { } /** - * @return additional information about deserialized value. * Will be show as json dictionary in UI (serialized with Jackson object mapper). + * @return additional information about deserialized value. * It is recommended to use primitive types and strings for values. */ public Map getAdditionalProperties() { @@ -46,6 +59,7 @@ public Map getAdditionalProperties() { } /** + * Type of deserialized result. Will be used as hint for some internal logic * @return type of deserialized result. Will be used as hint for some internal logic * (ex. if type==STRING smart filters won't try to parse it as json for further usage) */ diff --git a/serde-api/src/main/java/io/kafbat/ui/serde/api/PropertyResolver.java b/serde-api/src/main/java/io/kafbat/ui/serde/api/PropertyResolver.java index 3060570d4..74a797907 100644 --- a/serde-api/src/main/java/io/kafbat/ui/serde/api/PropertyResolver.java +++ b/serde-api/src/main/java/io/kafbat/ui/serde/api/PropertyResolver.java @@ -14,7 +14,7 @@ public interface PropertyResolver { /** * Get property value by name. 
- * + * @param the type of the property * @param key property name * @param targetType type of property value * @return property value or empty {@code Optional} if property not found @@ -25,6 +25,7 @@ public interface PropertyResolver { /** * Get list-property value by name * + * @param the type of the item * @param key list property name * @param itemType type of list element * @return list property value or empty {@code Optional} if property not found @@ -37,6 +38,8 @@ public interface PropertyResolver { * @param key map-property name * @param keyType type of map key * @param valueType type of map value + * @param the type of the key + * @param the type of the value * @return map-property value or empty {@code Optional} if property not found */ Optional> getMapProperty(String key, Class keyType, Class valueType); diff --git a/serde-api/src/main/java/io/kafbat/ui/serde/api/RecordHeader.java b/serde-api/src/main/java/io/kafbat/ui/serde/api/RecordHeader.java index eec655e2a..855f9dba0 100644 --- a/serde-api/src/main/java/io/kafbat/ui/serde/api/RecordHeader.java +++ b/serde-api/src/main/java/io/kafbat/ui/serde/api/RecordHeader.java @@ -1,9 +1,20 @@ package io.kafbat.ui.serde.api; +/** + * Header of kafka record. + */ public interface RecordHeader { + /** + * Header key. + * @return header key. + */ String key(); + /** + * Header value. + * @return header value. + */ byte[] value(); } diff --git a/serde-api/src/main/java/io/kafbat/ui/serde/api/RecordHeaders.java b/serde-api/src/main/java/io/kafbat/ui/serde/api/RecordHeaders.java index 4a91c253c..b9be5ee2b 100644 --- a/serde-api/src/main/java/io/kafbat/ui/serde/api/RecordHeaders.java +++ b/serde-api/src/main/java/io/kafbat/ui/serde/api/RecordHeaders.java @@ -1,5 +1,7 @@ package io.kafbat.ui.serde.api; - +/** + * Iterable of {@code RecordHeader}s. 
+ */ public interface RecordHeaders extends Iterable { } diff --git a/serde-api/src/main/java/io/kafbat/ui/serde/api/SchemaDescription.java b/serde-api/src/main/java/io/kafbat/ui/serde/api/SchemaDescription.java index 60e60a966..9064428f0 100644 --- a/serde-api/src/main/java/io/kafbat/ui/serde/api/SchemaDescription.java +++ b/serde-api/src/main/java/io/kafbat/ui/serde/api/SchemaDescription.java @@ -11,7 +11,7 @@ public final class SchemaDescription { private final Map additionalProperties; /** - * + * Constructor for {@code SchemaDescription}. * @param schema schema descriptions. * If contains json-schema (preferred) UI will use it for validation and sample data generation. * @param additionalProperties additional properties about schema (may be rendered in UI in the future) @@ -22,6 +22,7 @@ public SchemaDescription(String schema, Map additionalProperties } /** + * Schema description text. Can be null. * @return schema description text. Preferably contains json-schema. Can be null. */ public String getSchema() { @@ -29,6 +30,7 @@ public String getSchema() { } /** + * Additional properties about schema. * @return additional properties about schema */ public Map getAdditionalProperties() { diff --git a/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java b/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java index 74705b1b6..24a0f6f87 100644 --- a/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java +++ b/serde-api/src/main/java/io/kafbat/ui/serde/api/Serde.java @@ -27,7 +27,14 @@ public interface Serde extends Closeable { * Kafka record's part that Serde will be applied to. */ enum Target { - KEY, VALUE + /** + * Should be used for key serialization/deserialization + */ + KEY, + /** + * Should be used for value serialization/deserialization. + */ + VALUE, } /** @@ -44,22 +51,32 @@ void configure( ); /** + * Get serde's description. * @return Serde's description. Treated as Markdown text. Will be shown in UI. 
*/ Optional getDescription(); /** + * Get schema description for specified topic's key/value. + * @param topic topic name + * @param type {@code Target} for which {@code SchemaDescription} will be returned. * @return SchemaDescription for specified topic's key/value. * {@code Optional.empty} if there is not information about schema. */ Optional getSchema(String topic, Target type); /** + * Checks if this Serde can be applied to specified topic's key/value deserialization. + * @param topic topic name + * @param type {@code Target} for which {@code Deserializer} will be applied. * @return true if this Serde can be applied to specified topic's key/value deserialization */ boolean canDeserialize(String topic, Target type); /** + * Checks if this Serde can be applied to specified topic's key/value serialization. + * @param topic topic name + * @param type {@code Target} for which {@code Serializer} will be applied. * @return true if this Serde can be applied to specified topic's key/value serialization */ boolean canSerialize(String topic, Target type); @@ -78,12 +95,18 @@ default void close() { * Creates {@code Serializer} for specified topic's key/value. * kafbat-ui doesn't cache {@code Serializes} - new one will be created each time user's message needs to be serialized. * (Unless kafbat-ui supports batch inserts). + * @param topic topic name + * @param type {@code Target} for which {@code Serializer} will be created. + * @return {@code Serializer} for specified topic's key/value. */ Serializer serializer(String topic, Target type); /** * Creates {@code Deserializer} for specified topic's key/value. * {@code Deserializer} will be created for each kafka polling and will be used for all messages within that polling cycle. + * @param topic topic name + * @param type {@code Target} for which {@code Deserializer} will be created. + * @return {@code Deserializer} for specified topic's key/value. 
*/ Deserializer deserializer(String topic, Target type); @@ -93,7 +116,9 @@ default void close() { interface Serializer { /** + * Serializes input string to bytes. * @param input string entered by user into UI text field.
Note: this input is not formatted in any way. + * @return serialized bytes. Can be null if input is null or empty string. */ byte[] serialize(String input); } @@ -102,6 +127,12 @@ interface Serializer { * Deserializes polled record's key/value (depending on what {@code Type} it was created for). */ interface Deserializer { + /** + * Deserializes record's key/value to string. + * @param headers record's headers + * @param data record's key/value + * @return deserialized object. Can be null if input is null or empty string. + */ DeserializeResult deserialize(RecordHeaders headers, byte[] data); } From 2553b48bb6b047fc5ec1aca7df1aa4fe3448e275 Mon Sep 17 00:00:00 2001 From: German Osin Date: Tue, 27 May 2025 14:53:21 +0200 Subject: [PATCH 3/5] fixes --- api/build.gradle | 2 +- .../java/io/kafbat/ui/serdes/builtin/sr/Serialize.java | 1 + .../java/io/kafbat/ui/service/ReactiveAdminClient.java | 1 + .../main/java/io/kafbat/ui/service/StatisticsService.java | 1 + .../io/kafbat/ui/util/jsonschema/JsonAvroConversion.java | 3 ++- .../test/java/io/kafbat/ui/AbstractIntegrationTest.java | 7 +++++-- .../java/io/kafbat/ui/container/KafkaConnectContainer.java | 2 +- .../test/java/io/kafbat/ui/container/KsqlDbContainer.java | 2 +- .../io/kafbat/ui/container/SchemaRegistryContainer.java | 3 +-- .../java/io/kafbat/ui/service/audit/AuditServiceTest.java | 2 ++ .../java/io/kafbat/ui/service/audit/AuditWriterTest.java | 1 + contract/build.gradle | 2 +- 12 files changed, 18 insertions(+), 9 deletions(-) diff --git a/api/build.gradle b/api/build.gradle index 70af06ed5..7afd3eff4 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -107,7 +107,7 @@ generateGrammarSource { } tasks.withType(JavaCompile) { - options.compilerArgs << "-Xlint:deprecation" + options.compilerArgs << "-Xlint:deprecation" << "-Xlint:unchecked" } diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/Serialize.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/Serialize.java index 939230fac..9e1a07594 
100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/Serialize.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/Serialize.java @@ -96,6 +96,7 @@ static byte[] serializeProto(SchemaRegistryClient srClient, @KafkaClientInternalsDependant("AbstractKafkaAvroSerializer::serializeImpl") @SneakyThrows + @SuppressWarnings("unchecked") static byte[] serializeAvro(AvroSchema schema, int schemaId, String input) { var avroObject = JsonAvroConversion.convertJsonToAvro(input, schema.rawSchema()); try (var out = new ByteArrayOutputStream()) { diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index 8bfa8f079..5bb2775a3 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -377,6 +377,7 @@ public Mono describeTopic(String topic) { * This method converts input map into Mono[Map] ignoring keys for which KafkaFutures * finished with classes exceptions and empty Monos. */ + @SuppressWarnings("unchecked") @SafeVarargs static Mono> toMonoWithExceptionFilter(Map> values, Class... 
classes) { diff --git a/api/src/main/java/io/kafbat/ui/service/StatisticsService.java b/api/src/main/java/io/kafbat/ui/service/StatisticsService.java index 8adadf1ed..bc4ce6a0b 100644 --- a/api/src/main/java/io/kafbat/ui/service/StatisticsService.java +++ b/api/src/main/java/io/kafbat/ui/service/StatisticsService.java @@ -34,6 +34,7 @@ public Mono updateCache(KafkaCluster c) { return getStatistics(c).doOnSuccess(m -> cache.replace(c, m)); } + @SuppressWarnings("unchecked") private Mono getStatistics(KafkaCluster cluster) { return adminClientService.get(cluster).flatMap(ac -> ac.describeCluster().flatMap(description -> diff --git a/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java b/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java index db82e43be..76baf0072 100644 --- a/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java +++ b/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java @@ -195,6 +195,7 @@ private static Object convert(JsonNode node, Schema avroSchema) { // converts output of KafkaAvroDeserializer (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!) into json. // Note: conversion should be compatible with AvroJsonSchemaConverter logic! 
+ @SuppressWarnings("unchecked") public static JsonNode convertAvroToJson(Object obj, Schema avroSchema) { if (obj == null) { return NullNode.getInstance(); @@ -213,7 +214,7 @@ public static JsonNode convertAvroToJson(Object obj, Schema avroSchema) { } case MAP -> { ObjectNode node = MAPPER.createObjectNode(); - ((Map) obj).forEach((k, v) -> node.set(k.toString(), convertAvroToJson(v, avroSchema.getValueType()))); + ((Map) obj).forEach((k, v) -> node.set(k.toString(), convertAvroToJson(v, avroSchema.getValueType()))); yield node; } case ARRAY -> { diff --git a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java index 07ea81c17..fc13dc74c 100644 --- a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java +++ b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java @@ -40,8 +40,11 @@ public abstract class AbstractIntegrationTest { private static final String CONFLUENT_PLATFORM_VERSION = "7.8.0"; - public static final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer( - DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)) + public static final ConfluentKafkaContainer kafkaOriginal = new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)); + + public static final ConfluentKafkaContainer kafka = kafkaOriginal + .withListener("0.0.0.0:9095", () -> kafkaOriginal.getNetworkAliases().getFirst() + ":9095") .withNetwork(Network.SHARED); public static final SchemaRegistryContainer schemaRegistry = diff --git a/api/src/test/java/io/kafbat/ui/container/KafkaConnectContainer.java b/api/src/test/java/io/kafbat/ui/container/KafkaConnectContainer.java index ad3de3c6c..8cbf2c50e 100644 --- a/api/src/test/java/io/kafbat/ui/container/KafkaConnectContainer.java +++ b/api/src/test/java/io/kafbat/ui/container/KafkaConnectContainer.java @@ -19,7 +19,7 @@ public KafkaConnectContainer(String version) { public 
KafkaConnectContainer withKafka(ConfluentKafkaContainer kafka) { - String bootstrapServers = kafka.getNetworkAliases().getFirst() + ":9092"; + String bootstrapServers = kafka.getNetworkAliases().getFirst() + ":9095"; return withKafka(kafka.getNetwork(), bootstrapServers); } diff --git a/api/src/test/java/io/kafbat/ui/container/KsqlDbContainer.java b/api/src/test/java/io/kafbat/ui/container/KsqlDbContainer.java index 1cb61e107..7c55c3afe 100644 --- a/api/src/test/java/io/kafbat/ui/container/KsqlDbContainer.java +++ b/api/src/test/java/io/kafbat/ui/container/KsqlDbContainer.java @@ -22,7 +22,7 @@ public KsqlDbContainer(DockerImageName imageName) { public KsqlDbContainer withKafka(ConfluentKafkaContainer kafka) { dependsOn(kafka); - String bootstrapServers = kafka.getNetworkAliases().getFirst() + ":9092"; + String bootstrapServers = kafka.getNetworkAliases().getFirst() + ":9095"; return withKafka(kafka.getNetwork(), bootstrapServers); } diff --git a/api/src/test/java/io/kafbat/ui/container/SchemaRegistryContainer.java b/api/src/test/java/io/kafbat/ui/container/SchemaRegistryContainer.java index 1f67c6f79..d5d9b5274 100644 --- a/api/src/test/java/io/kafbat/ui/container/SchemaRegistryContainer.java +++ b/api/src/test/java/io/kafbat/ui/container/SchemaRegistryContainer.java @@ -3,7 +3,6 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.kafka.ConfluentKafkaContainer; @@ -16,7 +15,7 @@ public SchemaRegistryContainer(String version) { } public SchemaRegistryContainer withKafka(ConfluentKafkaContainer kafka) { - String bootstrapServers = kafka.getNetworkAliases().get(0) + ":9092"; + String bootstrapServers = kafka.getNetworkAliases().getFirst() + ":9095"; return withKafka(kafka.getNetwork(), 
bootstrapServers); } diff --git a/api/src/test/java/io/kafbat/ui/service/audit/AuditServiceTest.java b/api/src/test/java/io/kafbat/ui/service/audit/AuditServiceTest.java index 6f2591a44..f12e8709f 100644 --- a/api/src/test/java/io/kafbat/ui/service/audit/AuditServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/audit/AuditServiceTest.java @@ -26,6 +26,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +@SuppressWarnings("unchecked") class AuditServiceTest { @Test @@ -59,6 +60,7 @@ void auditCallsWriterMethodDependingOnSignal() { verify(auditWriter).write(any(), any(), eq(th)); } + @SuppressWarnings("unchecked") @Nested class CreateAuditWriter { diff --git a/api/src/test/java/io/kafbat/ui/service/audit/AuditWriterTest.java b/api/src/test/java/io/kafbat/ui/service/audit/AuditWriterTest.java index 39f3d37e5..c3720d994 100644 --- a/api/src/test/java/io/kafbat/ui/service/audit/AuditWriterTest.java +++ b/api/src/test/java/io/kafbat/ui/service/audit/AuditWriterTest.java @@ -23,6 +23,7 @@ import org.mockito.Mockito; import org.slf4j.Logger; +@SuppressWarnings("unchecked") class AuditWriterTest { final KafkaProducer producerMock = Mockito.mock(KafkaProducer.class); diff --git a/contract/build.gradle b/contract/build.gradle index 70f62a04d..eec855a12 100644 --- a/contract/build.gradle +++ b/contract/build.gradle @@ -10,7 +10,7 @@ def specDir = project.layout.projectDirectory.dir("src/main/resources/swagger/") def targetDir = project.layout.buildDirectory.dir("generated").get() tasks.withType(JavaCompile).configureEach { - options.compilerArgs << "-Xlint:deprecation" + options.compilerArgs << "-Xlint:deprecation" << "-Xlint:unchecked" } dependencies { From 8a1cdae5c3a5557cb0a98129ebdc37f9174a970d Mon Sep 17 00:00:00 2001 From: German Osin Date: Tue, 27 May 2025 19:01:30 +0200 Subject: [PATCH 4/5] fixes --- .../GlobalErrorWebExceptionHandler.java | 1 + .../kafbat/ui/mapper/DynamicConfigMapper.java | 12 +++++++-- 
.../ui/util/DynamicConfigOperations.java | 14 ++++++---- .../service/quota/ClientQuotaServiceTest.java | 27 ++++++++++++------- contract/build.gradle | 4 --- 5 files changed, 38 insertions(+), 20 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java b/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java index 482ced492..946a3561b 100644 --- a/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java +++ b/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java @@ -161,6 +161,7 @@ private String extractFieldErrorMsg(FieldError fieldError) { return coalesce(fieldError.getDefaultMessage(), fieldError.getCode(), "Invalid field value"); } + @SafeVarargs private T coalesce(T... items) { return Stream.of(items).filter(Objects::nonNull).findFirst().orElse(null); } diff --git a/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java b/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java index 850429295..59861d49e 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java @@ -5,10 +5,11 @@ import io.kafbat.ui.model.ApplicationConfigPropertiesAuthOauth2ResourceServerJwtDTO; import io.kafbat.ui.model.ApplicationConfigPropertiesAuthOauth2ResourceServerOpaquetokenDTO; import io.kafbat.ui.model.ApplicationConfigPropertiesDTO; +import io.kafbat.ui.model.ApplicationConfigPropertiesRbacRolesInnerPermissionsInnerDTO; +import io.kafbat.ui.model.rbac.Permission; import io.kafbat.ui.util.DynamicConfigOperations; import java.util.Optional; import org.mapstruct.Mapper; -import org.mapstruct.Mapping; import org.springframework.boot.autoconfigure.security.oauth2.resource.OAuth2ResourceServerProperties; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; @@ -16,7 +17,6 @@ @Mapper(componentModel = "spring") public interface 
DynamicConfigMapper { - @Mapping(target = "rbac.roles[].permissions[].parsedActions", ignore = true) DynamicConfigOperations.PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto); ApplicationConfigPropertiesDTO toDto(DynamicConfigOperations.PropertiesStructure propertiesStructure); @@ -25,6 +25,14 @@ default String map(Resource resource) { return resource.getFilename(); } + default Permission map(ApplicationConfigPropertiesRbacRolesInnerPermissionsInnerDTO perm) { + Permission permission = new Permission(); + permission.setResource(perm.getResource().getValue()); + permission.setActions(perm.getActions().stream().map(ActionDTO::getValue).toList()); + permission.setValue(perm.getValue()); + return permission; + } + default OAuth2ResourceServerProperties map(ApplicationConfigPropertiesAuthOauth2ResourceServerDTO value) { if (value != null) { OAuth2ResourceServerProperties result = new OAuth2ResourceServerProperties(); diff --git a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java index c5dcdd766..0686de2c4 100644 --- a/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java +++ b/api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java @@ -206,11 +206,15 @@ protected NodeTuple representJavaBeanProperty(Object javaBean, representer.setPropertyUtils(propertyUtils); representer.addClassTag(PropertiesStructure.class, Tag.MAP); //to avoid adding class tag representer.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK); //use indent instead of {} - return new Yaml( - new Constructor(new LoaderOptions()), - representer, - new DumperOptions() - ).dump(props); + + DumperOptions dumperOptions = new DumperOptions(); + dumperOptions.setDefaultFlowStyle(representer.getDefaultFlowStyle()); + dumperOptions.setDefaultScalarStyle(representer.getDefaultScalarStyle()); + dumperOptions + .setAllowReadOnlyProperties(representer.getPropertyUtils().isAllowReadOnlyProperties()); + 
dumperOptions.setTimeZone(representer.getTimeZone()); + + return new Yaml(representer, dumperOptions).dump(props); } ///--------------------------------------------------------------------- diff --git a/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java b/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java index fddf040e9..22634a0a4 100644 --- a/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/quota/ClientQuotaServiceTest.java @@ -1,12 +1,15 @@ package io.kafbat.ui.service.quota; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import io.kafbat.ui.AbstractIntegrationTest; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.service.ClustersStorage; import java.util.Map; -import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.assertj.core.api.ListAssert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -48,8 +51,9 @@ void createUpdateDelete(String user, String clientId, String ip) { .assertNext(status -> assertThat(status.value()).isEqualTo(201)) .verifyComplete(); - assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, initialQuotas))) - .isTrue(); + awaitAndVerify((l) -> + l.contains(new ClientQuotaRecord(user, clientId, ip, initialQuotas)) + ); //updating StepVerifier.create( @@ -58,8 +62,9 @@ void createUpdateDelete(String user, String clientId, String ip) { .assertNext(status -> assertThat(status.value()).isEqualTo(200)) .verifyComplete(); - assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0)))) - .isTrue(); + awaitAndVerify((l) -> + l.contains(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0))) + ); //deleting created record 
StepVerifier.create( @@ -68,12 +73,16 @@ void createUpdateDelete(String user, String clientId, String ip) { .assertNext(status -> assertThat(status.value()).isEqualTo(204)) .verifyComplete(); - assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0)))) - .isFalse(); + awaitAndVerify((l) -> + l.doesNotContain(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0))) + ); } - private boolean quotaRecordExists(ClientQuotaRecord rec) { - return Objects.requireNonNull(quotaService.getAll(cluster).collectList().block()).contains(rec); + private void awaitAndVerify(Consumer> verifier) { + await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> verifier.accept(assertThat(quotaService.getAll(cluster).collectList().block()))); } } diff --git a/contract/build.gradle b/contract/build.gradle index eec855a12..d6d662970 100644 --- a/contract/build.gradle +++ b/contract/build.gradle @@ -9,10 +9,6 @@ plugins { def specDir = project.layout.projectDirectory.dir("src/main/resources/swagger/") def targetDir = project.layout.buildDirectory.dir("generated").get() -tasks.withType(JavaCompile).configureEach { - options.compilerArgs << "-Xlint:deprecation" << "-Xlint:unchecked" -} - dependencies { implementation libs.spring.starter.webflux implementation libs.spring.starter.validation From 58bd3eff2bfd1d407506d9af59906cddf7a8e325 Mon Sep 17 00:00:00 2001 From: German Osin Date: Tue, 27 May 2025 19:12:22 +0200 Subject: [PATCH 5/5] Make github tests less verbose --- .github/workflows/backend_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/backend_tests.yml b/.github/workflows/backend_tests.yml index e6147b644..749a3e68c 100644 --- a/.github/workflows/backend_tests.yml +++ b/.github/workflows/backend_tests.yml @@ -34,4 +34,4 @@ jobs: - name: "Tests" run: | - ./gradlew :api:test --info + ./gradlew :api:test