Skip to content

Commit c03923d

Browse files
Merge pull request #752 from segmentio/yolken-fix-group-protocol
Fix member metadata decoding and remove deprecated describeGroups API
2 parents 1ca9e66 + 4d75f82 commit c03923d

File tree

5 files changed

+67
-268
lines changed

5 files changed

+67
-268
lines changed

conn.go

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -315,34 +315,6 @@ func (c *Conn) DeleteTopics(topics ...string) error {
315315
return err
316316
}
317317

318-
// describeGroups retrieves the specified groups
319-
//
320-
// See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
321-
func (c *Conn) describeGroups(request describeGroupsRequestV0) (describeGroupsResponseV0, error) {
322-
var response describeGroupsResponseV0
323-
324-
err := c.readOperation(
325-
func(deadline time.Time, id int32) error {
326-
return c.writeRequest(describeGroups, v0, id, request)
327-
},
328-
func(deadline time.Time, size int) error {
329-
return expectZeroSize(func() (remain int, err error) {
330-
return (&response).readFrom(&c.rbuf, size)
331-
}())
332-
},
333-
)
334-
if err != nil {
335-
return describeGroupsResponseV0{}, err
336-
}
337-
for _, group := range response.Groups {
338-
if group.ErrorCode != 0 {
339-
return describeGroupsResponseV0{}, Error(group.ErrorCode)
340-
}
341-
}
342-
343-
return response, nil
344-
}
345-
346318
// findCoordinator finds the coordinator for the specified group or transaction
347319
//
348320
// See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator

conn_test.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,6 @@ func TestConn(t *testing.T) {
201201
function: testConnReadBatchWithMaxWait,
202202
},
203203

204-
{
205-
scenario: "describe groups retrieves all groups when no groupID specified",
206-
function: testConnDescribeGroupRetrievesAllGroups,
207-
minVersion: "0.11.0",
208-
},
209-
210204
{
211205
scenario: "find the group coordinator",
212206
function: testConnFindCoordinator,
@@ -735,26 +729,6 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
735729
return
736730
}
737731

738-
func testConnDescribeGroupRetrievesAllGroups(t *testing.T, conn *Conn) {
739-
groupID := makeGroupID()
740-
_, _, stop1 := createGroup(t, conn, groupID)
741-
defer stop1()
742-
743-
out, err := conn.describeGroups(describeGroupsRequestV0{
744-
GroupIDs: []string{groupID},
745-
})
746-
if err != nil {
747-
t.Fatalf("bad describeGroups: %s", err)
748-
}
749-
750-
if v := len(out.Groups); v != 1 {
751-
t.Fatalf("expected 1 group, got %v", v)
752-
}
753-
if id := out.Groups[0].GroupID; id != groupID {
754-
t.Errorf("bad group: got %v, expected %v", id, groupID)
755-
}
756-
}
757-
758732
func testConnFindCoordinator(t *testing.T, conn *Conn) {
759733
groupID := makeGroupID()
760734

describegroups.go

Lines changed: 47 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,18 @@ type DescribeGroupsResponseMemberMetadata struct {
6969

7070
// UserData is the user data for the member.
7171
UserData []byte
72+
73+
// OwnedPartitions contains the partitions owned by this group member; only set if
74+
// consumers are using a cooperative rebalancing assignor protocol.
75+
OwnedPartitions []DescribeGroupsResponseMemberMetadataOwnedPartition
76+
}
77+
78+
type DescribeGroupsResponseMemberMetadataOwnedPartition struct {
79+
// Topic is the name of the topic.
80+
Topic string
81+
82+
// Partitions is the partitions that are owned by the group in the topic.
83+
Partitions []int
7284
}
7385

7486
// GroupMemberAssignmentsInfo stores the topic partition assignment data for a group member.
@@ -93,6 +105,9 @@ type GroupMemberTopic struct {
93105
Partitions []int
94106
}
95107

108+
// DescribeGroups calls the Kafka DescribeGroups API to get information about one or more
109+
// consumer groups. See https://kafka.apache.org/protocol#The_Messages_DescribeGroups for
110+
// more information.
96111
func (c *Client) DescribeGroups(
97112
ctx context.Context,
98113
req *DescribeGroupsRequest,
@@ -141,176 +156,18 @@ func (c *Client) DescribeGroups(
141156
return resp, nil
142157
}
143158

144-
// See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
145-
//
146-
// TODO: Remove everything below and use protocol-based version above everywhere.
147-
type describeGroupsRequestV0 struct {
148-
// List of groupIds to request metadata for (an empty groupId array
149-
// will return empty group metadata).
150-
GroupIDs []string
151-
}
152-
153-
func (t describeGroupsRequestV0) size() int32 {
154-
return sizeofStringArray(t.GroupIDs)
155-
}
156-
157-
func (t describeGroupsRequestV0) writeTo(wb *writeBuffer) {
158-
wb.writeStringArray(t.GroupIDs)
159-
}
160-
161-
type describeGroupsResponseMemberV0 struct {
162-
// MemberID assigned by the group coordinator
163-
MemberID string
164-
165-
// ClientID used in the member's latest join group request
166-
ClientID string
167-
168-
// ClientHost used in the request session corresponding to the member's
169-
// join group.
170-
ClientHost string
171-
172-
// MemberMetadata the metadata corresponding to the current group protocol
173-
// in use (will only be present if the group is stable).
174-
MemberMetadata []byte
175-
176-
// MemberAssignments provided by the group leader (will only be present if
177-
// the group is stable).
178-
//
179-
// See consumer groups section of https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
180-
MemberAssignments []byte
181-
}
182-
183-
func (t describeGroupsResponseMemberV0) size() int32 {
184-
return sizeofString(t.MemberID) +
185-
sizeofString(t.ClientID) +
186-
sizeofString(t.ClientHost) +
187-
sizeofBytes(t.MemberMetadata) +
188-
sizeofBytes(t.MemberAssignments)
189-
}
190-
191-
func (t describeGroupsResponseMemberV0) writeTo(wb *writeBuffer) {
192-
wb.writeString(t.MemberID)
193-
wb.writeString(t.ClientID)
194-
wb.writeString(t.ClientHost)
195-
wb.writeBytes(t.MemberMetadata)
196-
wb.writeBytes(t.MemberAssignments)
197-
}
198-
199-
func (t *describeGroupsResponseMemberV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
200-
if remain, err = readString(r, size, &t.MemberID); err != nil {
201-
return
202-
}
203-
if remain, err = readString(r, remain, &t.ClientID); err != nil {
204-
return
205-
}
206-
if remain, err = readString(r, remain, &t.ClientHost); err != nil {
207-
return
208-
}
209-
if remain, err = readBytes(r, remain, &t.MemberMetadata); err != nil {
210-
return
211-
}
212-
if remain, err = readBytes(r, remain, &t.MemberAssignments); err != nil {
159+
// readFrom decodes an owned partition item from the member metadata.
160+
func (t *DescribeGroupsResponseMemberMetadataOwnedPartition) readFrom(r *bufio.Reader, size int) (remain int, err error) {
161+
if remain, err = readString(r, size, &t.Topic); err != nil {
213162
return
214163
}
215-
return
216-
}
217-
218-
type describeGroupsResponseGroupV0 struct {
219-
// ErrorCode holds response error code
220-
ErrorCode int16
221-
222-
// GroupID holds the unique group identifier
223-
GroupID string
224-
225-
// State holds current state of the group (one of: Dead, Stable, AwaitingSync,
226-
// PreparingRebalance, or empty if there is no active group)
227-
State string
228-
229-
// ProtocolType holds the current group protocol type (will be empty if there is
230-
// no active group)
231-
ProtocolType string
232-
233-
// Protocol holds the current group protocol (only provided if the group is Stable)
234-
Protocol string
235-
236-
// Members contains the current group members (only provided if the group is not Dead)
237-
Members []describeGroupsResponseMemberV0
238-
}
239-
240-
func (t describeGroupsResponseGroupV0) size() int32 {
241-
return sizeofInt16(t.ErrorCode) +
242-
sizeofString(t.GroupID) +
243-
sizeofString(t.State) +
244-
sizeofString(t.ProtocolType) +
245-
sizeofString(t.Protocol) +
246-
sizeofArray(len(t.Members), func(i int) int32 { return t.Members[i].size() })
247-
}
164+
partitions := []int32{}
248165

249-
func (t describeGroupsResponseGroupV0) writeTo(wb *writeBuffer) {
250-
wb.writeInt16(t.ErrorCode)
251-
wb.writeString(t.GroupID)
252-
wb.writeString(t.State)
253-
wb.writeString(t.ProtocolType)
254-
wb.writeString(t.Protocol)
255-
wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
256-
}
257-
258-
func (t *describeGroupsResponseGroupV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
259-
if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
260-
return
261-
}
262-
if remain, err = readString(r, remain, &t.GroupID); err != nil {
263-
return
264-
}
265-
if remain, err = readString(r, remain, &t.State); err != nil {
266-
return
267-
}
268-
if remain, err = readString(r, remain, &t.ProtocolType); err != nil {
269-
return
270-
}
271-
if remain, err = readString(r, remain, &t.Protocol); err != nil {
272-
return
273-
}
274-
275-
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
276-
item := describeGroupsResponseMemberV0{}
277-
if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
278-
return
279-
}
280-
t.Members = append(t.Members, item)
166+
if remain, err = readInt32Array(r, remain, &partitions); err != nil {
281167
return
282168
}
283-
if remain, err = readArrayWith(r, remain, fn); err != nil {
284-
return
285-
}
286-
287-
return
288-
}
289-
290-
type describeGroupsResponseV0 struct {
291-
// Groups holds selected group information
292-
Groups []describeGroupsResponseGroupV0
293-
}
294-
295-
func (t describeGroupsResponseV0) size() int32 {
296-
return sizeofArray(len(t.Groups), func(i int) int32 { return t.Groups[i].size() })
297-
}
298-
299-
func (t describeGroupsResponseV0) writeTo(wb *writeBuffer) {
300-
wb.writeArray(len(t.Groups), func(i int) { t.Groups[i].writeTo(wb) })
301-
}
302-
303-
func (t *describeGroupsResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
304-
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
305-
item := describeGroupsResponseGroupV0{}
306-
if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
307-
return
308-
}
309-
t.Groups = append(t.Groups, item)
310-
return
311-
}
312-
if remain, err = readArrayWith(r, sz, fn); err != nil {
313-
return
169+
for _, partition := range partitions {
170+
t.Partitions = append(t.Partitions, int(partition))
314171
}
315172

316173
return
@@ -347,6 +204,31 @@ func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetad
347204
return mm, err
348205
}
349206

207+
if mm.Version == 1 && remain > 0 {
208+
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
209+
op := DescribeGroupsResponseMemberMetadataOwnedPartition{}
210+
if fnRemain, fnErr = readString(r, size, &op.Topic); fnErr != nil {
211+
return
212+
}
213+
214+
ps := []int32{}
215+
if fnRemain, fnErr = readInt32Array(r, fnRemain, &ps); fnErr != nil {
216+
return
217+
}
218+
219+
for _, p := range ps {
220+
op.Partitions = append(op.Partitions, int(p))
221+
}
222+
223+
mm.OwnedPartitions = append(mm.OwnedPartitions, op)
224+
return
225+
}
226+
227+
if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
228+
return mm, err
229+
}
230+
}
231+
350232
if remain != 0 {
351233
return mm, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
352234
}

describegroups_test.go

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package kafka
22

33
import (
4-
"bufio"
5-
"bytes"
64
"context"
75
"fmt"
86
"os"
@@ -12,48 +10,6 @@ import (
1210
"time"
1311
)
1412

15-
func TestDescribeGroupsResponseV0(t *testing.T) {
16-
item := describeGroupsResponseV0{
17-
Groups: []describeGroupsResponseGroupV0{
18-
{
19-
ErrorCode: 2,
20-
GroupID: "a",
21-
State: "b",
22-
ProtocolType: "c",
23-
Protocol: "d",
24-
Members: []describeGroupsResponseMemberV0{
25-
{
26-
MemberID: "e",
27-
ClientID: "f",
28-
ClientHost: "g",
29-
MemberMetadata: []byte("h"),
30-
MemberAssignments: []byte("i"),
31-
},
32-
},
33-
},
34-
},
35-
}
36-
37-
b := bytes.NewBuffer(nil)
38-
w := &writeBuffer{w: b}
39-
item.writeTo(w)
40-
41-
var found describeGroupsResponseV0
42-
remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
43-
if err != nil {
44-
t.Error(err)
45-
t.FailNow()
46-
}
47-
if remain != 0 {
48-
t.Errorf("expected 0 remain, got %v", remain)
49-
t.FailNow()
50-
}
51-
if !reflect.DeepEqual(item, found) {
52-
t.Error("expected item and found to be the same")
53-
t.FailNow()
54-
}
55-
}
56-
5713
func TestClientDescribeGroups(t *testing.T) {
5814
if os.Getenv("KAFKA_VERSION") == "2.3.1" {
5915
// There's a bug in 2.3.1 that causes the MemberMetadata to be in the wrong format and thus

0 commit comments

Comments
 (0)