Skip to content

Commit 128d3ff

Browse files
committed
fix race where multiple workers' callbacks overwrite shared c.recValue field
1 parent 3ce6004 commit 128d3ff

File tree

3 files changed

+69
-3
lines changed

3 files changed

+69
-3
lines changed

internal/internal_flags.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ const (
3434

3535
// unblockSelectorSignal exists to allow us to configure the default behavior of
3636
// SDKFlagBlockedSelectorSignalReceive. This is primarily useful with tests.
37-
var unblockSelectorSignal = os.Getenv("UNBLOCK_SIGNAL_SELECTOR") != ""
37+
var unblockSelectorSignal = os.Getenv("UNBLOCK_SIGNAL_SELECTOR") != "false"
3838

3939
func sdkFlagFromUint(value uint32) sdkFlag {
4040
switch value {

internal/internal_workflow.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,6 +1401,7 @@ func (s *selectorImpl) Select(ctx Context) {
14011401
if pair.receiveFunc != nil {
14021402
f := *pair.receiveFunc
14031403
c := pair.channel
1404+
hasDefault := s.defaultFunc != nil
14041405
callback := &receiveCallback{
14051406
fn: func(v interface{}, more bool) bool {
14061407
if readyBranch != nil {
@@ -1416,12 +1417,15 @@ func (s *selectorImpl) Select(ctx Context) {
14161417
dropSignalFlag = env.GetFlag(SDKFlagBlockedSelectorSignalReceive)
14171418
}
14181419

1419-
if dropSignalFlag {
1420+
// Only store value immediately when default branch exists,
1421+
// otherwise store in readyBranch to avoid race when multiple
1422+
// selectors blocked on same channel
1423+
if dropSignalFlag && hasDefault {
14201424
c.recValue = &v
14211425
}
14221426

14231427
readyBranch = func() {
1424-
if !dropSignalFlag {
1428+
if !dropSignalFlag || !hasDefault {
14251429
c.recValue = &v
14261430
}
14271431
f(c, more)

internal/internal_workflow_testsuite_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4250,3 +4250,65 @@ func (s *WorkflowTestSuiteUnitTest) Test_SignalNotLost() {
42504250
err := env.GetWorkflowError()
42514251
s.NoError(err)
42524252
}
4253+
4254+
func (s *WorkflowTestSuiteUnitTest) TestChannelWorkerPattern() {
4255+
previousFlag := unblockSelectorSignal
4256+
unblockSelectorSignal = true
4257+
defer func() { unblockSelectorSignal = previousFlag }()
4258+
4259+
require := s.Require()
4260+
4261+
// Two workers listening on the same channel with multiple items sent quickly.
4262+
// Without the fix, callbacks overwrite c.recValue causing items to be lost.
4263+
workflowFn := func(ctx Context) error {
4264+
ch := NewChannel(ctx)
4265+
var received []int
4266+
4267+
Go(ctx, func(ctx Context) {
4268+
ch.Send(ctx, 1)
4269+
ch.Send(ctx, 2)
4270+
ch.Send(ctx, 3)
4271+
ch.Close()
4272+
})
4273+
4274+
// Two workers both selecting on the same channel
4275+
wg := NewWaitGroup(ctx)
4276+
wg.Add(2)
4277+
for i := 0; i < 2; i++ {
4278+
Go(ctx, func(ctx Context) {
4279+
defer wg.Done()
4280+
for {
4281+
selector := NewSelector(ctx)
4282+
done := false
4283+
selector.AddReceive(ch, func(c ReceiveChannel, more bool) {
4284+
if more {
4285+
var v int
4286+
c.Receive(ctx, &v)
4287+
received = append(received, v)
4288+
} else {
4289+
done = true
4290+
}
4291+
})
4292+
selector.Select(ctx)
4293+
if done {
4294+
break
4295+
}
4296+
}
4297+
})
4298+
}
4299+
4300+
wg.Wait(ctx)
4301+
4302+
// Without the fix: callbacks overwrite c.recValue, causing items to be lost
4303+
// With the fix: each callback's value is stored in its own readyBranch closure
4304+
require.Len(received, 3, "Expected 3 items to be received")
4305+
require.ElementsMatch([]int{1, 2, 3}, received, "Expected all items to be received exactly once")
4306+
return nil
4307+
}
4308+
4309+
env := s.NewTestWorkflowEnvironment()
4310+
env.ExecuteWorkflow(workflowFn)
4311+
4312+
require.True(env.IsWorkflowCompleted())
4313+
require.NoError(env.GetWorkflowError())
4314+
}

0 commit comments

Comments
 (0)