Skip to content

Commit 0029788

Browse files
author
Loïc GREFFIER
authored
Added an option to disable ACL deletion for a cluster (#177)
* Add an option to disable ACL deletion for a cluster * Improve unit tests coverage * Improve unit tests coverage * Force deletion directly from controllers for stream and acl
1 parent 8b933fa commit 0029788

16 files changed

+761
-173
lines changed

api/src/main/java/com/michelin/ns4kafka/controllers/AccessControlListController.java

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,32 @@
2525
description = "APIs to handle cross namespace ACL")
2626
@Controller("/api/namespaces/{namespace}/acls")
2727
public class AccessControlListController extends NamespacedResourceController {
28+
/**
29+
* The namespace service
30+
*/
2831
@Inject
2932
NamespaceService namespaceService;
33+
34+
/**
35+
* The ACL service
36+
*/
3037
@Inject
3138
AccessControlEntryService accessControlEntryService;
3239

40+
/**
41+
* Get all ACLs of given namespace
42+
* @param namespace The namespace
43+
* @param limit The ACL scope
44+
* @return A list of ACLs
45+
*/
3346
@Operation(summary = "Returns the Access Control Entry List")
3447
@Get("{?limit}")
3548
public List<AccessControlEntry> list(String namespace, Optional<AclLimit> limit) {
36-
if (limit.isEmpty())
49+
if (limit.isEmpty()) {
3750
limit = Optional.of(AclLimit.ALL);
51+
}
3852

3953
Namespace ns = getNamespace(namespace);
40-
4154
switch (limit.get()) {
4255
case GRANTEE:
4356
return accessControlEntryService.findAllGrantedToNamespace(ns)
@@ -69,6 +82,12 @@ public List<AccessControlEntry> list(String namespace, Optional<AclLimit> limit)
6982

7083
}
7184

85+
/**
86+
* Get an ACL by namespace and name
87+
* @param namespace The name
88+
* @param acl The ACL name
89+
* @return The ACL
90+
*/
7291
@Get("/{acl}")
7392
public Optional<AccessControlEntry> get(String namespace, String acl) {
7493
return list(namespace, Optional.of(AclLimit.ALL))
@@ -77,75 +96,94 @@ public Optional<AccessControlEntry> get(String namespace, String acl) {
7796
.findFirst();
7897
}
7998

99+
/**
100+
* Create an ACL
101+
* @param authentication The authentication entity
102+
* @param namespace The namespace
103+
* @param accessControlEntry The ACL
104+
* @param dryrun Is dry run mode or not ?
105+
* @return An HTTP response
106+
*/
80107
@Post("{?dryrun}")
81108
public HttpResponse<AccessControlEntry> apply(Authentication authentication, String namespace, @Valid @Body AccessControlEntry accessControlEntry, @QueryValue(defaultValue = "false") boolean dryrun) {
82109
Namespace ns = getNamespace(namespace);
83110

84111
List<String> roles = (List<String>) authentication.getAttributes().get("roles");
85112
boolean isAdmin = roles.contains(ResourceBasedSecurityRule.IS_ADMIN);
86-
// self assigned ACL (spec.grantedTo == metadata.namespace)
113+
// Self assigned ACL (spec.grantedTo == metadata.namespace)
87114
boolean isSelfAssignedACL = namespace.equals(accessControlEntry.getSpec().getGrantedTo());
88115

89116
List<String> validationErrors;
90117
if (isAdmin && isSelfAssignedACL) {
91-
// validate overlapping OWNER
118+
// Validate overlapping OWNER
92119
validationErrors = accessControlEntryService.validateAsAdmin(accessControlEntry, ns);
93120
} else {
94121
validationErrors = accessControlEntryService.validate(accessControlEntry, ns);
95122
}
123+
96124
if (!validationErrors.isEmpty()) {
97125
throw new ResourceValidationException(validationErrors, accessControlEntry.getKind(), accessControlEntry.getMetadata().getName());
98126
}
127+
99128
// AccessControlEntry spec is immutable
100129
// This prevents accidental updates on ACL resources already declared with the same name (with differents rules)
101130
Optional<AccessControlEntry> existingACL = accessControlEntryService.findByName(namespace, accessControlEntry.getMetadata().getName());
102131
if(existingACL.isPresent() && !existingACL.get().getSpec().equals(accessControlEntry.getSpec())){
103132
throw new ResourceValidationException(List.of("Invalid modification: `spec` is immutable. You can still update `metadata`"), accessControlEntry.getKind(), accessControlEntry.getMetadata().getName());
104133
}
105134

106-
//augment
107135
accessControlEntry.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
108136
accessControlEntry.getMetadata().setCluster(ns.getMetadata().getCluster());
109137
accessControlEntry.getMetadata().setNamespace(ns.getMetadata().getName());
110138

111-
if(existingACL.isPresent() && existingACL.get().equals(accessControlEntry)){
139+
if (existingACL.isPresent() && existingACL.get().equals(accessControlEntry)) {
112140
return formatHttpResponse(existingACL.get(), ApplyStatus.unchanged);
113141
}
142+
114143
ApplyStatus status = existingACL.isPresent() ? ApplyStatus.changed : ApplyStatus.created;
115144

116-
//dryrun checks
145+
// Dry run checks
117146
if (dryrun) {
118147
return formatHttpResponse(accessControlEntry, status);
119148
}
149+
120150
sendEventLog(accessControlEntry.getKind(),
121151
accessControlEntry.getMetadata(),
122152
status,
123-
existingACL.isPresent() ? existingACL.get().getSpec() : null,
153+
existingACL.<Object>map(AccessControlEntry::getSpec).orElse(null),
124154
accessControlEntry.getSpec());
125155

126-
//store
156+
// Store
127157
return formatHttpResponse(accessControlEntryService.create(accessControlEntry), status);
128158
}
129159

160+
/**
161+
* Delete an ACL
162+
* @param authentication The authentication entity
163+
* @param namespace The namespace
164+
* @param name The ACL name
165+
* @param dryrun Is dry run mode or not ?
166+
* @return An HTTP response
167+
*/
130168
@Delete("/{name}{?dryrun}")
131169
@Status(HttpStatus.NO_CONTENT)
132170
public HttpResponse<Void> delete(Authentication authentication, String namespace, String name, @QueryValue(defaultValue = "false") boolean dryrun) {
133-
171+
Namespace ns = getNamespace(namespace);
134172
AccessControlEntry accessControlEntry = accessControlEntryService
135173
.findByName(namespace, name)
136174
.orElseThrow(() -> new ResourceValidationException(
137175
List.of("Invalid value " + name + " for name : AccessControlEntry doesn't exist in this namespace"),
138176
"AccessControlEntry",
139-
name
140-
)
177+
name)
141178
);
142179

143180
List<String> roles = (List<String>) authentication.getAttributes().get("roles");
144181
boolean isAdmin = roles.contains(ResourceBasedSecurityRule.IS_ADMIN);
145-
// self assigned ACL (spec.grantedTo == metadata.namespace)
182+
// Self assigned ACL (spec.grantedTo == metadata.namespace)
146183
boolean isSelfAssignedACL = namespace.equals(accessControlEntry.getSpec().getGrantedTo());
184+
147185
if (isSelfAssignedACL && !isAdmin) {
148-
// prevent delete
186+
// Prevent delete
149187
throw new ResourceValidationException(
150188
List.of("Only admins can delete this AccessControlEntry"),
151189
"AccessControlEntry",
@@ -158,7 +196,7 @@ public HttpResponse<Void> delete(Authentication authentication, String namespace
158196
}
159197

160198
sendEventLog(accessControlEntry.getKind(), accessControlEntry.getMetadata(), ApplyStatus.deleted,accessControlEntry.getSpec(), null);
161-
accessControlEntryService.delete(accessControlEntry);
199+
accessControlEntryService.delete(ns, accessControlEntry);
162200
return HttpResponse.noContent();
163201
}
164202

api/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@
2222
@ExecuteOn(TaskExecutors.IO)
2323
public class SchemaController extends NamespacedResourceController {
2424
/**
25-
* Subject service
25+
* The schema service
2626
*/
2727
@Inject
2828
SchemaService schemaService;
2929

3030
/**
3131
* Get all the schemas by namespace
32-
*
3332
* @param namespace The namespace
3433
* @return A list of schemas
3534
*/
@@ -41,7 +40,6 @@ public List<Schema> list(String namespace) {
4140

4241
/**
4342
* Get the last version of a schema by namespace and subject
44-
*
4543
* @param namespace The namespace
4644
* @param subject The subject
4745
* @return A schema
@@ -59,7 +57,6 @@ public Optional<Schema> get(String namespace, String subject) {
5957

6058
/**
6159
* Publish a schema
62-
*
6360
* @param namespace The namespace
6461
* @param schema The schema to create
6562
* @param dryrun Does the creation is a dry run
@@ -115,7 +112,6 @@ public HttpResponse<Schema> apply(String namespace, @Valid @Body Schema schema,
115112

116113
/**
117114
* Delete all schemas under the given subject
118-
*
119115
* @param namespace The current namespace
120116
* @param subject The current subject to delete
121117
* @param dryrun Run in dry mode or not
@@ -158,7 +154,6 @@ public HttpResponse<Void> deleteSubject(String namespace, @PathVariable String s
158154

159155
/**
160156
* Update the compatibility of a subject
161-
*
162157
* @param namespace The namespace
163158
* @param subject The subject to update
164159
* @param compatibility The compatibility to apply

api/src/main/java/com/michelin/ns4kafka/controllers/StreamController.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,40 +18,59 @@
1818
@Tag(name = "Stream")
1919
@Controller(value = "/api/namespaces/{namespace}/streams")
2020
public class StreamController extends NamespacedResourceController {
21-
21+
/**
22+
* The Kafka Streams service
23+
*/
2224
@Inject
2325
StreamService streamService;
2426

27+
/**
28+
* Get all the Kafka Streams by namespace
29+
* @param namespace The namespace
30+
* @return A list of Kafka Streams
31+
*/
2532
@Get("/")
2633
List<KafkaStream> list(String namespace){
2734
Namespace ns = getNamespace(namespace);
2835
return streamService.findAllForNamespace(ns);
2936

3037
}
3138

39+
/**
40+
* Get a Kafka Streams by namespace and name
41+
* @param namespace The name
42+
* @param stream The Kafka Streams name
43+
* @return The Kafka Streams
44+
*/
3245
@Get("/{stream}")
33-
Optional<KafkaStream> get(String namespace,String stream){
46+
Optional<KafkaStream> get(String namespace, String stream){
3447

3548
Namespace ns = getNamespace(namespace);
3649
return streamService.findByName(ns, stream);
3750

3851
}
3952

53+
/**
54+
* Create a Kafka Streams
55+
* @param namespace The namespace
56+
* @param stream The Kafka Stream
57+
* @param dryrun Is dry run mode or not ?
58+
* @return An HTTP response
59+
*/
4060
@Post("/{?dryrun}")
4161
HttpResponse<KafkaStream> apply(String namespace,@Body @Valid KafkaStream stream, @QueryValue(defaultValue = "false") boolean dryrun){
4262
Namespace ns = getNamespace(namespace);
43-
44-
//Creation of the correct ACLs
4563
if (!streamService.isNamespaceOwnerOfKafkaStream(ns, stream.getMetadata().getName())) {
4664
throw new ResourceValidationException(List.of("Invalid value " + stream.getMetadata().getName()
4765
+ " for name: Namespace not OWNER of underlying Topic prefix and Group prefix"), "Stream", stream.getMetadata().getName());
4866
}
49-
//Augment the Stream
67+
68+
// Augment the Stream
5069
stream.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
5170
stream.getMetadata().setCluster(ns.getMetadata().getCluster());
5271
stream.getMetadata().setNamespace(ns.getMetadata().getName());
5372

54-
//Creation of the correct ACLs
73+
// Creation of the correct ACLs
5574
Optional<KafkaStream> existingStream = streamService.findByName(ns, stream.getMetadata().getName());
5675
if (existingStream.isPresent() && existingStream.get().equals(stream)){
5776
return formatHttpResponse(stream, ApplyStatus.unchanged);
@@ -62,16 +81,23 @@ HttpResponse<KafkaStream> apply(String namespace,@Body @Valid KafkaStream stream
6281
if (dryrun) {
6382
return formatHttpResponse(stream, status);
6483
}
84+
6585
sendEventLog(stream.getKind(),
6686
stream.getMetadata(),
6787
status,
6888
null,
6989
null);
7090

7191
return formatHttpResponse(streamService.create(stream), status);
72-
7392
}
7493

94+
/**
95+
* Delete a Kafka Streams
96+
* @param namespace The namespace
97+
* @param stream The Kafka Streams
98+
* @param dryrun Is dry run mode or not ?
99+
* @return An HTTP response
100+
*/
75101
@Status(HttpStatus.NO_CONTENT)
76102
@Delete("/{stream}{?dryrun}")
77103
HttpResponse delete(String namespace,String stream, @QueryValue(defaultValue = "false") boolean dryrun){
@@ -80,22 +106,24 @@ HttpResponse delete(String namespace,String stream, @QueryValue(defaultValue = "
80106
throw new ResourceValidationException(List.of("Invalid value " + stream
81107
+ " for name: Namespace not OWNER of underlying Topic prefix and Group prefix"), "Stream", stream);
82108
}
83-
// exists ?
109+
84110
Optional<KafkaStream> optionalStream = streamService.findByName(ns, stream);
85111

86-
if (optionalStream.isEmpty())
112+
if (optionalStream.isEmpty()) {
87113
return HttpResponse.notFound();
114+
}
88115

89116
if (dryrun) {
90117
return HttpResponse.noContent();
91118
}
119+
92120
var streamToDelete = optionalStream.get();
93121
sendEventLog(streamToDelete.getKind(),
94122
streamToDelete.getMetadata(),
95123
ApplyStatus.deleted,
96124
null,
97125
null);
98-
streamService.delete(optionalStream.get());
126+
streamService.delete(ns, optionalStream.get());
99127
return HttpResponse.noContent();
100128
}
101129
}

api/src/main/java/com/michelin/ns4kafka/controllers/TopicController.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ public List<Topic> importResources(String namespace, @QueryValue(defaultValue =
178178
if (dryrun) {
179179
return unsynchronizedTopics;
180180
}
181+
181182
List<Topic> synchronizedTopics = unsynchronizedTopics.stream()
182183
.map(topic -> {
183184
sendEventLog("Topic", topic.getMetadata(), ApplyStatus.created, null, topic.getSpec());

0 commit comments

Comments
 (0)