Skip to content

Commit 8f60450

Browse files
feat: Kafka 4.0 support (#1384)
* Kafka 4.0 protocol fields updated * dynamic versions support * review fixes * fix scram sasl deployment for kafka 4.0 * fix indentation * fix some tests * add docker_compose_versions * add KAFKA_HEAP_OPTS to stop OOM errors * try running kafka 400 last to see if that avoids OOM errors * try adding G1GC to reduce out of memory errors from kafka * disable kafka 4.0.0 for now * fix typo and add documentation on Kafka 4.0 issues --------- Co-authored-by: maxwolf8852 <[email protected]>
1 parent ab44858 commit 8f60450

20 files changed

+445
-184
lines changed

.circleci/config.yml

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,58 @@ jobs:
178178
entrypoint: *entrypoint
179179
steps: *steps
180180

181+
# NOTE: the kafka-400 job below fails quite often due to Java heap errors from Kafka.
182+
# Once we figure out how to fix that, we can re-enable this.
183+
# https://github.com/segmentio/kafka-go/issues/1360#issuecomment-2858935900
184+
# kafka-400:
185+
# working_directory: *working_directory
186+
# environment:
187+
# KAFKA_VERSION: "4.0.0"
188+
189+
# # Need to skip nettest to avoid these kinds of errors:
190+
# # --- FAIL: TestConn/nettest (17.56s)
191+
# # --- FAIL: TestConn/nettest/PingPong (7.40s)
192+
# # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
193+
# # conntest.go:118: mismatching value: got 77, want 78
194+
# # conntest.go:118: mismatching value: got 78, want 79
195+
# # ...
196+
# #
197+
# # TODO: Figure out why these are happening and fix them (they don't appear to be new).
198+
# KAFKA_SKIP_NETTEST: "1"
199+
# docker:
200+
# - image: circleci/golang
201+
# - image: bitnami/kafka:4.0.0
202+
# ports:
203+
# - 9092:9092
204+
# - 9093:9093
205+
# environment:
206+
# KAFKA_CFG_NODE_ID: 1
207+
# KAFKA_CFG_BROKER_ID: 1
208+
# KAFKA_CFG_PROCESS_ROLES: broker,controller
209+
# KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
210+
# KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
211+
# KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT
212+
# KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093
213+
# KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093
214+
# KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN
215+
# KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
216+
# KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094
217+
# ALLOW_PLAINTEXT_LISTENER: yes
218+
# KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
219+
# KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
220+
# KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
221+
# KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
222+
# KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
223+
# KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer'
224+
# KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain
225+
# KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain
226+
# KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret
227+
# KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
228+
# KAFKA_INTER_BROKER_USER: adminscram512
229+
# KAFKA_INTER_BROKER_PASSWORD: admin-secret-512
230+
# KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
231+
# steps: *steps
232+
181233
workflows:
182234
version: 2
183235
run:
@@ -186,4 +238,5 @@ workflows:
186238
- kafka-010
187239
- kafka-270
188240
- kafka-281
189-
- kafka-370
241+
- kafka-370
242+
#- kafka-400

alterclientquotas_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kafka
33
import (
44
"context"
55
"testing"
6+
"time"
67

78
ktesting "github.com/segmentio/kafka-go/testing"
89
"github.com/stretchr/testify/assert"
@@ -65,6 +66,11 @@ func TestClientAlterClientQuotas(t *testing.T) {
6566

6667
assert.Equal(t, expectedAlterResp, *alterResp)
6768

69+
// KRaft mode is slow; pause before describing the quotas that were just altered.
70+
if ktesting.KafkaIsAtLeast("3.7.0") {
71+
time.Sleep(3 * time.Second)
72+
}
73+
6874
describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{
6975
Components: []DescribeClientQuotasRequestComponent{
7076
{

alterpartitionreassignments_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kafka
33
import (
44
"context"
55
"testing"
6+
"time"
67

78
ktesting "github.com/segmentio/kafka-go/testing"
89
)
@@ -35,6 +36,7 @@ func TestClientAlterPartitionReassignments(t *testing.T) {
3536
BrokerIDs: []int{1},
3637
},
3738
},
39+
Timeout: 5 * time.Second,
3840
},
3941
)
4042

@@ -96,6 +98,7 @@ func TestClientAlterPartitionReassignmentsMultiTopics(t *testing.T) {
9698
BrokerIDs: []int{1},
9799
},
98100
},
101+
Timeout: 5 * time.Second,
99102
},
100103
)
101104

conn.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) {
306306

307307
// DeleteTopics deletes the specified topics.
308308
func (c *Conn) DeleteTopics(topics ...string) error {
309-
_, err := c.deleteTopics(deleteTopicsRequestV0{
309+
_, err := c.deleteTopics(deleteTopicsRequest{
310310
Topics: topics,
311311
})
312312
return err
@@ -368,12 +368,17 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error
368368
// joinGroup attempts to join a consumer group
369369
//
370370
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
371-
func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
372-
var response joinGroupResponseV1
371+
func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) {
372+
version, err := c.negotiateVersion(joinGroup, v1, v2)
373+
if err != nil {
374+
return joinGroupResponse{}, err
375+
}
373376

374-
err := c.writeOperation(
377+
response := joinGroupResponse{v: version}
378+
379+
err = c.writeOperation(
375380
func(deadline time.Time, id int32) error {
376-
return c.writeRequest(joinGroup, v1, id, request)
381+
return c.writeRequest(joinGroup, version, id, request)
377382
},
378383
func(deadline time.Time, size int) error {
379384
return expectZeroSize(func() (remain int, err error) {
@@ -382,10 +387,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
382387
},
383388
)
384389
if err != nil {
385-
return joinGroupResponseV1{}, err
390+
return joinGroupResponse{}, err
386391
}
387392
if response.ErrorCode != 0 {
388-
return joinGroupResponseV1{}, Error(response.ErrorCode)
393+
return joinGroupResponse{}, Error(response.ErrorCode)
389394
}
390395

391396
return response, nil

conn_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ import (
1313
"testing"
1414
"time"
1515

16-
ktesting "github.com/segmentio/kafka-go/testing"
1716
"golang.org/x/net/nettest"
17+
18+
ktesting "github.com/segmentio/kafka-go/testing"
1819
)
1920

2021
type timeout struct{}
@@ -679,10 +680,10 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
679680
func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) {
680681
waitForCoordinator(t, conn, groupID)
681682

682-
join := func() (joinGroup joinGroupResponseV1) {
683+
join := func() (joinGroup joinGroupResponse) {
683684
var err error
684685
for attempt := 0; attempt < 10; attempt++ {
685-
joinGroup, err = conn.joinGroup(joinGroupRequestV1{
686+
joinGroup, err = conn.joinGroup(joinGroupRequest{
686687
GroupID: groupID,
687688
SessionTimeout: int32(time.Minute / time.Millisecond),
688689
RebalanceTimeout: int32(time.Second / time.Millisecond),
@@ -770,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) {
770771
}
771772

772773
func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) {
773-
_, err := conn.joinGroup(joinGroupRequestV1{})
774+
_, err := conn.joinGroup(joinGroupRequest{})
774775
if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) {
775776
t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err)
776777
}
@@ -780,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) {
780781
groupID := makeGroupID()
781782
waitForCoordinator(t, conn, groupID)
782783

783-
_, err := conn.joinGroup(joinGroupRequestV1{
784+
_, err := conn.joinGroup(joinGroupRequest{
784785
GroupID: groupID,
785786
})
786787
if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) {
@@ -792,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) {
792793
groupID := makeGroupID()
793794
waitForCoordinator(t, conn, groupID)
794795

795-
_, err := conn.joinGroup(joinGroupRequestV1{
796+
_, err := conn.joinGroup(joinGroupRequest{
796797
GroupID: groupID,
797798
SessionTimeout: int32(3 * time.Second / time.Millisecond),
798799
})

consumergroup.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
555555
type coordinator interface {
556556
io.Closer
557557
findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
558-
joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error)
558+
joinGroup(joinGroupRequest) (joinGroupResponse, error)
559559
syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
560560
leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
561561
heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
@@ -588,11 +588,11 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find
588588
return t.conn.findCoordinator(req)
589589
}
590590

591-
func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
591+
func (t *timeoutCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) {
592592
// in the case of join group, the consumer group coordinator may wait up
593593
// to rebalance timeout in order to wait for all members to join.
594594
if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil {
595-
return joinGroupResponseV1{}, err
595+
return joinGroupResponse{}, err
596596
}
597597
return t.conn.joinGroup(req)
598598
}
@@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
932932
// * InvalidSessionTimeout:
933933
// * GroupAuthorizationFailed:
934934
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
935-
request, err := cg.makeJoinGroupRequestV1(memberID)
935+
request, err := cg.makeJoinGroupRequest(memberID)
936936
if err != nil {
937937
return "", 0, nil, err
938938
}
@@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
978978

979979
// makeJoinGroupRequest handles the logic of constructing a joinGroup
980980
// request.
981-
func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
982-
request := joinGroupRequestV1{
981+
func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) {
982+
request := joinGroupRequest{
983983
GroupID: cg.config.ID,
984984
MemberID: memberID,
985985
SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond),
@@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
990990
for _, balancer := range cg.config.GroupBalancers {
991991
userData, err := balancer.UserData()
992992
if err != nil {
993-
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
993+
return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
994994
}
995995
request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
996996
ProtocolName: balancer.ProtocolName(),
@@ -1007,7 +1007,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
10071007

10081008
// assignTopicPartitions uses the selected GroupBalancer to assign members to
10091009
// their various partitions.
1010-
func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) {
1010+
func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) {
10111011
cg.withLogger(func(l Logger) {
10121012
l.Printf("selected as leader for group, %s\n", cg.config.ID)
10131013
})
@@ -1050,7 +1050,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
10501050
}
10511051

10521052
// makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
1053-
func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
1053+
func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) {
10541054
members := make([]GroupMember, 0, len(in))
10551055
for _, item := range in {
10561056
metadata := groupMetadata{}

0 commit comments

Comments
 (0)