Skip to content

Commit 7e5a296

Browse files
Fix describegroups in 2.3.1 case
1 parent a153940 commit 7e5a296

File tree

2 files changed

+13
-0
lines changed

2 files changed

+13
-0
lines changed

describegroups.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,9 @@ func (t *describeGroupsResponseV0) readFrom(r *bufio.Reader, sz int) (remain int
316316

317317
// decodeMemberMetadata converts raw metadata bytes to a
318318
// DescribeGroupsResponseMemberMetadata struct.
319+
//
320+
// See https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java#L49
321+
// for protocol details.
319322
func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetadata, error) {
320323
mm := DescribeGroupsResponseMemberMetadata{}
321324

describegroups_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"bytes"
66
"context"
77
"fmt"
8+
"os"
89
"reflect"
910
"sort"
1011
"testing"
@@ -54,6 +55,14 @@ func TestDescribeGroupsResponseV0(t *testing.T) {
5455
}
5556

5657
func TestClientDescribeGroups(t *testing.T) {
58+
if os.Getenv("KAFKA_VERSION") == "2.3.1" {
59+
// There's a bug in 2.3.1 that causes the MemberMetadata to be in the wrong format and thus
60+
// leads to an error when decoding the DescribeGroupsResponse.
61+
//
62+
// See https://issues.apache.org/jira/browse/KAFKA-9150 for details.
63+
t.Skip("Skipping because kafka version is 2.3.1")
64+
}
65+
5766
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
5867
defer cancel()
5968

@@ -76,6 +85,7 @@ func TestClientDescribeGroups(t *testing.T) {
7685
Value: []byte("value"),
7786
},
7887
)
88+
7989
if err != nil {
8090
t.Fatal(err)
8191
}

0 commit comments

Comments
 (0)