Skip to content

Commit 77f65a4

Browse files
author
Loïc GREFFIER
authored
Fixed changing clean up policy from delete to compact on Confluent Cloud (#189)
1 parent c4a78a5 commit 77f65a4

File tree

7 files changed

+366
-96
lines changed

7 files changed

+366
-96
lines changed

api/src/main/java/com/michelin/ns4kafka/controllers/TopicController.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package com.michelin.ns4kafka.controllers;
22

3-
import com.michelin.ns4kafka.models.*;
3+
import com.michelin.ns4kafka.models.DeleteRecordsResponse;
4+
import com.michelin.ns4kafka.models.Namespace;
5+
import com.michelin.ns4kafka.models.Topic;
46
import com.michelin.ns4kafka.services.TopicService;
57
import io.micronaut.http.HttpResponse;
68
import io.micronaut.http.HttpStatus;
79
import io.micronaut.http.annotation.*;
8-
import io.micronaut.http.annotation.Status;
910
import io.swagger.v3.oas.annotations.tags.Tag;
1011
import org.apache.kafka.common.TopicPartition;
1112

@@ -71,31 +72,18 @@ public HttpResponse<Topic> apply(String namespace, @Valid @Body Topic topic, @Qu
7172
if (existingTopic.isEmpty()) {
7273
// Topic namespace ownership validation
7374
if (!topicService.isNamespaceOwnerOfTopic(namespace, topic.getMetadata().getName())) {
74-
validationErrors.add("Namespace not owner of this topic \"" + topic.getMetadata().getName() + "\".");
75+
validationErrors.add(String.format("Namespace not owner of this topic %s.", topic.getMetadata().getName()));
7576
}
7677

7778
// Topic names with a period ('.') or underscore ('_') could collide
7879
List<String> collidingTopics = topicService.findCollidingTopics(ns, topic);
7980
if (!collidingTopics.isEmpty()) {
8081
validationErrors.addAll(collidingTopics.stream()
81-
.map(collidingTopic -> "Topic " + topic.getMetadata().getName()
82-
+ " collides with existing topics: "
83-
+ collidingTopic + ".")
82+
.map(collidingTopic -> String.format("Topic %s collides with existing topics: %s.", topic.getMetadata().getName(), collidingTopic))
8483
.collect(Collectors.toList()));
8584
}
86-
8785
} else {
88-
// Forbidden changes when updating (partitions, replicationFactor)
89-
if (existingTopic.get().getSpec().getPartitions() != topic.getSpec().getPartitions()) {
90-
validationErrors.add("Invalid value " + topic.getSpec().getPartitions()
91-
+ " for configuration partitions: Value is immutable ("
92-
+ existingTopic.get().getSpec().getPartitions() + ").");
93-
}
94-
if (existingTopic.get().getSpec().getReplicationFactor() != topic.getSpec().getReplicationFactor()) {
95-
validationErrors.add("Invalid value " + topic.getSpec().getReplicationFactor()
96-
+ " for configuration replication.factor: Value is immutable ("
97-
+ existingTopic.get().getSpec().getReplicationFactor() + ").");
98-
}
86+
validationErrors.addAll(topicService.validateTopicUpdate(ns, existingTopic.get(), topic));
9987
}
10088

10189
if (!validationErrors.isEmpty()) {

api/src/main/java/com/michelin/ns4kafka/models/Schema.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,12 @@ public class Schema {
3131
private ObjectMeta metadata;
3232

3333
/**
34-
* The schema specifications
34+
* Schema specifications
3535
*/
3636
@Valid
3737
@NotNull
3838
private SchemaSpec spec;
3939

40-
/**
41-
* Schema specifications
42-
*/
4340
@Builder
4441
@AllArgsConstructor
4542
@NoArgsConstructor

api/src/main/java/com/michelin/ns4kafka/models/Topic.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,32 @@
1717
@AllArgsConstructor
1818
@Data
1919
public class Topic {
20+
/**
21+
* API version
22+
*/
2023
private final String apiVersion = "v1";
24+
25+
/**
26+
* Kind of resource
27+
*/
2128
private final String kind = "Topic";
2229

30+
/**
31+
* Schema metadata
32+
*/
2333
@Valid
2434
@NotNull
2535
private ObjectMeta metadata;
2636

37+
/**
38+
* Topic specifications
39+
*/
2740
@NotNull
2841
private TopicSpec spec;
2942

43+
/**
44+
* Topic status
45+
*/
3046
@EqualsAndHashCode.Exclude
3147
private TopicStatus status;
3248

@@ -35,8 +51,19 @@ public class Topic {
3551
@NoArgsConstructor
3652
@Data
3753
public static class TopicSpec {
54+
/**
55+
* Replication factor
56+
*/
3857
private int replicationFactor;
58+
59+
/**
60+
* Partitions quantity
61+
*/
3962
private int partitions;
63+
64+
/**
65+
* Topic configuration
66+
*/
4067
private Map<String, String> configs;
4168
}
4269

@@ -47,11 +74,27 @@ public static class TopicSpec {
4774
@Setter
4875
@Schema(description = "Server-side", accessMode = Schema.AccessMode.READ_ONLY)
4976
public static class TopicStatus {
77+
/**
78+
* Topic phase
79+
*/
5080
private TopicPhase phase;
81+
82+
/**
83+
* Message
84+
*/
5185
private String message;
86+
87+
/**
88+
* Last updated time
89+
*/
5290
@JsonFormat(shape = JsonFormat.Shape.STRING)
5391
private Date lastUpdateTime;
5492

93+
/**
94+
* Success status
95+
* @param message A success message
96+
* @return A success topic status
97+
*/
5598
public static TopicStatus ofSuccess(String message) {
5699
return TopicStatus.builder()
57100
.phase(TopicPhase.Success)
@@ -60,6 +103,11 @@ public static TopicStatus ofSuccess(String message) {
60103
.build();
61104
}
62105

106+
/**
107+
* Failed status
108+
* @param message A failure message
109+
* @return A failure topic status
110+
*/
63111
public static TopicStatus ofFailed(String message) {
64112
return TopicStatus.builder()
65113
.phase(TopicPhase.Failed)
@@ -68,6 +116,10 @@ public static TopicStatus ofFailed(String message) {
68116
.build();
69117
}
70118

119+
/**
120+
* Pending status
121+
* @return A pending topic status
122+
*/
71123
public static TopicStatus ofPending() {
72124
return Topic.TopicStatus.builder()
73125
.phase(Topic.TopicPhase.Pending)
@@ -81,5 +133,4 @@ public enum TopicPhase {
81133
Success,
82134
Failed
83135
}
84-
85136
}

api/src/main/java/com/michelin/ns4kafka/services/TopicService.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.michelin.ns4kafka.models.*;
44
import com.michelin.ns4kafka.repositories.TopicRepository;
5+
import com.michelin.ns4kafka.services.executors.KafkaAsyncExecutorConfig;
56
import com.michelin.ns4kafka.services.executors.TopicAsyncExecutor;
67
import io.micronaut.context.ApplicationContext;
78
import io.micronaut.inject.qualifiers.Qualifiers;
@@ -15,6 +16,9 @@
1516
import java.util.concurrent.TimeoutException;
1617
import java.util.stream.Collectors;
1718

19+
import static org.apache.kafka.common.config.TopicConfig.*;
20+
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
21+
1822
@Singleton
1923
public class TopicService {
2024
/**
@@ -35,6 +39,12 @@ public class TopicService {
3539
@Inject
3640
ApplicationContext applicationContext;
3741

42+
/**
43+
* The managed cluster config
44+
*/
45+
@Inject
46+
List<KafkaAsyncExecutorConfig> kafkaAsyncExecutorConfig;
47+
3848
/**
3949
* Find all topics by given namespace
4050
* @param namespace The namespace
@@ -133,13 +143,47 @@ public List<String> findCollidingTopics(Namespace namespace, Topic topic) throws
133143
}
134144
}
135145

146+
/**
147+
* Validate existing topic can be updated with new given configs
148+
* @param existingTopic The existing topic
149+
* @param newTopic The new topic
150+
* @return A list of validation errors
151+
*/
152+
public List<String> validateTopicUpdate(Namespace namespace, Topic existingTopic, Topic newTopic) {
153+
List<String> validationErrors = new ArrayList<>();
154+
155+
if (existingTopic.getSpec().getPartitions() != newTopic.getSpec().getPartitions()) {
156+
validationErrors.add(String.format("Invalid value %s for configuration partitions: Value is immutable (%s).",
157+
newTopic.getSpec().getPartitions(), existingTopic.getSpec().getPartitions()));
158+
}
159+
160+
if (existingTopic.getSpec().getReplicationFactor() != newTopic.getSpec().getReplicationFactor()) {
161+
validationErrors.add(String.format("Invalid value %s for configuration replication.factor: Value is immutable (%s).",
162+
newTopic.getSpec().getReplicationFactor(), existingTopic.getSpec().getReplicationFactor()));
163+
}
164+
165+
Optional<KafkaAsyncExecutorConfig> topicCluster = kafkaAsyncExecutorConfig
166+
.stream()
167+
.filter(cluster -> namespace.getMetadata().getCluster().equals(cluster.getName()))
168+
.findFirst();
169+
170+
boolean confluentCloudCluster = topicCluster.isPresent() && topicCluster.get().getProvider().equals(KafkaAsyncExecutorConfig.KafkaProvider.CONFLUENT_CLOUD);
171+
if (confluentCloudCluster && existingTopic.getSpec().getConfigs().get(CLEANUP_POLICY_CONFIG).equals(CLEANUP_POLICY_DELETE) &&
172+
newTopic.getSpec().getConfigs().get(CLEANUP_POLICY_CONFIG).equals(CLEANUP_POLICY_COMPACT)) {
173+
validationErrors.add(String.format("Invalid value %s for configuration cleanup.policy: Altering topic configuration from `delete` to `compact` is not currently supported. Please create a new topic with `compact` policy specified instead.",
174+
newTopic.getSpec().getConfigs().get(CLEANUP_POLICY_CONFIG)));
175+
}
176+
177+
return validationErrors;
178+
}
179+
136180
/**
137181
* Check if topics collide with "_" instead of "."
138182
* @param topicA The first topic
139183
* @param topicB The second topic
140184
* @return true if it does, false otherwise
141185
*/
142-
private boolean hasCollision(String topicA, String topicB){
186+
private boolean hasCollision(String topicA, String topicB) {
143187
return topicA.replace('.', '_').equals(topicB.replace('.', '_'));
144188
}
145189

@@ -243,5 +287,4 @@ public Map<TopicPartition, Long> deleteRecords(Topic topic, Map<TopicPartition,
243287
throw new InterruptedException(e.getMessage());
244288
}
245289
}
246-
247290
}

api/src/main/java/com/michelin/ns4kafka/services/executors/KafkaAsyncExecutorConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import lombok.Setter;
1111
import org.apache.kafka.clients.admin.Admin;
1212

13+
import java.security.Provider;
1314
import java.util.Map;
1415
import java.util.Properties;
1516

@@ -81,6 +82,16 @@ public KafkaAsyncExecutorConfig(@Parameter String name) {
8182
this.name = name;
8283
}
8384

85+
/**
86+
* Constructor
87+
* @param name The cluster name
88+
* @param provider The kafka provider
89+
*/
90+
public KafkaAsyncExecutorConfig(@Parameter String name, @Parameter KafkaProvider provider) {
91+
this.name = name;
92+
this.provider = provider;
93+
}
94+
8495
@Getter
8596
@Setter
8697
@Introspected

api/src/test/java/com/michelin/ns4kafka/controllers/TopicControllerTest.java

Lines changed: 4 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -347,10 +347,10 @@ void updateTopic() throws InterruptedException, ExecutionException, TimeoutExcep
347347
}
348348

349349
/**
350-
* Validate topic update when number of partitions change
350+
* Validate topic update when there are validations errors
351351
*/
352352
@Test
353-
void updateTopicChangePartition() {
353+
void updateTopicValidationErrors() {
354354
Namespace ns = Namespace.builder()
355355
.metadata(ObjectMeta.builder()
356356
.name("test")
@@ -390,75 +390,14 @@ void updateTopicChangePartition() {
390390
when(namespaceService.findByName("test"))
391391
.thenReturn(Optional.of(ns));
392392
when(topicService.findByName(ns, "test.topic")).thenReturn(Optional.of(existing));
393+
when(topicService.validateTopicUpdate(ns, existing, topic)).thenReturn(List.of("Invalid value 6 for configuration partitions: Value is immutable (3)."));
393394

394395
ResourceValidationException actual = Assertions.assertThrows(ResourceValidationException.class,
395396
() -> topicController.apply("test", topic, false));
396397
Assertions.assertEquals(1, actual.getValidationErrors().size());
397398
Assertions.assertLinesMatch(List.of("Invalid value 6 for configuration partitions: Value is immutable (3)."), actual.getValidationErrors());
398399
}
399400

400-
/**
401-
* Validate topic update when number of partitions change
402-
*/
403-
@Test
404-
void updateTopicChangeReplicationFactor() {
405-
TopicValidator topicValidator = TopicValidator.builder()
406-
.validationConstraints(
407-
Map.of( "replication.factor", ResourceValidator.Range.between(3,6),
408-
"partitions", ResourceValidator.Range.between(3,6),
409-
"cleanup.policy", ResourceValidator.ValidList.in("delete","compact"),
410-
"min.insync.replicas", ResourceValidator.Range.between(2,2),
411-
"retention.ms", ResourceValidator.Range.between(60000,604800000),
412-
"retention.bytes", ResourceValidator.Range.optionalBetween(-1, 104857600),
413-
"preallocate", ResourceValidator.ValidString.optionalIn("true", "false"))
414-
).build();
415-
416-
Namespace ns = Namespace.builder()
417-
.metadata(ObjectMeta.builder()
418-
.name("test")
419-
.cluster("local")
420-
.build())
421-
.spec(NamespaceSpec.builder()
422-
.topicValidator(topicValidator)
423-
.build())
424-
.build();
425-
426-
Topic existing = Topic.builder()
427-
.metadata(ObjectMeta.builder()
428-
.name("test.topic")
429-
.build())
430-
.spec(Topic.TopicSpec.builder()
431-
.replicationFactor(3)
432-
.partitions(3)
433-
.configs(Map.of("cleanup.policy","compact",
434-
"min.insync.replicas", "2",
435-
"retention.ms", "60000"))
436-
.build())
437-
.build();
438-
439-
Topic topic = Topic.builder()
440-
.metadata(ObjectMeta.builder()
441-
.name("test.topic")
442-
.build())
443-
.spec(Topic.TopicSpec.builder()
444-
.replicationFactor(6)
445-
.partitions(3)
446-
.configs(Map.of("cleanup.policy","delete",
447-
"min.insync.replicas", "2",
448-
"retention.ms", "60000"))
449-
.build())
450-
.build();
451-
452-
when(namespaceService.findByName("test"))
453-
.thenReturn(Optional.of(ns));
454-
when(topicService.findByName(ns, "test.topic")).thenReturn(Optional.of(existing));
455-
456-
ResourceValidationException actual = Assertions.assertThrows(ResourceValidationException.class,
457-
() -> topicController.apply("test", topic, false));
458-
Assertions.assertEquals(1, actual.getValidationErrors().size());
459-
Assertions.assertLinesMatch(List.of("Invalid value 6 for configuration replication.factor: Value is immutable (3)."), actual.getValidationErrors());
460-
}
461-
462401
/**
463402
* Validate topic update when topic doesn't change
464403
* @throws InterruptedException Any interrupted exception
@@ -561,7 +500,7 @@ void createNewTopicDryRun() throws InterruptedException, ExecutionException, Tim
561500
* Validate topic creation when topic validation fails
562501
*/
563502
@Test
564-
void CreateNewTopicFailValidation() {
503+
void createNewTopicFailValidation() {
565504
Namespace ns = Namespace.builder()
566505
.metadata(ObjectMeta.builder()
567506
.name("test")

0 commit comments

Comments
 (0)