Skip to content

Housekeeping. Removed Superbuilders. Fixed deprecations #1103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/backend_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ jobs:

- name: "Tests"
run: |
./gradlew :api:test --info
./gradlew :api:test
9 changes: 6 additions & 3 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,14 @@ generateGrammarSource {
arguments += ["-package", "ksql"]
}

tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:deprecation" << "-Xlint:unchecked"
}



sourceSets {
main {
antlr {
srcDirs = ["src/main/antlr4"]
}
java {
srcDirs += generateGrammarSource.outputDirectory
}
Expand Down
1 change: 0 additions & 1 deletion api/src/main/java/io/kafbat/ui/config/McpConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.kafbat.ui.service.mcp.McpTool;
import io.modelcontextprotocol.server.McpAsyncServer;
import io.modelcontextprotocol.server.McpServer;
import io.modelcontextprotocol.server.McpServerFeatures.AsyncPromptSpecification;
import io.modelcontextprotocol.server.McpServerFeatures.AsyncToolSpecification;
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
import io.modelcontextprotocol.spec.McpSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

import io.kafbat.ui.api.ApplicationConfigApi;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.model.ActionDTO;
import io.kafbat.ui.mapper.DynamicConfigMapper;
import io.kafbat.ui.model.AppAuthenticationSettingsDTO;
import io.kafbat.ui.model.ApplicationConfigDTO;
import io.kafbat.ui.model.ApplicationConfigPropertiesDTO;
import io.kafbat.ui.model.ApplicationConfigValidationDTO;
import io.kafbat.ui.model.ApplicationInfoDTO;
import io.kafbat.ui.model.ClusterConfigValidationDTO;
Expand All @@ -20,12 +19,9 @@
import io.kafbat.ui.util.ApplicationRestarter;
import io.kafbat.ui.util.DynamicConfigOperations;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.Part;
Expand All @@ -41,26 +37,11 @@
@RequiredArgsConstructor
public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi {

private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);

@Mapper
interface PropertiesMapper {

DynamicConfigOperations.PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto);

ApplicationConfigPropertiesDTO toDto(DynamicConfigOperations.PropertiesStructure propertiesStructure);

default ActionDTO stringToActionDto(String str) {
return Optional.ofNullable(str)
.map(s -> Enum.valueOf(ActionDTO.class, s.toUpperCase()))
.orElseThrow();
}
}

private final DynamicConfigOperations dynamicConfigOperations;
private final ApplicationRestarter restarter;
private final KafkaClusterFactory kafkaClusterFactory;
private final ApplicationInfoService applicationInfoService;
private final DynamicConfigMapper configMapper;

@Override
public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
Expand All @@ -83,7 +64,7 @@ public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExch
return validateAccess(context)
.then(Mono.fromSupplier(() -> ResponseEntity.ok(
new ApplicationConfigDTO()
.properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
.properties(configMapper.toDto(dynamicConfigOperations.getCurrentProperties()))
)))
.doOnEach(sig -> audit(context, sig));
}
Expand All @@ -98,7 +79,7 @@ public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> rest
return validateAccess(context)
.then(restartRequestDto)
.doOnNext(restartDto -> {
var newConfig = MAPPER.fromDto(restartDto.getConfig().getProperties());
var newConfig = configMapper.fromDto(restartDto.getConfig().getProperties());
dynamicConfigOperations.persist(newConfig);
})
.doOnEach(sig -> audit(context, sig))
Expand Down Expand Up @@ -132,7 +113,7 @@ public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<
return validateAccess(context)
.then(configDto)
.flatMap(config -> {
DynamicConfigOperations.PropertiesStructure newConfig = MAPPER.fromDto(config.getProperties());
DynamicConfigOperations.PropertiesStructure newConfig = configMapper.fromDto(config.getProperties());
ClustersProperties clustersProperties = newConfig.getKafka();
return validateClustersConfig(clustersProperties)
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,22 @@ public Mono<ResponseEntity<AuthenticationInfoDTO>> getUserAuthInfo(ServerWebExch
.map(SecurityContext::getAuthentication)
.map(Principal::getName);

var builder = AuthenticationInfoDTO.builder()
var builder = new AuthenticationInfoDTO()
.rbacEnabled(accessControlService.isRbacEnabled());

return userName
.zipWith(permissions)
.map(data -> (AuthenticationInfoDTO) builder
.userInfo(new UserInfoDTO(data.getT1(), data.getT2()))
.build()
)
.switchIfEmpty(Mono.just(builder.build()))
.switchIfEmpty(Mono.just(builder))
.map(ResponseEntity::ok);
}

private List<UserPermissionDTO> mapPermissions(List<Permission> permissions, List<String> clusters) {
return permissions
.stream()
.map(permission -> (UserPermissionDTO) UserPermissionDTO.builder()
.map(permission -> new UserPermissionDTO()
.clusters(clusters)
.resource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase()))
.value(permission.getValue())
Expand All @@ -71,7 +70,6 @@ private List<UserPermissionDTO> mapPermissions(List<Permission> permissions, Lis
.map(this::mapAction)
.filter(Objects::nonNull)
.toList())
.build()
)
.toList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ private String extractFieldErrorMsg(FieldError fieldError) {
return coalesce(fieldError.getDefaultMessage(), fieldError.getCode(), "Invalid field value");
}

@SafeVarargs
private <T> T coalesce(T... items) {
return Stream.of(items).filter(Objects::nonNull).findFirst().orElse(null);
}
Expand Down
4 changes: 4 additions & 0 deletions api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
@Mapper(componentModel = "spring")
public interface ClusterMapper {

@Mapping(target = "defaultCluster", ignore = true)
ClusterDTO toCluster(InternalClusterState clusterState);

@Mapping(target = "zooKeeperStatus", ignore = true)
ClusterStatsDTO toClusterStats(InternalClusterState clusterState);

default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
Expand Down Expand Up @@ -95,6 +97,8 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {

BrokerDTO toBrokerDto(InternalBroker broker);

@Mapping(target = "keySerde", ignore = true)
@Mapping(target = "valueSerde", ignore = true)
TopicDetailsDTO toTopicDetails(InternalTopic topic);

@Mapping(target = "isReadOnly", source = "readOnly")
Expand Down
21 changes: 11 additions & 10 deletions api/src/main/java/io/kafbat/ui/mapper/DescribeLogDirsMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
Expand All @@ -16,7 +18,7 @@
public class DescribeLogDirsMapper {

public List<BrokersLogdirsDTO> toBrokerLogDirsList(
Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> logDirsInfo) {
Map<Integer, Map<String, LogDirDescription>> logDirsInfo) {

return logDirsInfo.entrySet().stream().map(
mapEntry -> mapEntry.getValue().entrySet().stream()
Expand All @@ -26,13 +28,13 @@ public List<BrokersLogdirsDTO> toBrokerLogDirsList(
}

private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName,
DescribeLogDirsResponse.LogDirInfo logDirInfo) {
LogDirDescription logDirInfo) {
BrokersLogdirsDTO result = new BrokersLogdirsDTO();
result.setName(dirName);
if (logDirInfo.error != null && logDirInfo.error != Errors.NONE) {
result.setError(logDirInfo.error.message());
if (logDirInfo.error() != null) {
result.setError(logDirInfo.error().getMessage());
}
var topics = logDirInfo.replicaInfos.entrySet().stream()
var topics = logDirInfo.replicaInfos().entrySet().stream()
.collect(Collectors.groupingBy(e -> e.getKey().topic())).entrySet().stream()
.map(e -> toTopicLogDirs(broker, e.getKey(), e.getValue()))
.toList();
Expand All @@ -41,8 +43,7 @@ private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName,
}

private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
List<Map.Entry<TopicPartition,
DescribeLogDirsResponse.ReplicaInfo>> partitions) {
List<Map.Entry<TopicPartition, ReplicaInfo>> partitions) {
BrokerTopicLogdirsDTO topic = new BrokerTopicLogdirsDTO();
topic.setName(name);
topic.setPartitions(
Expand All @@ -54,12 +55,12 @@ private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
}

private BrokerTopicPartitionLogdirDTO topicPartitionLogDir(Integer broker, Integer partition,
DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
ReplicaInfo replicaInfo) {
BrokerTopicPartitionLogdirDTO logDir = new BrokerTopicPartitionLogdirDTO();
logDir.setBroker(broker);
logDir.setPartition(partition);
logDir.setSize(replicaInfo.size);
logDir.setOffsetLag(replicaInfo.offsetLag);
logDir.setSize(replicaInfo.size());
logDir.setOffsetLag(replicaInfo.offsetLag());
return logDir;
}
}
75 changes: 75 additions & 0 deletions api/src/main/java/io/kafbat/ui/mapper/DynamicConfigMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.kafbat.ui.mapper;

import io.kafbat.ui.model.ActionDTO;
import io.kafbat.ui.model.ApplicationConfigPropertiesAuthOauth2ResourceServerDTO;
import io.kafbat.ui.model.ApplicationConfigPropertiesAuthOauth2ResourceServerJwtDTO;
import io.kafbat.ui.model.ApplicationConfigPropertiesAuthOauth2ResourceServerOpaquetokenDTO;
import io.kafbat.ui.model.ApplicationConfigPropertiesDTO;
import io.kafbat.ui.model.ApplicationConfigPropertiesRbacRolesInnerPermissionsInnerDTO;
import io.kafbat.ui.model.rbac.Permission;
import io.kafbat.ui.util.DynamicConfigOperations;
import java.util.Optional;
import org.mapstruct.Mapper;
import org.springframework.boot.autoconfigure.security.oauth2.resource.OAuth2ResourceServerProperties;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

/**
 * Maps between the generated {@link ApplicationConfigPropertiesDTO} API model and the
 * internal {@link DynamicConfigOperations.PropertiesStructure} used to persist and
 * validate dynamic application configuration.
 *
 * <p>The {@code default} methods below supply custom conversions that MapStruct cannot
 * derive on its own (Spring {@link Resource} handling, RBAC permissions, and the
 * OAuth2 resource-server properties).
 */
@Mapper(componentModel = "spring")
public interface DynamicConfigMapper {

  /** DTO → internal properties structure (generated by MapStruct). */
  DynamicConfigOperations.PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto);

  /** Internal properties structure → DTO (generated by MapStruct). */
  ApplicationConfigPropertiesDTO toDto(DynamicConfigOperations.PropertiesStructure propertiesStructure);

  /**
   * Serializes a Spring {@link Resource} to a string for the DTO.
   *
   * <p>NOTE(review): {@code getFilename()} returns only the last path segment, while
   * {@link #mapResource(String)} rebuilds a {@link FileSystemResource} from that string —
   * a round-trip through the DTO appears to drop any directory component. TODO confirm
   * whether persisted resources are always referenced by bare filename.
   */
  default String map(Resource resource) {
    return resource.getFilename();
  }

  /**
   * Converts an RBAC permission DTO into the internal {@link Permission} model.
   * Actions are flattened from {@link ActionDTO} enum values to their string form.
   */
  default Permission map(ApplicationConfigPropertiesRbacRolesInnerPermissionsInnerDTO perm) {
    Permission permission = new Permission();
    permission.setResource(perm.getResource().getValue());
    permission.setActions(perm.getActions().stream().map(ActionDTO::getValue).toList());
    permission.setValue(perm.getValue());
    return permission;
  }

  /**
   * Converts OAuth2 resource-server DTO settings into Spring's
   * {@link OAuth2ResourceServerProperties}, copying only the fields that are present.
   *
   * @return the populated properties, or {@code null} when {@code value} is {@code null}
   */
  default OAuth2ResourceServerProperties map(ApplicationConfigPropertiesAuthOauth2ResourceServerDTO value) {
    if (value == null) {
      return null;
    }
    OAuth2ResourceServerProperties result = new OAuth2ResourceServerProperties();
    if (value.getJwt() != null) {
      // Mutate the nested Jwt instance that Spring pre-creates on the properties object.
      OAuth2ResourceServerProperties.Jwt jwt = result.getJwt();

      ApplicationConfigPropertiesAuthOauth2ResourceServerJwtDTO source = value.getJwt();
      Optional.ofNullable(source.getJwsAlgorithms()).ifPresent(jwt::setJwsAlgorithms);
      Optional.ofNullable(source.getJwkSetUri()).ifPresent(jwt::setJwkSetUri);
      Optional.ofNullable(source.getIssuerUri()).ifPresent(jwt::setIssuerUri);
      Optional.ofNullable(source.getPublicKeyLocation())
          .map(this::mapResource)
          .ifPresent(jwt::setPublicKeyLocation);
      Optional.ofNullable(source.getAudiences()).ifPresent(jwt::setAudiences);
      Optional.ofNullable(source.getAuthoritiesClaimName()).ifPresent(jwt::setAuthoritiesClaimName);
      Optional.ofNullable(source.getAuthoritiesClaimDelimiter()).ifPresent(jwt::setAuthoritiesClaimDelimiter);
      Optional.ofNullable(source.getAuthorityPrefix()).ifPresent(jwt::setAuthorityPrefix);
      Optional.ofNullable(source.getPrincipalClaimName()).ifPresent(jwt::setPrincipalClaimName);
    }
    if (value.getOpaquetoken() != null) {
      OAuth2ResourceServerProperties.Opaquetoken opaquetoken = result.getOpaquetoken();
      ApplicationConfigPropertiesAuthOauth2ResourceServerOpaquetokenDTO source = value.getOpaquetoken();
      Optional.ofNullable(source.getClientId()).ifPresent(opaquetoken::setClientId);
      Optional.ofNullable(source.getClientSecret()).ifPresent(opaquetoken::setClientSecret);
      Optional.ofNullable(source.getIntrospectionUri()).ifPresent(opaquetoken::setIntrospectionUri);
    }
    // BUG FIX: the original returned null unconditionally here, discarding the fully
    // populated result — all OAuth2 resource-server settings were silently lost.
    return result;
  }

  /** Rehydrates a persisted filename string into a {@link FileSystemResource}. */
  default Resource mapResource(String filename) {
    return new FileSystemResource(filename);
  }

  /**
   * Parses a case-insensitive action name into an {@link ActionDTO}.
   *
   * @throws java.util.NoSuchElementException when {@code str} is {@code null}
   * @throws IllegalArgumentException when {@code str} is not a known action
   */
  default ActionDTO stringToActionDto(String str) {
    return Optional.ofNullable(str)
        .map(s -> Enum.valueOf(ActionDTO.class, s.toUpperCase()))
        .orElseThrow();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
import io.kafbat.ui.model.connect.InternalConnectInfo;
import java.util.List;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;

@Mapper(componentModel = "spring")
public interface KafkaConnectMapper {
NewConnector toClient(io.kafbat.ui.model.NewConnectorDTO newConnector);

@Mapping(target = "status", ignore = true)
@Mapping(target = "connect", ignore = true)
ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector);

ConnectorStatusDTO fromClient(ConnectorStatusConnector connectorStatus);

@Mapping(target = "status", ignore = true)
TaskDTO fromClient(ConnectorTask connectorTask);

TaskStatusDTO fromClient(io.kafbat.ui.connect.model.TaskStatus taskStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.LongSummaryStatistics;
import java.util.Map;
import lombok.Value;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import reactor.util.function.Tuple2;
Expand Down Expand Up @@ -36,12 +37,12 @@ public static InternalLogDirStats empty() {
return new InternalLogDirStats(Map.of());
}

public InternalLogDirStats(Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> log) {
public InternalLogDirStats(Map<Integer, Map<String, LogDirDescription>> log) {
final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
log.entrySet().stream().flatMap(b ->
b.getValue().entrySet().stream().flatMap(topicMap ->
topicMap.getValue().replicaInfos.entrySet().stream()
.map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
topicMap.getValue().replicaInfos().entrySet().stream()
.map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size()))
)
).toList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,10 @@ private ClassLoader createClassloader(Path location) {
// we assume that location's content does not change during serdes creation
// so, we can reuse already created classloaders
return classloaders.computeIfAbsent(location, l ->
AccessController.doPrivileged(
(PrivilegedAction<URLClassLoader>) () ->
new ChildFirstClassloader(
archives.toArray(URL[]::new),
CustomSerdeLoader.class.getClassLoader())));
new ChildFirstClassloader(
archives.toArray(URL[]::new),
CustomSerdeLoader.class.getClassLoader())
);
}

//---------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ static byte[] serializeProto(SchemaRegistryClient srClient,

@KafkaClientInternalsDependant("AbstractKafkaAvroSerializer::serializeImpl")
@SneakyThrows
@SuppressWarnings("unchecked")
static byte[] serializeAvro(AvroSchema schema, int schemaId, String input) {
var avroObject = JsonAvroConversion.convertJsonToAvro(input, schema.rawSchema());
try (var out = new ByteArrayOutputStream()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
Expand Down
Loading
Loading