Skip to content

Commit 11670d5

Browse files
author
Steve van Loben Sels
authored
Introduced top-level ConsumerGroup construct (segmentio#277)
This is a lower-level API that unlocks use cases such as consuming each assigned partition using a separate Reader or to overwrite offsets for a group. Refactored logic out of Reader and into ConsumerGroup so that Reader uses the ConsumerGroup when GroupID is set.
1 parent 18548e9 commit 11670d5

File tree

9 files changed

+2064
-1271
lines changed

9 files changed

+2064
-1271
lines changed

conn.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -496,10 +496,10 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1,
496496
return response, nil
497497
}
498498

499-
// syncGroups completes the handshake to join a consumer group
499+
// syncGroup completes the handshake to join a consumer group
500500
//
501501
// See http://kafka.apache.org/protocol.html#The_Messages_SyncGroup
502-
func (c *Conn) syncGroups(request syncGroupRequestV0) (syncGroupResponseV0, error) {
502+
func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error) {
503503
var response syncGroupResponseV0
504504

505505
err := c.readOperation(
@@ -767,7 +767,6 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
767767
id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
768768
now := time.Now()
769769
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
770-
adjustedDeadline = deadline
771770
switch c.fetchVersion {
772771
case v10:
773772
return c.wb.writeFetchRequestV10(

conn_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
597597
joinGroup := join()
598598

599599
// sync the group
600-
_, err := conn.syncGroups(syncGroupRequestV0{
600+
_, err := conn.syncGroup(syncGroupRequestV0{
601601
GroupID: groupID,
602602
GenerationID: joinGroup.GenerationID,
603603
MemberID: joinGroup.MemberID,
@@ -609,7 +609,7 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
609609
},
610610
})
611611
if err != nil {
612-
t.Fatalf("bad syncGroups: %s", err)
612+
t.Fatalf("bad syncGroup: %s", err)
613613
}
614614

615615
generationID = joinGroup.GenerationID
@@ -710,7 +710,7 @@ func testConnHeartbeatErr(t *testing.T, conn *Conn) {
710710
groupID := makeGroupID()
711711
createGroup(t, conn, groupID)
712712

713-
_, err := conn.syncGroups(syncGroupRequestV0{
713+
_, err := conn.syncGroup(syncGroupRequestV0{
714714
GroupID: groupID,
715715
})
716716
if err != UnknownMemberId && err != NotCoordinatorForGroup {
@@ -734,7 +734,7 @@ func testConnSyncGroupErr(t *testing.T, conn *Conn) {
734734
groupID := makeGroupID()
735735
waitForCoordinator(t, conn, groupID)
736736

737-
_, err := conn.syncGroups(syncGroupRequestV0{
737+
_, err := conn.syncGroup(syncGroupRequestV0{
738738
GroupID: groupID,
739739
})
740740
if err != UnknownMemberId && err != NotCoordinatorForGroup {
@@ -844,6 +844,7 @@ func testConnFetchAndCommitOffsets(t *testing.T, conn *Conn) {
844844
}
845845

846846
func testConnWriteReadConcurrently(t *testing.T, conn *Conn) {
847+
847848
const N = 1000
848849
var msgs = make([]string, N)
849850
var done = make(chan struct{})

0 commit comments

Comments
 (0)