Skip to content

Commit d4e3423

Browse files
committed
refactor: use record classes
1 parent 8e63f1a commit d4e3423

37 files changed

+115
-219
lines changed

api/src/main/java/io/kafbat/ui/model/InternalClusterState.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package io.kafbat.ui.model;
22

33
import com.google.common.base.Throwables;
4-
import io.kafbat.ui.model.BrokerDiskUsageDTO;
5-
import io.kafbat.ui.model.MetricsCollectionErrorDTO;
6-
import io.kafbat.ui.model.ServerStatusDTO;
74
import java.math.BigDecimal;
85
import java.util.List;
96
import java.util.Optional;
@@ -40,8 +37,8 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
4037
.stackTrace(Throwables.getStackTraceAsString(e)))
4138
.orElse(null);
4239
topicCount = statistics.getTopicDescriptions().size();
43-
brokerCount = statistics.getClusterDescription().getNodes().size();
44-
activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
40+
brokerCount = statistics.getClusterDescription().nodes().size();
41+
activeControllers = Optional.ofNullable(statistics.getClusterDescription().controller())
4542
.map(Node::id)
4643
.orElse(null);
4744
version = statistics.getVersion();

api/src/main/java/io/kafbat/ui/model/InternalPartitionsOffsets.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,11 @@
44
import com.google.common.collect.Table;
55
import java.util.Map;
66
import java.util.Optional;
7-
import lombok.Value;
87
import org.apache.kafka.common.TopicPartition;
98

10-
119
public class InternalPartitionsOffsets {
1210

13-
@Value
14-
public static class Offsets {
15-
Long earliest;
16-
Long latest;
11+
public record Offsets(Long earliest, Long latest) {
1712
}
1813

1914
private final Table<String, Integer, Offsets> offsets = HashBasedTable.create();
Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
11
package io.kafbat.ui.model;
22

33
import lombok.Builder;
4-
import lombok.Data;
5-
import lombok.RequiredArgsConstructor;
64

7-
@Data
85
@Builder
9-
@RequiredArgsConstructor
10-
public class InternalReplica {
11-
private final int broker;
12-
private final boolean leader;
13-
private final boolean inSync;
6+
public record InternalReplica(int broker, boolean leader, boolean inSync) {
147
}

api/src/main/java/io/kafbat/ui/model/InternalTopic.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ public static InternalTopic from(TopicDescription topicDescription,
7777

7878
partitionsOffsets.get(topicDescription.name(), partition.partition())
7979
.ifPresent(offsets -> {
80-
partitionDto.offsetMin(offsets.getEarliest());
81-
partitionDto.offsetMax(offsets.getLatest());
80+
partitionDto.offsetMin(offsets.earliest());
81+
partitionDto.offsetMax(offsets.latest());
8282
});
8383

8484
var segmentStats =

api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ private void fillKey(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec)
7979
}
8080
try {
8181
var deserResult = keyDeserializer.deserialize(new RecordHeadersImpl(), rec.key().get());
82-
message.setKey(deserResult.getResult());
82+
message.setKey(deserResult.result());
8383
message.setKeySerde(keySerdeName);
84-
message.setKeyDeserializeProperties(deserResult.getAdditionalProperties());
84+
message.setKeyDeserializeProperties(deserResult.additionalProperties());
8585
} catch (Exception e) {
8686
log.trace("Error deserializing key for key topic: {}, partition {}, offset {}, with serde {}",
8787
rec.topic(), rec.partition(), rec.offset(), keySerdeName, e);
8888
var deserResult = fallbackKeyDeserializer.deserialize(new RecordHeadersImpl(), rec.key().get());
89-
message.setKey(deserResult.getResult());
89+
message.setKey(deserResult.result());
9090
message.setKeySerde(fallbackSerdeName);
9191
}
9292
}
@@ -98,15 +98,15 @@ private void fillValue(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec
9898
try {
9999
var deserResult = valueDeserializer.deserialize(
100100
new RecordHeadersImpl(rec.headers()), rec.value().get());
101-
message.setContent(deserResult.getResult());
101+
message.setContent(deserResult.result());
102102
message.setValueSerde(valueSerdeName);
103-
message.setValueDeserializeProperties(deserResult.getAdditionalProperties());
103+
message.setValueDeserializeProperties(deserResult.additionalProperties());
104104
} catch (Exception e) {
105105
log.trace("Error deserializing key for value topic: {}, partition {}, offset {}, with serde {}",
106106
rec.topic(), rec.partition(), rec.offset(), valueSerdeName, e);
107107
var deserResult = fallbackValueDeserializer.deserialize(
108108
new RecordHeadersImpl(rec.headers()), rec.value().get());
109-
message.setContent(deserResult.getResult());
109+
message.setContent(deserResult.result());
110110
message.setValueSerde(fallbackSerdeName);
111111
}
112112
}

api/src/main/java/io/kafbat/ui/serdes/CustomSerdeLoader.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,10 @@
1818
import java.util.concurrent.ConcurrentHashMap;
1919
import java.util.stream.Collectors;
2020
import lombok.SneakyThrows;
21-
import lombok.Value;
22-
2321

2422
class CustomSerdeLoader {
2523

26-
@Value
27-
static class CustomSerde {
28-
Serde serde;
29-
ClassLoader classLoader;
24+
record CustomSerde(Serde serde, ClassLoader classLoader) {
3025
}
3126

3227
// serde location -> classloader

api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,10 @@ private SerdeInstance loadAndInitCustomSerde(ClustersProperties.SerdeConfig serd
266266
serdeConfig.getClassName(), serdeConfig.getFilePath(), serdeProps, clusterProps, globalProps);
267267
return new SerdeInstance(
268268
serdeConfig.getName(),
269-
loaded.getSerde(),
269+
loaded.serde(),
270270
nullablePattern(serdeConfig.getTopicKeysPattern()),
271271
nullablePattern(serdeConfig.getTopicValuesPattern()),
272-
loaded.getClassLoader()
272+
loaded.classLoader()
273273
);
274274
}
275275

api/src/main/java/io/kafbat/ui/service/BrokerService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ private Mono<List<ConfigEntry>> loadBrokersConfig(
5252
}
5353

5454
private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
55-
if (statisticsCache.get(cluster).getClusterDescription().getNodes()
55+
if (statisticsCache.get(cluster).getClusterDescription().nodes()
5656
.stream().noneMatch(node -> node.id() == brokerId)) {
5757
return Flux.error(
5858
new NotFoundException(String.format("Broker with id %s not found", brokerId)));
@@ -70,7 +70,7 @@ public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
7070
return adminClientService
7171
.get(cluster)
7272
.flatMap(ReactiveAdminClient::describeCluster)
73-
.map(description -> description.getNodes().stream()
73+
.map(description -> description.nodes().stream()
7474
.map(node -> new InternalBroker(node, partitionsDistribution, stats))
7575
.collect(Collectors.toList()))
7676
.flatMapMany(Flux::fromIterable);
@@ -113,7 +113,7 @@ private Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getC
113113
KafkaCluster cluster, List<Integer> reqBrokers) {
114114
return adminClientService.get(cluster)
115115
.flatMap(admin -> {
116-
List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().getNodes()
116+
List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().nodes()
117117
.stream()
118118
.map(Node::id)
119119
.collect(Collectors.toList());

api/src/main/java/io/kafbat/ui/service/DeserializationService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ private SerdeDescriptionDTO toDto(SerdeInstance serdeInstance,
143143
return new SerdeDescriptionDTO()
144144
.name(serdeInstance.getName())
145145
.description(serdeInstance.description().orElse(null))
146-
.schema(schemaOpt.map(SchemaDescription::getSchema).orElse(null))
147-
.additionalProperties(schemaOpt.map(SchemaDescription::getAdditionalProperties).orElse(null))
146+
.schema(schemaOpt.map(SchemaDescription::schema).orElse(null))
147+
.additionalProperties(schemaOpt.map(SchemaDescription::additionalProperties).orElse(null))
148148
.preferred(preferred);
149149
}
150150

api/src/main/java/io/kafbat/ui/service/FeatureService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ private Mono<ClusterFeature> quotaManagement(ReactiveAdminClient adminClient) {
6161
}
6262

6363
private Mono<ClusterFeature> aclEdit(ReactiveAdminClient adminClient, ClusterDescription clusterDescription) {
64-
var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
64+
var authorizedOps = Optional.ofNullable(clusterDescription.authorizedOperations()).orElse(Set.of());
6565
boolean canEdit = aclViewEnabled(adminClient)
6666
&& (authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER));
6767
return canEdit

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,9 @@ static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, String kafkaVersio
124124
}
125125
}
126126

127-
@Value
128-
public static class ClusterDescription {
129-
@Nullable
130-
Node controller;
131-
String clusterId;
132-
Collection<Node> nodes;
133-
@Nullable // null, if ACL is disabled
134-
Set<AclOperation> authorizedOperations;
127+
public record ClusterDescription(@Nullable Node controller, String clusterId, Collection<Node> nodes,
128+
// null, if ACL is disabled
129+
@Nullable Set<AclOperation> authorizedOperations) {
135130
}
136131

137132
@Builder
@@ -147,7 +142,7 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
147142
// choosing node from which we will get configs (starting with controller)
148143
var targetNodeId = Optional.ofNullable(desc.controller)
149144
.map(Node::id)
150-
.orElse(desc.getNodes().iterator().next().id());
145+
.orElse(desc.nodes().iterator().next().id());
151146
return loadBrokersConfig(ac, List.of(targetNodeId))
152147
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
153148
.flatMap(configs -> {
@@ -391,7 +386,7 @@ static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> v
391386

392387
public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
393388
return describeCluster()
394-
.map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
389+
.map(d -> d.nodes().stream().map(Node::id).collect(toList()))
395390
.flatMap(this::describeLogDirs);
396391
}
397392

api/src/main/java/io/kafbat/ui/service/StatisticsService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ public Mono<Statistics> updateCache(KafkaCluster c) {
3737
private Mono<Statistics> getStatistics(KafkaCluster cluster) {
3838
return adminClientService.get(cluster).flatMap(ac ->
3939
ac.describeCluster().flatMap(description ->
40-
ac.updateInternalStats(description.getController()).then(
40+
ac.updateInternalStats(description.controller()).then(
4141
Mono.zip(
4242
List.of(
43-
metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
43+
metricsCollector.getBrokerMetrics(cluster, description.nodes()),
4444
getLogDirInfo(description, ac),
4545
featureService.getAvailableFeatures(ac, cluster, description),
4646
loadTopicConfigs(cluster),
@@ -64,7 +64,7 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
6464
}
6565

6666
private Mono<InternalLogDirStats> getLogDirInfo(ClusterDescription desc, ReactiveAdminClient ac) {
67-
var brokerIds = desc.getNodes().stream().map(Node::id).collect(Collectors.toSet());
67+
var brokerIds = desc.nodes().stream().map(Node::id).collect(Collectors.toSet());
6868
return ac.describeLogDirs(brokerIds).map(InternalLogDirStats::new);
6969
}
7070

api/src/main/java/io/kafbat/ui/service/TopicsService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public Mono<ReplicationFactorChangeResponseDTO> changeReplicationFactor(
253253
Integer actual = topic.getReplicationFactor();
254254
Integer requested = replicationFactorChange.getTotalReplicationFactor();
255255
Integer brokersCount = statisticsCache.get(cluster).getClusterDescription()
256-
.getNodes().size();
256+
.nodes().size();
257257

258258
if (requested.equals(actual)) {
259259
return Mono.error(
@@ -361,14 +361,14 @@ private Map<Integer, List<Integer>> getCurrentAssignment(InternalTopic topic) {
361361
.collect(toMap(
362362
InternalPartition::getPartition,
363363
p -> p.getReplicas().stream()
364-
.map(InternalReplica::getBroker)
364+
.map(InternalReplica::broker)
365365
.collect(toList())
366366
));
367367
}
368368

369369
private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
370370
Map<Integer, List<Integer>> currentAssignment) {
371-
Map<Integer, Integer> result = statisticsCache.get(cluster).getClusterDescription().getNodes()
371+
Map<Integer, Integer> result = statisticsCache.get(cluster).getClusterDescription().nodes()
372372
.stream()
373373
.map(Node::id)
374374
.collect(toMap(

api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,7 @@ public Optional<JsonNode> getColumnValue(List<JsonNode> row, String column) {
5959
}
6060
}
6161

62-
@Value
63-
private static class KsqlRequest {
64-
String ksql;
65-
Map<String, String> streamsProperties;
62+
private record KsqlRequest(String ksql, Map<String, String> streamsProperties) {
6663
}
6764

6865
//--------------------------------------------------------------------------------------------
@@ -172,7 +169,7 @@ public Flux<KsqlResponseTable> execute(String ksql, Map<String, String> streamPr
172169
if (parsedStatements.isEmpty()) {
173170
return errorTableFlux("Sql statement is invalid or unsupported");
174171
}
175-
var statements = parsedStatements.get().getStatements();
172+
var statements = parsedStatements.get().statements();
176173
if (statements.size() > 1) {
177174
return errorTableFlux("Only single statement supported now");
178175
}

api/src/main/java/io/kafbat/ui/service/ksql/KsqlGrammar.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import ksql.KsqlGrammarLexer;
77
import ksql.KsqlGrammarParser;
88
import lombok.RequiredArgsConstructor;
9-
import lombok.Value;
109
import lombok.experimental.Delegate;
1110
import org.antlr.v4.runtime.BaseErrorListener;
1211
import org.antlr.v4.runtime.CharStream;
@@ -22,9 +21,7 @@ class KsqlGrammar {
2221
private KsqlGrammar() {
2322
}
2423

25-
@Value
26-
static class KsqlStatements {
27-
List<KsqlGrammarParser.SingleStatementContext> statements;
24+
record KsqlStatements(List<KsqlGrammarParser.SingleStatementContext> statements) {
2825
}
2926

3027
// returns Empty if no valid statements found

api/src/main/java/io/kafbat/ui/service/ksql/KsqlServiceV2.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@
2020
@Service
2121
public class KsqlServiceV2 {
2222

23-
@lombok.Value
24-
private static class KsqlExecuteCommand {
25-
KafkaCluster cluster;
26-
String ksql;
27-
Map<String, String> streamProperties;
23+
private record KsqlExecuteCommand(KafkaCluster cluster, String ksql, Map<String, String> streamProperties) {
2824
}
2925

3026
private final Cache<String, KsqlExecuteCommand> registeredCommands =

api/src/main/java/io/kafbat/ui/service/masking/DataMasking.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,7 @@ public class DataMasking {
2222

2323
private static final JsonMapper JSON_MAPPER = new JsonMapper();
2424

25-
@Value
26-
static class Mask {
27-
@Nullable
28-
Pattern topicKeysPattern;
29-
@Nullable
30-
Pattern topicValuesPattern;
31-
32-
MaskingPolicy policy;
33-
25+
record Mask(@Nullable Pattern topicKeysPattern, @Nullable Pattern topicValuesPattern, MaskingPolicy policy) {
3426
boolean shouldBeApplied(String topic, Serde.Target target) {
3527
return target == Serde.Target.KEY
3628
? topicKeysPattern != null && topicKeysPattern.matcher(topic).matches()

api/src/main/java/io/kafbat/ui/service/metrics/RawMetric.java

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,7 @@ static RawMetric create(String name, Map<String, String> labels, BigDecimal valu
2727
return new SimpleMetric(name, labels, value);
2828
}
2929

30-
@AllArgsConstructor
31-
@EqualsAndHashCode
32-
@ToString
33-
class SimpleMetric implements RawMetric {
34-
35-
private final String name;
36-
private final Map<String, String> labels;
37-
private final BigDecimal value;
38-
39-
@Override
40-
public String name() {
41-
return name;
42-
}
43-
44-
@Override
45-
public Map<String, String> labels() {
46-
return labels;
47-
}
48-
49-
@Override
50-
public BigDecimal value() {
51-
return value;
52-
}
30+
record SimpleMetric(String name, Map<String, String> labels, BigDecimal value) implements RawMetric {
5331

5432
@Override
5533
public RawMetric copyWithValue(BigDecimal newValue) {

api/src/main/java/io/kafbat/ui/util/jsonschema/AvroJsonSchemaConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ public JsonSchema convert(URI basePath, Schema schema) {
2727

2828
if (type.getType().equals(JsonType.Type.OBJECT)) {
2929
final ObjectFieldSchema objectRoot = (ObjectFieldSchema) root;
30-
builder.properties(objectRoot.getProperties());
31-
builder.required(objectRoot.getRequired());
30+
builder.properties(objectRoot.properties());
31+
builder.required(objectRoot.required());
3232
}
3333

3434
return builder.build();

0 commit comments

Comments
 (0)