Skip to content

Commit fcb5875

Browse files
authored
feat: added aws-sdk-go-v2 IAM Mechanism for MSK authentication (#937)
1 parent 174188e commit fcb5875

File tree

5 files changed

+441
-0
lines changed

5 files changed

+441
-0
lines changed

sasl/aws_msk_iam_v2/README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# AWS MSK IAM V2
2+
3+
This extension provides a capability to get authenticated with [AWS Managed Apache Kafka](https://aws.amazon.com/msk/)
4+
through AWS IAM.
5+
6+
## How to use
7+
8+
This module is an extension for MSK users and thus this is isolated from `kafka-go` module.
9+
You can add this module to your dependency by running the command below.
10+
11+
```shell
12+
go get github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2
13+
```
14+
15+
You can use the `Mechanism` for SASL authentication, like below.
16+
17+
```go
18+
package main
19+
20+
import (
21+
"context"
22+
"crypto/tls"
23+
"time"
24+
25+
signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
26+
awsCfg "github.com/aws/aws-sdk-go-v2/config"
27+
"github.com/segmentio/kafka-go"
28+
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
29+
)
30+
31+
func main() {
32+
ctx := context.Background()
33+
34+
// using aws-sdk-go-v2
35+
// NOTE: address error properly
36+
37+
cfg, _ := awsCfg.LoadDefaultConfig(ctx)
38+
creds, _ := cfg.Credentials.Retrieve(ctx)
39+
m := &aws_msk_iam_v2.Mechanism{
40+
Signer: signer.NewSigner(),
41+
Credentials: creds,
42+
Region: "us-east-1",
43+
SignTime: time.Now(),
44+
Expiry: time.Minute * 5,
45+
}
46+
config := kafka.ReaderConfig{
47+
Brokers: []string{"https://localhost"},
48+
GroupID: "some-consumer-group",
49+
GroupTopics: []string{"some-topic"},
50+
Dialer: &kafka.Dialer{
51+
Timeout: 10 * time.Second,
52+
DualStack: true,
53+
SASLMechanism: m,
54+
TLS: &tls.Config{},
55+
},
56+
}
57+
}
58+
59+
60+
```

sasl/aws_msk_iam_v2/go.mod

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2
2+
3+
go 1.15
4+
5+
require (
6+
github.com/aws/aws-sdk-go-v2 v1.16.7
7+
github.com/aws/aws-sdk-go-v2/credentials v1.12.9
8+
github.com/segmentio/kafka-go v0.4.32
9+
github.com/stretchr/testify v1.7.1
10+
)

sasl/aws_msk_iam_v2/go.sum

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
github.com/aws/aws-sdk-go-v2 v1.16.7 h1:zfBwXus3u14OszRxGcqCDS4MfMCv10e8SMJ2r8Xm0Ns=
2+
github.com/aws/aws-sdk-go-v2 v1.16.7/go.mod h1:6CpKuLXg2w7If3ABZCl/qZ6rEgwtjZTn4eAf4RcEyuw=
3+
github.com/aws/aws-sdk-go-v2/credentials v1.12.9 h1:DloAJr0/jbvm0iVRFDFh8GlWxrOd9XKyX82U+dfVeZs=
4+
github.com/aws/aws-sdk-go-v2/credentials v1.12.9/go.mod h1:2Vavxl1qqQXJ8MUcQZTsIEW8cwenFCWYXtLRPba3L/o=
5+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.8/go.mod h1:oL1Q3KuCq1D4NykQnIvtRiBGLUXhcpY5pl6QZB2XEPU=
6+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.14/go.mod h1:kdjrMwHwrC3+FsKhNcCMJ7tUVj/8uSD5CZXeQ4wV6fM=
7+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.8/go.mod h1:ZIV8GYoC6WLBW5KGs+o4rsc65/ozd+eQ0L31XF5VDwk=
8+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.8/go.mod h1:rDVhIMAX9N2r8nWxDUlbubvvaFMnfsm+3jAV7q+rpM4=
9+
github.com/aws/aws-sdk-go-v2/service/sso v1.11.12/go.mod h1:MO4qguFjs3wPGcCSpQ7kOFTwRvb+eu+fn+1vKleGHUk=
10+
github.com/aws/aws-sdk-go-v2/service/sts v1.16.9/go.mod h1:O1IvkYxr+39hRf960Us6j0x1P8pDqhTX+oXM5kQNl/Y=
11+
github.com/aws/smithy-go v1.12.0 h1:gXpeZel/jPoWQ7OEmLIgCUnhkFftqNfwWUwAHSlp1v0=
12+
github.com/aws/smithy-go v1.12.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
13+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
14+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
15+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
16+
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
17+
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
18+
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
19+
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
20+
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
21+
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
22+
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
23+
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
24+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
25+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
26+
github.com/segmentio/kafka-go v0.4.32 h1:Ohr+9E+kDv/Ld2UPJN9hnKZRd2qgiqCmI8v2e1qlfLM=
27+
github.com/segmentio/kafka-go v0.4.32/go.mod h1:JAPPIiY3MQIwVHj64CWOP0LsFFfQ7H0w69kuoxnMIS0=
28+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
29+
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
30+
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
31+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
32+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
33+
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
34+
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
35+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
36+
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
37+
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
38+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
39+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
40+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
41+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
42+
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
43+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
44+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
45+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
46+
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
47+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
48+
gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99 h1:dbuHpmKjkDzSOMKAWl10QNlgaZUd3V1q99xc81tt2Kc=
49+
gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

sasl/aws_msk_iam_v2/msk_iam.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package aws_msk_iam_v2
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"net/http"
9+
"net/url"
10+
"runtime"
11+
"strconv"
12+
"strings"
13+
"time"
14+
15+
"github.com/aws/aws-sdk-go-v2/aws"
16+
signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
17+
"github.com/segmentio/kafka-go/sasl"
18+
)
19+
20+
const (
21+
// These constants come from https://github.com/aws/aws-msk-iam-auth#details and
22+
// https://github.com/aws/aws-msk-iam-auth/blob/main/src/main/java/software/amazon/msk/auth/iam/internals/AWS4SignedPayloadGenerator.java.
23+
signAction = "kafka-cluster:Connect"
24+
signPayload = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" // the hex encoded SHA-256 of an empty string
25+
signService = "kafka-cluster"
26+
signVersion = "2020_10_22"
27+
signActionKey = "action"
28+
signHostKey = "host"
29+
signUserAgentKey = "user-agent"
30+
signVersionKey = "version"
31+
queryActionKey = "Action"
32+
queryExpiryKey = "X-Amz-Expires"
33+
)
34+
35+
var signUserAgent = fmt.Sprintf("kafka-go/sasl/aws_msk_iam/%s", runtime.Version())
36+
37+
// Mechanism implements sasl.Mechanism for the AWS_MSK_IAM mechanism, based on the official java implementation:
38+
// https://github.com/aws/aws-msk-iam-auth
39+
type Mechanism struct {
40+
// The sigv4.Signer of aws-sdk-go-v2 to use when signing the request. Required.
41+
Signer *signer.Signer
42+
// The aws.Credentials of aws-sdk-go-v2. Required.
43+
Credentials aws.Credentials
44+
// The region where the msk cluster is hosted, e.g. "us-east-1". Required.
45+
Region string
46+
// The time the request is planned for. Optional, defaults to time.Now() at time of authentication.
47+
SignTime time.Time
48+
// The duration for which the presigned request is active. Optional, defaults to 5 minutes.
49+
Expiry time.Duration
50+
}
51+
52+
func (m *Mechanism) Name() string {
53+
return "AWS_MSK_IAM"
54+
}
55+
56+
func (m *Mechanism) Next(ctx context.Context, challenge []byte) (bool, []byte, error) {
57+
// After the initial step, the authentication is complete
58+
// kafka will return error if it rejected the credentials, so we'll only
59+
// arrive here on success.
60+
return true, nil, nil
61+
}
62+
63+
// Start produces the authentication values required for AWS_MSK_IAM. It produces the following json as a byte array,
64+
// making use of the aws-sdk to produce the signed output.
65+
// {
66+
// "version" : "2020_10_22",
67+
// "host" : "<broker host>",
68+
// "user-agent": "<user agent string from the client>",
69+
// "action": "kafka-cluster:Connect",
70+
// "x-amz-algorithm" : "<algorithm>",
71+
// "x-amz-credential" : "<clientAWSAccessKeyID>/<date in yyyyMMdd format>/<region>/kafka-cluster/aws4_request",
72+
// "x-amz-date" : "<timestamp in yyyyMMdd'T'HHmmss'Z' format>",
73+
// "x-amz-security-token" : "<clientAWSSessionToken if any>",
74+
// "x-amz-signedheaders" : "host",
75+
// "x-amz-expires" : "<expiration in seconds>",
76+
// "x-amz-signature" : "<AWS SigV4 signature computed by the client>"
77+
// }
78+
func (m *Mechanism) Start(ctx context.Context) (sess sasl.StateMachine, ir []byte, err error) {
79+
signedMap, err := m.preSign(ctx)
80+
if err != nil {
81+
return nil, nil, err
82+
}
83+
84+
signedJson, err := json.Marshal(signedMap)
85+
return m, signedJson, err
86+
}
87+
88+
// preSign produces the authentication values required for AWS_MSK_IAM.
89+
func (m *Mechanism) preSign(ctx context.Context) (map[string]string, error) {
90+
req, err := buildReq(ctx, defaultExpiry(m.Expiry))
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
signedUrl, header, err := m.Signer.PresignHTTP(ctx, m.Credentials, req, signPayload, signService, m.Region, defaultSignTime(m.SignTime))
96+
if err != nil {
97+
return nil, err
98+
}
99+
100+
u, err := url.Parse(signedUrl)
101+
if err != nil {
102+
return nil, err
103+
}
104+
return buildSignedMap(u, header), nil
105+
}
106+
107+
// buildReq builds http.Request for aws PreSign.
108+
func buildReq(ctx context.Context, expiry time.Duration) (*http.Request, error) {
109+
query := url.Values{
110+
queryActionKey: {signAction},
111+
queryExpiryKey: {strconv.FormatInt(int64(expiry/time.Second), 10)},
112+
}
113+
saslMeta := sasl.MetadataFromContext(ctx)
114+
if saslMeta == nil {
115+
return nil, errors.New("missing sasl metadata")
116+
}
117+
118+
signUrl := url.URL{
119+
Scheme: "kafka",
120+
Host: saslMeta.Host,
121+
Path: "/",
122+
RawQuery: query.Encode(),
123+
}
124+
125+
req, err := http.NewRequest(http.MethodGet, signUrl.String(), nil)
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
return req, nil
131+
}
132+
133+
// buildSignedMap builds signed string map which will be used to authenticate with MSK.
134+
func buildSignedMap(u *url.URL, header http.Header) map[string]string {
135+
signedMap := map[string]string{
136+
signVersionKey: signVersion,
137+
signHostKey: u.Host,
138+
signUserAgentKey: signUserAgent,
139+
signActionKey: signAction,
140+
}
141+
// The protocol requires lowercase keys.
142+
for key, vals := range header {
143+
signedMap[strings.ToLower(key)] = vals[0]
144+
}
145+
for key, vals := range u.Query() {
146+
signedMap[strings.ToLower(key)] = vals[0]
147+
}
148+
149+
return signedMap
150+
}
151+
152+
// defaultExpiry set default expiration time if user doesn't define Mechanism.Expiry.
153+
func defaultExpiry(v time.Duration) time.Duration {
154+
if v == 0 {
155+
return 5 * time.Minute
156+
}
157+
return v
158+
}
159+
160+
// defaultSignTime set default sign time if user doesn't define Mechanism.SignTime.
161+
func defaultSignTime(v time.Time) time.Time {
162+
if v.IsZero() {
163+
return time.Now()
164+
}
165+
return v
166+
}

0 commit comments

Comments
 (0)