Skip to content

Commit bc25f16

Browse files
Steve van Loben SelsAchille
Steve van Loben Sels
and
Achille
authored
Fix a rare panic during consumer group rebalance (#791)
* Fix a rare panic during consumer group rebalance Currently, there is a sync.WaitGroup that is used for process accounting with go routines started by Generation.Start. It is used when a generation is ending to ensure that all of the Start-ed functions exit before the next generation can start. There is an edge case that can cause panic as written up in #786 due to unsafe use of the WaitGroup. Basically, it is possible that close() is already in process and is waiting on WaitGroup.Wait() when Start() comes in and calls WaitGroup.Add(1). The contract of Start() is that the provided function is to be run. Code in the wild may depend on this behavior, so it's not an option to return from Start() without runing the provided function. Accordingly, this PR updates the code to coordinate between close() and Start() such that the panic case is no longer possible while preserving the existing contract. It uses channels and a mutex in order to create two cases: the normal case where the generation is alive and the edge case where the generation has already ended. * Update consumergroup.go Co-authored-by: Achille <[email protected]> Co-authored-by: Achille <[email protected]>
1 parent e88d48a commit bc25f16

File tree

2 files changed

+86
-11
lines changed

2 files changed

+86
-11
lines changed

consumergroup.go

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,17 @@ type Generation struct {
322322

323323
conn coordinator
324324

325-
once sync.Once
326-
done chan struct{}
327-
wg sync.WaitGroup
325+
// the following fields are used for process accounting to synchronize
326+
// between Start and close. lock protects all of them. done is closed
327+
// when the generation is ending in order to signal that the generation
328+
// should start self-desructing. closed protects against double-closing
329+
// the done chan. routines is a count of runing go routines that have been
330+
// launched by Start. joined will be closed by the last go routine to exit.
331+
lock sync.Mutex
332+
done chan struct{}
333+
closed bool
334+
routines int
335+
joined chan struct{}
328336

329337
retentionMillis int64
330338
log func(func(Logger))
@@ -334,10 +342,21 @@ type Generation struct {
334342
// close stops the generation and waits for all functions launched via Start to
335343
// terminate.
336344
func (g *Generation) close() {
337-
g.once.Do(func() {
345+
g.lock.Lock()
346+
if !g.closed {
338347
close(g.done)
339-
})
340-
g.wg.Wait()
348+
g.closed = true
349+
}
350+
// determine whether any go routines are running that we need to wait for.
351+
// waiting needs to happen outside of the critical section.
352+
r := g.routines
353+
g.lock.Unlock()
354+
355+
// NOTE: r will be zero if no go routines were ever launched. no need to
356+
// wait in that case.
357+
if r > 0 {
358+
<-g.joined
359+
}
341360
}
342361

343362
// Start launches the provided function in a go routine and adds accounting such
@@ -354,15 +373,42 @@ func (g *Generation) close() {
354373
// progress for this consumer and potentially cause consumer group membership
355374
// churn.
356375
func (g *Generation) Start(fn func(ctx context.Context)) {
357-
g.wg.Add(1)
376+
g.lock.Lock()
377+
defer g.lock.Unlock()
378+
379+
// this is an edge case: if the generation has already closed, then it's
380+
// possible that the close func has already waited on outstanding go
381+
// routines and exited.
382+
//
383+
// nonetheless, it's important to honor that the fn is invoked in case the
384+
// calling function is waiting e.g. on a channel send or a WaitGroup. in
385+
// such a case, fn should immediately exit because ctx.Err() will return
386+
// ErrGenerationEnded.
387+
if g.closed {
388+
go fn(genCtx{g})
389+
return
390+
}
391+
392+
// register that there is one more go routine that's part of this gen.
393+
g.routines++
394+
358395
go func() {
359396
fn(genCtx{g})
397+
g.lock.Lock()
360398
// shut down the generation as soon as one function exits. this is
361-
// different from close() in that it doesn't wait on the wg.
362-
g.once.Do(func() {
399+
// different from close() in that it doesn't wait for all go routines in
400+
// the generation to exit.
401+
if !g.closed {
363402
close(g.done)
364-
})
365-
g.wg.Done()
403+
g.closed = true
404+
}
405+
g.routines--
406+
// if this was the last go routine in the generation, close the joined
407+
// chan so that close() can exit if it's waiting.
408+
if g.routines == 0 {
409+
close(g.joined)
410+
}
411+
g.lock.Unlock()
366412
}()
367413
}
368414

@@ -781,6 +827,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
781827
Assignments: cg.makeAssignments(assignments, offsets),
782828
conn: conn,
783829
done: make(chan struct{}),
830+
joined: make(chan struct{}),
784831
retentionMillis: int64(cg.config.RetentionTime / time.Millisecond),
785832
log: cg.withLogger,
786833
logError: cg.withErrorLogger,

consumergroup_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,7 @@ func TestGenerationExitsOnPartitionChange(t *testing.T) {
630630
gen := Generation{
631631
conn: conn,
632632
done: make(chan struct{}),
633+
joined: make(chan struct{}),
633634
log: func(func(Logger)) {},
634635
logError: func(func(Logger)) {},
635636
}
@@ -649,3 +650,30 @@ func TestGenerationExitsOnPartitionChange(t *testing.T) {
649650
}
650651
}
651652
}
653+
654+
func TestGenerationStartsFunctionAfterClosed(t *testing.T) {
655+
gen := Generation{
656+
conn: &mockCoordinator{},
657+
done: make(chan struct{}),
658+
joined: make(chan struct{}),
659+
log: func(func(Logger)) {},
660+
logError: func(func(Logger)) {},
661+
}
662+
663+
gen.close()
664+
665+
ch := make(chan error)
666+
gen.Start(func(ctx context.Context) {
667+
<-ctx.Done()
668+
ch <- ctx.Err()
669+
})
670+
671+
select {
672+
case <-time.After(time.Second):
673+
t.Fatal("timed out waiting for func to run")
674+
case err := <-ch:
675+
if err != ErrGenerationEnded {
676+
t.Fatalf("expected %v but got %v", ErrGenerationEnded, err)
677+
}
678+
}
679+
}

0 commit comments

Comments
 (0)