Skip to content

Commit f4ca0b4

Browse files
Support describeacls (#1166)
* Support describeacls * gofmt -s -w createacl_test.go * make test diff smaller and fix protocl api key * fix another protocol api key * improve test name * protocol fixes * add missing patterntype * fix createacls protocol * fix tags and add tagged fields back in * bump createacls version to v3 * wip * just one filter, not a list of filters * add missing patterntype in test * fix patterntype location * add prototests * createacl_test.go -> createacls_test.go * seperate createacls_test and describeacls_test * fix describeaclstest * add comment for ResourcePatternTypeFilter
1 parent 6193fa9 commit f4ca0b4

File tree

7 files changed

+560
-18
lines changed

7 files changed

+560
-18
lines changed

createacl_test.go renamed to createacls_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,18 @@ func TestClientCreateACLs(t *testing.T) {
1515
client, shutdown := newLocalClient()
1616
defer shutdown()
1717

18-
res, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
18+
topic := makeTopic()
19+
group := makeGroupID()
20+
21+
createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
1922
ACLs: []ACLEntry{
2023
{
2124
Principal: "User:alice",
2225
PermissionType: ACLPermissionTypeAllow,
2326
Operation: ACLOperationTypeRead,
2427
ResourceType: ResourceTypeTopic,
2528
ResourcePatternType: PatternTypeLiteral,
26-
ResourceName: "fake-topic-for-alice",
29+
ResourceName: topic,
2730
Host: "*",
2831
},
2932
{
@@ -32,7 +35,7 @@ func TestClientCreateACLs(t *testing.T) {
3235
Operation: ACLOperationTypeRead,
3336
ResourceType: ResourceTypeGroup,
3437
ResourcePatternType: PatternTypeLiteral,
35-
ResourceName: "fake-group-for-bob",
38+
ResourceName: group,
3639
Host: "*",
3740
},
3841
},
@@ -41,7 +44,7 @@ func TestClientCreateACLs(t *testing.T) {
4144
t.Fatal(err)
4245
}
4346

44-
for _, err := range res.Errors {
47+
for _, err := range createRes.Errors {
4548
if err != nil {
4649
t.Error(err)
4750
}

describeacls.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/protocol/describeacls"
10+
)
11+
12+
// DescribeACLsRequest represents a request sent to a kafka broker to describe
13+
// existing ACLs.
14+
type DescribeACLsRequest struct {
15+
// Address of the kafka broker to send the request to.
16+
Addr net.Addr
17+
18+
// Filter to filter ACLs on.
19+
Filter ACLFilter
20+
}
21+
22+
type ACLFilter struct {
23+
ResourceTypeFilter ResourceType
24+
ResourceNameFilter string
25+
// ResourcePatternTypeFilter was added in v1 and is not available prior to that.
26+
ResourcePatternTypeFilter PatternType
27+
PrincipalFilter string
28+
HostFilter string
29+
Operation ACLOperationType
30+
PermissionType ACLPermissionType
31+
}
32+
33+
// DescribeACLsResponse represents a response from a kafka broker to an ACL
34+
// describe request.
35+
type DescribeACLsResponse struct {
36+
// The amount of time that the broker throttled the request.
37+
Throttle time.Duration
38+
39+
// Error that occurred while attempting to describe
40+
// the ACLs.
41+
Error error
42+
43+
// ACL resources returned from the describe request.
44+
Resources []ACLResource
45+
}
46+
47+
type ACLResource struct {
48+
ResourceType ResourceType
49+
ResourceName string
50+
PatternType PatternType
51+
ACLs []ACLDescription
52+
}
53+
54+
type ACLDescription struct {
55+
Principal string
56+
Host string
57+
Operation ACLOperationType
58+
PermissionType ACLPermissionType
59+
}
60+
61+
func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error) {
62+
m, err := c.roundTrip(ctx, req.Addr, &describeacls.Request{
63+
Filter: describeacls.ACLFilter{
64+
ResourceTypeFilter: int8(req.Filter.ResourceTypeFilter),
65+
ResourceNameFilter: req.Filter.ResourceNameFilter,
66+
ResourcePatternTypeFilter: int8(req.Filter.ResourcePatternTypeFilter),
67+
PrincipalFilter: req.Filter.PrincipalFilter,
68+
HostFilter: req.Filter.HostFilter,
69+
Operation: int8(req.Filter.Operation),
70+
PermissionType: int8(req.Filter.PermissionType),
71+
},
72+
})
73+
if err != nil {
74+
return nil, fmt.Errorf("kafka.(*Client).DescribeACLs: %w", err)
75+
}
76+
77+
res := m.(*describeacls.Response)
78+
resources := make([]ACLResource, len(res.Resources))
79+
80+
for resourceIdx, respResource := range res.Resources {
81+
descriptions := make([]ACLDescription, len(respResource.ACLs))
82+
83+
for descriptionIdx, respDescription := range respResource.ACLs {
84+
descriptions[descriptionIdx] = ACLDescription{
85+
Principal: respDescription.Principal,
86+
Host: respDescription.Host,
87+
Operation: ACLOperationType(respDescription.Operation),
88+
PermissionType: ACLPermissionType(respDescription.PermissionType),
89+
}
90+
}
91+
92+
resources[resourceIdx] = ACLResource{
93+
ResourceType: ResourceType(respResource.ResourceType),
94+
ResourceName: respResource.ResourceName,
95+
PatternType: PatternType(respResource.PatternType),
96+
ACLs: descriptions,
97+
}
98+
}
99+
100+
ret := &DescribeACLsResponse{
101+
Throttle: makeDuration(res.ThrottleTimeMs),
102+
Error: makeError(res.ErrorCode, res.ErrorMessage),
103+
Resources: resources,
104+
}
105+
106+
return ret, nil
107+
}

describeacls_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
ktesting "github.com/segmentio/kafka-go/testing"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestClientDescribeACLs(t *testing.T) {
12+
if !ktesting.KafkaIsAtLeast("2.0.1") {
13+
return
14+
}
15+
16+
client, shutdown := newLocalClient()
17+
defer shutdown()
18+
19+
topic := makeTopic()
20+
group := makeGroupID()
21+
22+
createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
23+
ACLs: []ACLEntry{
24+
{
25+
Principal: "User:alice",
26+
PermissionType: ACLPermissionTypeAllow,
27+
Operation: ACLOperationTypeRead,
28+
ResourceType: ResourceTypeTopic,
29+
ResourcePatternType: PatternTypeLiteral,
30+
ResourceName: topic,
31+
Host: "*",
32+
},
33+
{
34+
Principal: "User:bob",
35+
PermissionType: ACLPermissionTypeAllow,
36+
Operation: ACLOperationTypeRead,
37+
ResourceType: ResourceTypeGroup,
38+
ResourcePatternType: PatternTypeLiteral,
39+
ResourceName: group,
40+
Host: "*",
41+
},
42+
},
43+
})
44+
if err != nil {
45+
t.Fatal(err)
46+
}
47+
48+
for _, err := range createRes.Errors {
49+
if err != nil {
50+
t.Error(err)
51+
}
52+
}
53+
54+
describeResp, err := client.DescribeACLs(context.Background(), &DescribeACLsRequest{
55+
Filter: ACLFilter{
56+
ResourceTypeFilter: ResourceTypeTopic,
57+
ResourceNameFilter: topic,
58+
ResourcePatternTypeFilter: PatternTypeLiteral,
59+
Operation: ACLOperationTypeRead,
60+
PermissionType: ACLPermissionTypeAllow,
61+
},
62+
})
63+
if err != nil {
64+
t.Fatal(err)
65+
}
66+
67+
expectedDescribeResp := DescribeACLsResponse{
68+
Throttle: 0,
69+
Error: makeError(0, ""),
70+
Resources: []ACLResource{
71+
{
72+
ResourceType: ResourceTypeTopic,
73+
ResourceName: topic,
74+
PatternType: PatternTypeLiteral,
75+
ACLs: []ACLDescription{
76+
{
77+
Principal: "User:alice",
78+
Host: "*",
79+
Operation: ACLOperationTypeRead,
80+
PermissionType: ACLPermissionTypeAllow,
81+
},
82+
},
83+
},
84+
},
85+
}
86+
87+
assert.Equal(t, expectedDescribeResp, *describeResp)
88+
}

protocol/createacls/createacls.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ func init() {
99
type Request struct {
1010
// We need at least one tagged field to indicate that v2+ uses "flexible"
1111
// messages.
12-
_ struct{} `kafka:"min=v2,max=v2,tag"`
12+
_ struct{} `kafka:"min=v2,max=v3,tag"`
1313

14-
Creations []RequestACLs `kafka:"min=v0,max=v2"`
14+
Creations []RequestACLs `kafka:"min=v0,max=v3"`
1515
}
1616

1717
func (r *Request) ApiKey() protocol.ApiKey { return protocol.CreateAcls }
@@ -21,29 +21,37 @@ func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
2121
}
2222

2323
type RequestACLs struct {
24-
ResourceType int8 `kafka:"min=v0,max=v2"`
25-
ResourceName string `kafka:"min=v0,max=v2"`
26-
ResourcePatternType int8 `kafka:"min=v0,max=v2"`
27-
Principal string `kafka:"min=v0,max=v2"`
28-
Host string `kafka:"min=v0,max=v2"`
29-
Operation int8 `kafka:"min=v0,max=v2"`
30-
PermissionType int8 `kafka:"min=v0,max=v2"`
24+
// We need at least one tagged field to indicate that v2+ uses "flexible"
25+
// messages.
26+
_ struct{} `kafka:"min=v2,max=v3,tag"`
27+
28+
ResourceType int8 `kafka:"min=v0,max=v3"`
29+
ResourceName string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
30+
ResourcePatternType int8 `kafka:"min=v1,max=v3"`
31+
Principal string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
32+
Host string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"`
33+
Operation int8 `kafka:"min=v0,max=v3"`
34+
PermissionType int8 `kafka:"min=v0,max=v3"`
3135
}
3236

3337
type Response struct {
3438
// We need at least one tagged field to indicate that v2+ uses "flexible"
3539
// messages.
36-
_ struct{} `kafka:"min=v2,max=v2,tag"`
40+
_ struct{} `kafka:"min=v2,max=v3,tag"`
3741

38-
ThrottleTimeMs int32 `kafka:"min=v0,max=v2"`
39-
Results []ResponseACLs `kafka:"min=v0,max=v2"`
42+
ThrottleTimeMs int32 `kafka:"min=v0,max=v3"`
43+
Results []ResponseACLs `kafka:"min=v0,max=v3"`
4044
}
4145

4246
func (r *Response) ApiKey() protocol.ApiKey { return protocol.CreateAcls }
4347

4448
type ResponseACLs struct {
45-
ErrorCode int16 `kafka:"min=v0,max=v2"`
46-
ErrorMessage string `kafka:"min=v0,max=v2,nullable"`
49+
// We need at least one tagged field to indicate that v2+ uses "flexible"
50+
// messages.
51+
_ struct{} `kafka:"min=v2,max=v3,tag"`
52+
53+
ErrorCode int16 `kafka:"min=v0,max=v3"`
54+
ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"`
4755
}
4856

4957
var _ protocol.BrokerMessage = (*Request)(nil)

0 commit comments

Comments
 (0)