Skip to content

Commit 9ecb9d2

Browse files
Deleteacls support (#1174)
* support deleteacls * add deleteacls_test * add protocol test * test that acl was deleted * trigger build
1 parent f4ca0b4 commit 9ecb9d2

File tree

4 files changed

+465
-0
lines changed

4 files changed

+465
-0
lines changed

deleteacls.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/deleteacls"
10+
)
11+
12+
// DeleteACLsRequest represents a request sent to a kafka broker to delete
13+
// ACLs.
14+
type DeleteACLsRequest struct {
15+
// Address of the kafka broker to send the request to.
16+
Addr net.Addr
17+
18+
// List of ACL filters to use for deletion.
19+
Filters []DeleteACLsFilter
20+
}
21+
22+
type DeleteACLsFilter struct {
23+
ResourceTypeFilter ResourceType
24+
ResourceNameFilter string
25+
ResourcePatternTypeFilter PatternType
26+
PrincipalFilter string
27+
HostFilter string
28+
Operation ACLOperationType
29+
PermissionType ACLPermissionType
30+
}
31+
32+
// DeleteACLsResponse represents a response from a kafka broker to an ACL
33+
// deletion request.
34+
type DeleteACLsResponse struct {
35+
// The amount of time that the broker throttled the request.
36+
Throttle time.Duration
37+
38+
// List of the results from the deletion request.
39+
Results []DeleteACLsResult
40+
}
41+
42+
type DeleteACLsResult struct {
43+
Error error
44+
MatchingACLs []DeleteACLsMatchingACLs
45+
}
46+
47+
type DeleteACLsMatchingACLs struct {
48+
Error error
49+
ResourceType ResourceType
50+
ResourceName string
51+
ResourcePatternType PatternType
52+
Principal string
53+
Host string
54+
Operation ACLOperationType
55+
PermissionType ACLPermissionType
56+
}
57+
58+
// DeleteACLs sends ACLs deletion request to a kafka broker and returns the
59+
// response.
60+
func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error) {
61+
filters := make([]deleteacls.RequestFilter, 0, len(req.Filters))
62+
63+
for _, filter := range req.Filters {
64+
filters = append(filters, deleteacls.RequestFilter{
65+
ResourceTypeFilter: int8(filter.ResourceTypeFilter),
66+
ResourceNameFilter: filter.ResourceNameFilter,
67+
ResourcePatternTypeFilter: int8(filter.ResourcePatternTypeFilter),
68+
PrincipalFilter: filter.PrincipalFilter,
69+
HostFilter: filter.HostFilter,
70+
Operation: int8(filter.Operation),
71+
PermissionType: int8(filter.PermissionType),
72+
})
73+
}
74+
75+
m, err := c.roundTrip(ctx, req.Addr, &deleteacls.Request{
76+
Filters: filters,
77+
})
78+
if err != nil {
79+
return nil, fmt.Errorf("kafka.(*Client).DeleteACLs: %w", err)
80+
}
81+
82+
res := m.(*deleteacls.Response)
83+
84+
results := make([]DeleteACLsResult, 0, len(res.FilterResults))
85+
86+
for _, result := range res.FilterResults {
87+
matchingACLs := make([]DeleteACLsMatchingACLs, 0, len(result.MatchingACLs))
88+
89+
for _, matchingACL := range result.MatchingACLs {
90+
matchingACLs = append(matchingACLs, DeleteACLsMatchingACLs{
91+
Error: makeError(matchingACL.ErrorCode, matchingACL.ErrorMessage),
92+
ResourceType: ResourceType(matchingACL.ResourceType),
93+
ResourceName: matchingACL.ResourceName,
94+
ResourcePatternType: PatternType(matchingACL.ResourcePatternType),
95+
Principal: matchingACL.Principal,
96+
Host: matchingACL.Host,
97+
Operation: ACLOperationType(matchingACL.Operation),
98+
PermissionType: ACLPermissionType(matchingACL.PermissionType),
99+
})
100+
}
101+
102+
results = append(results, DeleteACLsResult{
103+
Error: makeError(result.ErrorCode, result.ErrorMessage),
104+
MatchingACLs: matchingACLs,
105+
})
106+
}
107+
108+
ret := &DeleteACLsResponse{
109+
Throttle: makeDuration(res.ThrottleTimeMs),
110+
Results: results,
111+
}
112+
113+
return ret, nil
114+
}

deleteacls_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
ktesting "github.com/segmentio/kafka-go/testing"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestClientDeleteACLs(t *testing.T) {
12+
if !ktesting.KafkaIsAtLeast("2.0.1") {
13+
return
14+
}
15+
16+
client, shutdown := newLocalClient()
17+
defer shutdown()
18+
19+
topic := makeTopic()
20+
group := makeGroupID()
21+
22+
createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
23+
ACLs: []ACLEntry{
24+
{
25+
Principal: "User:alice",
26+
PermissionType: ACLPermissionTypeAllow,
27+
Operation: ACLOperationTypeRead,
28+
ResourceType: ResourceTypeTopic,
29+
ResourcePatternType: PatternTypeLiteral,
30+
ResourceName: topic,
31+
Host: "*",
32+
},
33+
{
34+
Principal: "User:bob",
35+
PermissionType: ACLPermissionTypeAllow,
36+
Operation: ACLOperationTypeRead,
37+
ResourceType: ResourceTypeGroup,
38+
ResourcePatternType: PatternTypeLiteral,
39+
ResourceName: group,
40+
Host: "*",
41+
},
42+
},
43+
})
44+
if err != nil {
45+
t.Fatal(err)
46+
}
47+
48+
for _, err := range createRes.Errors {
49+
if err != nil {
50+
t.Error(err)
51+
}
52+
}
53+
54+
deleteResp, err := client.DeleteACLs(context.Background(), &DeleteACLsRequest{
55+
Filters: []DeleteACLsFilter{
56+
{
57+
ResourceTypeFilter: ResourceTypeTopic,
58+
ResourceNameFilter: topic,
59+
ResourcePatternTypeFilter: PatternTypeLiteral,
60+
Operation: ACLOperationTypeRead,
61+
PermissionType: ACLPermissionTypeAllow,
62+
},
63+
},
64+
})
65+
if err != nil {
66+
t.Fatal(err)
67+
}
68+
69+
expectedDeleteResp := DeleteACLsResponse{
70+
Throttle: 0,
71+
Results: []DeleteACLsResult{
72+
{
73+
Error: makeError(0, ""),
74+
MatchingACLs: []DeleteACLsMatchingACLs{
75+
{
76+
Error: makeError(0, ""),
77+
ResourceType: ResourceTypeTopic,
78+
ResourceName: topic,
79+
ResourcePatternType: PatternTypeLiteral,
80+
Principal: "User:alice",
81+
Host: "*",
82+
Operation: ACLOperationTypeRead,
83+
PermissionType: ACLPermissionTypeAllow,
84+
},
85+
},
86+
},
87+
},
88+
}
89+
90+
assert.Equal(t, expectedDeleteResp, *deleteResp)
91+
92+
describeResp, err := client.DescribeACLs(context.Background(), &DescribeACLsRequest{
93+
Filter: ACLFilter{
94+
ResourceTypeFilter: ResourceTypeTopic,
95+
ResourceNameFilter: topic,
96+
ResourcePatternTypeFilter: PatternTypeLiteral,
97+
Operation: ACLOperationTypeRead,
98+
PermissionType: ACLPermissionTypeAllow,
99+
},
100+
})
101+
if err != nil {
102+
t.Fatal(err)
103+
}
104+
105+
expectedDescribeResp := DescribeACLsResponse{
106+
Throttle: 0,
107+
Error: makeError(0, ""),
108+
Resources: []ACLResource{},
109+
}
110+
111+
assert.Equal(t, expectedDescribeResp, *describeResp)
112+
}

protocol/deleteacls/deleteacls.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package deleteacls
2+
3+
import "github.com/segmentio/kafka-go/protocol"
4+
5+
func init() {
6+
protocol.Register(&Request{}, &Response{})
7+
}
8+
9+
type Request struct {
10+
// We need at least one tagged field to indicate that v2+ uses "flexible"
11+
// messages.
12+
_ struct{} `kafka:"min=v2,max=v3,tag"`
13+
14+
Filters []RequestFilter `kafka:"min=v0,max=v3"`
15+
}
16+
17+
func (r *Request) ApiKey() protocol.ApiKey { return protocol.DeleteAcls }
18+
19+
func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
20+
return cluster.Brokers[cluster.Controller], nil
21+
}
22+
23+
type RequestFilter struct {
24+
// We need at least one tagged field to indicate that v2+ uses "flexible"
25+
// messages.
26+
_ struct{} `kafka:"min=v2,max=v3,tag"`
27+
28+
ResourceTypeFilter int8 `kafka:"min=v0,max=v3"`
29+
ResourceNameFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
30+
ResourcePatternTypeFilter int8 `kafka:"min=v1,max=v3"`
31+
PrincipalFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
32+
HostFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
33+
Operation int8 `kafka:"min=v0,max=v3"`
34+
PermissionType int8 `kafka:"min=v0,max=v3"`
35+
}
36+
37+
type Response struct {
38+
// We need at least one tagged field to indicate that v2+ uses "flexible"
39+
// messages.
40+
_ struct{} `kafka:"min=v2,max=v3,tag"`
41+
42+
ThrottleTimeMs int32 `kafka:"min=v0,max=v3"`
43+
FilterResults []FilterResult `kafka:"min=v0,max=v3"`
44+
}
45+
46+
func (r *Response) ApiKey() protocol.ApiKey { return protocol.DeleteAcls }
47+
48+
type FilterResult struct {
49+
// We need at least one tagged field to indicate that v2+ uses "flexible"
50+
// messages.
51+
_ struct{} `kafka:"min=v2,max=v3,tag"`
52+
53+
ErrorCode int16 `kafka:"min=v0,max=v3"`
54+
ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
55+
MatchingACLs []MatchingACL `kafka:"min=v0,max=v3"`
56+
}
57+
58+
type MatchingACL struct {
59+
// We need at least one tagged field to indicate that v2+ uses "flexible"
60+
// messages.
61+
_ struct{} `kafka:"min=v2,max=v3,tag"`
62+
63+
ErrorCode int16 `kafka:"min=v0,max=v3"`
64+
ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
65+
ResourceType int8 `kafka:"min=v0,max=v3"`
66+
ResourceName string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
67+
ResourcePatternType int8 `kafka:"min=v1,max=v3"`
68+
Principal string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
69+
Host string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
70+
Operation int8 `kafka:"min=v0,max=v3"`
71+
PermissionType int8 `kafka:"min=v0,max=v3"`
72+
}
73+
74+
var _ protocol.BrokerMessage = (*Request)(nil)

0 commit comments

Comments
 (0)