
Commit 2598358

pool: Enhance worker lifecycle management and job reliability (#51)
* Filter out stale events early

  Ensure two nodes can't process stale workers concurrently.

* Centralize worker stream cache

  Instead of each worker keeping its own cache, have the parent node hold the cache to avoid duplicate caching.

* pool: fix cleanup of stale workers

  The original code used future timestamps in workerKeepAliveMap to prevent concurrent cleanup operations. This made stale workers appear active and could permanently prevent cleanup if a node crashed during the process. Fixed by:

  - Adding a dedicated cleanupMap to track workers being cleaned up
  - Implementing proper concurrency handling using SetIfNotExists/TestAndSet
  - Adding retry logic with exponential backoff for requeuing jobs
  - Ensuring the cleanup map is properly closed during node shutdown
  - Updating worker.go to handle the new processRequeuedJobs retry parameter

  The fix ensures stale workers and their jobs are reliably cleaned up even in case of node failures or concurrent cleanup attempts.

* Properly ack stale events to avoid re-publishing

* fix(pool): prevent worker leaks from context cancellation

  - Use a background context for worker goroutines to prevent premature termination
  - Preserve the logging context while making the worker lifecycle independent of the caller
  - Rename maps for clarity (e.g. jobsMap -> jobMap)
  - Improve node stream management with a nodeStreams map
  - Clean up error handling and logging patterns

  This fixes an issue where workers could be leaked when the caller's context was cancelled before proper cleanup could occur.

* refactor(pool): Ensure eventual consistency of job assignments

  Improve the Node component's ability to detect and reassign jobs from stale or deleted workers by:

  1. Adding explicit orphaned job detection for workers missing keep-alive entries
  2. Centralizing worker cleanup logic to ensure consistent job reassignment
  3. Simplifying worker state validation to catch edge cases in distributed scenarios

  This ensures that no jobs are lost when workers become unavailable, maintaining eventual consistency of job assignments across the worker pool.

* pool: improve worker cleanup and job requeuing reliability

  Enhance the worker cleanup mechanism by handling stale cleanup locks and adding cleanup verification. Key changes:

  - Add detection and cleanup of stale worker cleanup locks
  - Clean up jobs from jobMap after a successful requeue
  - Improve logging around worker cleanup and job requeuing
  - Upgrade requeue log level to Info for better operational visibility

  This improves the reliability of the distributed job system by preventing orphaned jobs and stale locks from accumulating over time.

* Prevent streams from being recreated after destroy

* Prevent re-creation of worker streams

* Add proper options to missed node stream add call

* streaming: clean up stale consumers during sink initialization

  Add cleanup of stale consumers during sink initialization to prevent accumulation of stale consumers in Redis. Previously stale consumers were only cleaned up periodically, which could lead to a buildup if sinks did not shut down cleanly.

  Also refactor the stale consumer cleanup logic to:

  1. Extract common cleanup code into deleteStreamStaleConsumers
  2. Improve error handling and logging
  3. Properly clean up all related data structures (Redis consumer group, keep-alive map, and consumers map)

* Add event ID to requeue log entries

* pool: refactor worker cleanup logic

  Improve the worker cleanup implementation by:

  1. Splitting cleanupWorker into smaller, focused functions:
     - acquireCleanupLock: handles cleanup lock management
     - requeueWorkerJobs: handles job requeuing
     - cleanupWorker: orchestrates the cleanup process
  2. Simplifying cleanupInactiveWorkers:
     - Use activeWorkers() to get the list of active workers
     - Combine jobMap and workerMap checks into a single loop
     - Skip workers being actively cleaned up
  3. Renaming isActive to isWithinTTL to better reflect its purpose:
     - The function checks whether a timestamp is within the TTL duration
     - Used consistently across node and worker cleanup

* pool: Add periodic cleanup of stale pending jobs

  Add a new background process to clean up stale entries in the pending jobs map. Previously, stale entries were only cleaned up when attempting to dispatch a job with the same key. Now a dedicated goroutine runs at the ackGracePeriod frequency to proactively remove expired entries.

  Additional changes:

  - Fix the jobPendingMap comment to clarify it is indexed by job key, not worker ID
  - Add debug logs for worker shutdown in handleEvents and keepAlive
  - Refactor timestamp validation to use the isWithinTTL helper across the codebase
  - Improve error handling in cleanupStalePendingJobs using TestAndDelete

  The periodic cleanup helps prevent memory leaks from abandoned dispatch attempts and makes the job dispatch system more reliable.

* pool: simplify job requeuing logic

  Remove the requeueJob helper function and use dispatchJob directly to requeue jobs during worker cleanup and rebalancing.

* Properly delete stale workers with requeued jobs

* Fix deadlock from blocked ichan notifications

  Unread notifications on the ichan channel were blocking writes and causing deadlocks during Set operations. This commit removes ichan and replaces it with a waiter-based mechanism using dedicated per-call channels, ensuring notifications are delivered without blocking.
1 parent a19129a commit 2598358
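
The cleanup changes above replace the old future-timestamp trick with a dedicated cleanupMap guarded by SetIfNotExists/TestAndSet, plus exponential backoff when requeuing a stale worker's jobs. A minimal Go sketch of that idea, assuming a hypothetical replicated-map interface (the names mirror the commit message; the real rmap API and the pool's acquireCleanupLock helper are not rendered in this commit and may differ):

package pool

import (
	"context"
	"fmt"
	"time"
)

// cleanupMap is a minimal stand-in for the replicated map used by the node;
// the real rmap API may differ.
type cleanupMap interface {
	// SetIfNotExists sets key only if it is absent and reports whether it won.
	SetIfNotExists(ctx context.Context, key, value string) (bool, error)
	// TestAndDelete deletes key only if it still holds value.
	TestAndDelete(ctx context.Context, key, value string) (bool, error)
}

// acquireCleanupLock attempts to make this node the single one cleaning up
// workerID. It returns a release function on success, or false if another
// node already holds the lock.
func acquireCleanupLock(ctx context.Context, m cleanupMap, workerID string) (func(), bool, error) {
	stamp := fmt.Sprintf("%d", time.Now().UnixNano())
	ok, err := m.SetIfNotExists(ctx, workerID, stamp)
	if err != nil || !ok {
		return nil, false, err
	}
	release := func() {
		// Only delete the entry we created so we never release another node's lock.
		_, _ = m.TestAndDelete(context.Background(), workerID, stamp)
	}
	return release, true, nil
}

// requeueWithBackoff retries a requeue operation with exponential backoff,
// mirroring the retry behavior the commit message describes.
func requeueWithBackoff(ctx context.Context, requeue func(context.Context) error, attempts int) error {
	delay := 100 * time.Millisecond
	var err error
	for i := 0; i < attempts; i++ {
		if err = requeue(ctx); err == nil {
			return nil
		}
		select {
		case <-time.After(delay):
			delay *= 2 // exponential backoff between attempts
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("requeue failed after %d attempts: %w", attempts, err)
}

The property that matters is that only the node that wins SetIfNotExists proceeds with cleanup, and a node can only release the lock entry it created, so a crash mid-cleanup leaves a detectable stale lock rather than a worker that looks alive forever.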

File tree

10 files changed (+770, -409 lines)


pool/node.go

Lines changed: 389 additions & 272 deletions
Large diffs are not rendered by default.

pool/node_test.go

Lines changed: 117 additions & 7 deletions
@@ -325,10 +325,10 @@ func TestDispatchJobRaceCondition(t *testing.T) {
 
 		// Set a stale pending timestamp
 		staleTS := time.Now().Add(-time.Hour).UnixNano()
-		_, err := node1.pendingJobsMap.SetAndWait(ctx, jobKey, strconv.FormatInt(staleTS, 10))
+		_, err := node1.jobPendingMap.SetAndWait(ctx, jobKey, strconv.FormatInt(staleTS, 10))
 		require.NoError(t, err, "Failed to set stale pending timestamp")
 		defer func() {
-			_, err = node1.pendingJobsMap.Delete(ctx, jobKey)
+			_, err = node1.jobPendingMap.Delete(ctx, jobKey)
 			assert.NoError(t, err, "Failed to delete pending timestamp")
 		}()
 
@@ -346,7 +346,7 @@ func TestDispatchJobRaceCondition(t *testing.T) {
 		require.NoError(t, err, "Dispatch should succeed")
 		// Verify pending entry was cleaned up
 		require.Eventually(t, func() bool {
-			val, exists := node1.pendingJobsMap.Get(jobKey)
+			val, exists := node1.jobPendingMap.Get(jobKey)
 			t.Logf("Got pending value: %q", val)
 			return !exists
 		}, max, delay, "Pending entry should be cleaned up after successful dispatch")
@@ -357,7 +357,7 @@ func TestDispatchJobRaceCondition(t *testing.T) {
 		payload := []byte("test payload")
 
 		// Set an invalid pending timestamp
-		_, err := node1.pendingJobsMap.SetAndWait(ctx, jobKey, "invalid-timestamp")
+		_, err := node1.jobPendingMap.SetAndWait(ctx, jobKey, "invalid-timestamp")
 		require.NoError(t, err, "Failed to set invalid pending timestamp")
 
 		// Dispatch should succeed (invalid timestamps are logged and ignored)
@@ -380,7 +380,7 @@ func TestDispatchJobRaceCondition(t *testing.T) {
 
 		// Verify pending entry was cleaned up
 		require.Eventually(t, func() bool {
-			_, exists := node1.pendingJobsMap.Get(jobKey)
+			_, exists := node1.jobPendingMap.Get(jobKey)
 			return !exists
 		}, max, delay, "Pending entry should be cleaned up after failed dispatch")
 	})
@@ -623,7 +623,7 @@ func TestAckWorkerEventWithMissingPendingEvent(t *testing.T) {
 	}
 
 	// Call ackWorkerEvent with the mock event
-	node.ackWorkerEvent(ctx, mockEvent)
+	node.ackWorkerEvent(mockEvent)
 
 	// Verify that no panic occurred and the function completed successfully
 	assert.True(t, true, "ackWorkerEvent should complete without panic")
@@ -681,7 +681,7 @@ func TestStaleEventsAreRemoved(t *testing.T) {
 	node.pendingEvents.Store(pendingEventKey("worker", mockEventID), mockEvent)
 
 	// Call ackWorkerEvent to trigger the stale event cleanup
-	node.ackWorkerEvent(ctx, mockEvent)
+	node.ackWorkerEvent(mockEvent)
 
 	assert.Eventually(t, func() bool {
 		_, ok := node.pendingEvents.Load(pendingEventKey("worker", staleEventID))
@@ -834,6 +834,116 @@ func TestShutdownStopsAllJobs(t *testing.T) {
 	assert.Empty(t, worker2.Jobs(), "Worker2 should have no remaining jobs")
 }
 
+func TestWorkerAckStreams(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx := ptesting.NewTestContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	node := newTestNode(t, ctx, rdb, testName)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	// Create a worker and dispatch a job
+	worker := newTestWorker(t, ctx, node)
+	require.NoError(t, node.DispatchJob(ctx, testName, []byte("payload")))
+
+	// Wait for the job to start and be acknowledged
+	require.Eventually(t, func() bool {
+		return len(worker.Jobs()) == 1
+	}, max, delay)
+
+	// Verify stream is created and cached
+	stream1, err := node.getNodeStream(node.ID)
+	require.NoError(t, err)
+	stream2, err := node.getNodeStream(node.ID)
+	require.NoError(t, err)
+	assert.Same(t, stream1, stream2, "Expected same stream instance to be returned")
+
+	// Verify stream exists before shutdown
+	streamKey := "pulse:stream:" + nodeStreamName(testName, node.ID)
+	exists, err := rdb.Exists(ctx, streamKey).Result()
+	assert.NoError(t, err)
+	assert.Equal(t, int64(1), exists, "Expected stream to exist before shutdown")
+
+	// Shutdown node
+	assert.NoError(t, node.Shutdown(ctx))
+
+	// Verify stream is destroyed in Redis
+	exists, err = rdb.Exists(ctx, streamKey).Result()
+	assert.NoError(t, err)
+	assert.Equal(t, int64(0), exists, "Expected stream to be destroyed after shutdown")
+}
+
+func TestStaleWorkerCleanupAfterJobRequeue(t *testing.T) {
+	// Setup test environment
+	ctx := ptesting.NewTestContext(t)
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	rdb := ptesting.NewRedisClient(t)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	node := newTestNode(t, ctx, rdb, testName)
+	defer func() { assert.NoError(t, node.Shutdown(ctx)) }()
+
+	// Create a worker that will become stale
+	staleWorker := newTestWorker(t, ctx, node)
+
+	// Dispatch some jobs to the worker
+	for i := 0; i < 3; i++ {
+		jobKey := fmt.Sprintf("%s_%d", testName, i)
+		require.NoError(t, node.DispatchJob(ctx, jobKey, []byte("test-payload")))
+	}
+
+	// Wait for jobs to be assigned
+	require.Eventually(t, func() bool {
+		return len(staleWorker.Jobs()) == 3
+	}, max, delay, "Jobs were not assigned to worker")
+
+	// Make the worker stale by stopping it and setting an old keepalive
+	staleWorker.stop(ctx)
+	_, err := node.workerKeepAliveMap.Set(ctx, staleWorker.ID,
+		strconv.FormatInt(time.Now().Add(-2*node.workerTTL).UnixNano(), 10))
+	require.NoError(t, err)
+
+	// Create a new worker to receive requeued jobs
+	newWorker := newTestWorker(t, ctx, node)
+
+	// Wait for cleanup to happen and jobs to be requeued
+	require.Eventually(t, func() bool {
+		return len(newWorker.Jobs()) == 3
+	}, max, delay, "Jobs were not requeued to new worker")

+	// Verify stale worker was deleted
+	require.Eventually(t, func() bool {
+		// Check that worker is removed from all tracking maps
+		workers := node.Workers()
+		if len(workers) != 1 {
+			t.Logf("Expected 1 worker, got %d", len(workers))
+			return false
+		}
+
+		// Check worker is removed from worker map
+		workerMap := node.workerMap.Map()
+		if _, exists := workerMap[staleWorker.ID]; exists {
+			t.Log("Worker still exists in worker map")
+			return false
+		}
+
+		// Check keepalive is removed
+		keepAlive := node.workerKeepAliveMap.Map()
+		if _, exists := keepAlive[staleWorker.ID]; exists {
+			t.Log("Worker still has keepalive entry")
+			return false
+		}
+
+		// Check jobs are removed
+		jobs := node.jobMap.Map()
+		if _, exists := jobs[staleWorker.ID]; exists {
+			t.Log("Worker still has jobs assigned")
+			return false
+		}
+
+		return true
+	}, max, delay, "Stale worker was not properly cleaned up")
+}
+
 type mockAcker struct {
 	XAckFunc func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd
 }
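
The new tests above mark a worker stale by writing its keep-alive entry as a UnixNano string, which is the kind of value the isWithinTTL helper mentioned in the commit message validates. A hedged sketch of what such a helper could look like (the real implementation lives in pool/node.go, whose diff is not rendered here):

package pool

import (
	"strconv"
	"time"
)

// isWithinTTL reports whether a keep-alive timestamp stored as a UnixNano
// string (as in the tests above) is still within the given TTL.
// Sketch only; the real helper in pool/node.go may differ.
func isWithinTTL(lastSeen string, ttl time.Duration) bool {
	nanos, err := strconv.ParseInt(lastSeen, 10, 64)
	if err != nil {
		return false // treat malformed timestamps as expired
	}
	return time.Since(time.Unix(0, nanos)) <= ttl
}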

pool/scheduler.go

Lines changed: 4 additions & 3 deletions
@@ -99,8 +99,9 @@ func (node *Node) Schedule(ctx context.Context, producer JobProducer, interval t
 	if err := sched.stopJobs(ctx, plan); err != nil {
 		return fmt.Errorf("failed to stop jobs: %w", err)
 	}
-	pulse.Go(ctx, func() { sched.scheduleJobs(ctx, ticker, producer) })
-	pulse.Go(ctx, func() { sched.handleStop(ctx) })
+
+	pulse.Go(sched.logger, func() { sched.scheduleJobs(ctx, ticker, producer) })
+	pulse.Go(sched.logger, func() { sched.handleStop() })
 	return nil
 }
 
@@ -175,7 +176,7 @@ func (sched *scheduler) stopJobs(ctx context.Context, plan *JobPlan) error {
 }
 
 // handleStop handles the scheduler stop signal.
-func (sched *scheduler) handleStop(_ context.Context) {
+func (sched *scheduler) handleStop() {
 	ch := sched.jobMap.Subscribe()
 	for ev := range ch {
 		if ev == rmap.EventReset {

pool/ticker.go

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, o
 	}
 	t.initTimer()
 	t.wg.Add(1)
-	pulse.Go(ctx, func() { t.handleEvents() })
+	pulse.Go(logger, func() { t.handleEvents() })
 	return t, nil
 }

pool/worker.go

Lines changed: 31 additions & 23 deletions
@@ -10,10 +10,12 @@ import (
 	"time"
 
 	"github.com/oklog/ulid/v2"
+	"goa.design/clue/log"
+
 	"goa.design/pulse/pulse"
 	"goa.design/pulse/rmap"
 	"goa.design/pulse/streaming"
-	soptions "goa.design/pulse/streaming/options"
+	"goa.design/pulse/streaming/options"
 )
 
 type (
@@ -94,11 +96,14 @@ func newWorker(ctx context.Context, node *Node, h JobHandler) (*Worker, error) {
 	if _, err := node.workerKeepAliveMap.SetAndWait(ctx, wid, now); err != nil {
 		return nil, fmt.Errorf("failed to update worker keep-alive: %w", err)
 	}
-	stream, err := streaming.NewStream(workerStreamName(wid), node.rdb, soptions.WithStreamLogger(node.logger))
+	stream, err := streaming.NewStream(workerStreamName(wid), node.rdb, options.WithStreamLogger(node.logger))
 	if err != nil {
 		return nil, fmt.Errorf("failed to create jobs stream for worker %q: %w", wid, err)
 	}
-	reader, err := stream.NewReader(ctx, soptions.WithReaderBlockDuration(node.workerTTL/2), soptions.WithReaderStartAtOldest())
+	if _, err := stream.Add(ctx, evInit, marshalEnvelope(node.ID, []byte(wid))); err != nil {
+		return nil, fmt.Errorf("failed to add init event to worker stream %q: %w", workerStreamName(wid), err)
+	}
+	reader, err := stream.NewReader(ctx, options.WithReaderBlockDuration(node.workerTTL/2), options.WithReaderStartAtOldest())
 	if err != nil {
 		return nil, fmt.Errorf("failed to create reader for worker %q: %w", wid, err)
 	}
@@ -110,10 +115,10 @@ func newWorker(ctx context.Context, node *Node, h JobHandler) (*Worker, error) {
 		stream:            stream,
 		reader:            reader,
 		done:              make(chan struct{}),
-		jobsMap:           node.jobsMap,
-		jobPayloadsMap:    node.jobPayloadsMap,
+		jobsMap:           node.jobMap,
+		jobPayloadsMap:    node.jobPayloadMap,
 		keepAliveMap:      node.workerKeepAliveMap,
-		shutdownMap:       node.shutdownMap,
+		shutdownMap:       node.nodeShutdownMap,
 		workerTTL:         node.workerTTL,
 		workerShutdownTTL: node.workerShutdownTTL,
 		logger:            node.logger.WithPrefix("worker", wid),
@@ -126,8 +131,13 @@ func newWorker(ctx context.Context, node *Node, h JobHandler) (*Worker, error) {
 		"worker_shutdown_ttl", w.workerShutdownTTL)
 
 	w.wg.Add(2)
-	pulse.Go(ctx, func() { w.handleEvents(ctx, reader.Subscribe()) })
-	pulse.Go(ctx, func() { w.keepAlive(ctx) })
+
+	// Create new context for the worker so that canceling the original one does
+	// not cancel the worker.
+	logCtx := context.Background()
+	logCtx = log.WithContext(logCtx, ctx)
+	pulse.Go(w.logger, func() { w.handleEvents(logCtx, reader.Subscribe()) })
+	pulse.Go(w.logger, func() { w.keepAlive(logCtx) })
 
 	return w, nil
 }
@@ -178,6 +188,9 @@ func (w *Worker) handleEvents(ctx context.Context, c <-chan *streaming.Event) {
 			nodeID, payload := unmarshalEnvelope(ev.Payload)
 			var err error
 			switch ev.EventName {
+			case evInit:
+				w.logger.Debug("handleEvents: received init", "event", ev.EventName, "id", ev.ID)
+				continue
 			case evStartJob:
 				w.logger.Debug("handleEvents: received start job", "event", ev.EventName, "id", ev.ID)
 				err = w.startJob(ctx, unmarshalJob(payload))
@@ -200,6 +213,7 @@ func (w *Worker) handleEvents(ctx context.Context, c <-chan *streaming.Event) {
 			}
 			w.ackPoolEvent(ctx, nodeID, ev.ID, nil)
 		case <-w.done:
+			w.logger.Debug("handleEvents: done")
 			return
 		}
 	}
@@ -291,22 +305,18 @@ func (w *Worker) notify(_ context.Context, key string, payload []byte) error {
 // ackPoolEvent acknowledges the pool event that originated from the node with
 // the given ID.
 func (w *Worker) ackPoolEvent(ctx context.Context, nodeID, eventID string, ackerr error) {
-	stream, ok := w.nodeStreams.Load(nodeID)
-	if !ok {
-		var err error
-		stream, err = streaming.NewStream(nodeStreamName(w.node.PoolName, nodeID), w.node.rdb, soptions.WithStreamLogger(w.logger))
-		if err != nil {
-			w.logger.Error(fmt.Errorf("failed to create stream for node %q: %w", nodeID, err))
-			return
-		}
-		w.nodeStreams.Store(nodeID, stream)
+	stream, err := w.node.getNodeStream(nodeID)
+	if err != nil {
+		w.logger.Error(fmt.Errorf("failed to get ack stream for node %q: %w", nodeID, err))
+		return
 	}
+
 	var msg string
 	if ackerr != nil {
 		msg = ackerr.Error()
 	}
 	ack := &ack{EventID: eventID, Error: msg}
-	if _, err := stream.(*streaming.Stream).Add(ctx, evAck, marshalEnvelope(w.ID, marshalAck(ack))); err != nil {
+	if _, err := stream.Add(ctx, evAck, marshalEnvelope(w.ID, marshalAck(ack)), options.WithOnlyIfStreamExists()); err != nil {
 		w.logger.Error(fmt.Errorf("failed to ack event %q from node %q: %w", eventID, nodeID, err))
 	}
 }
@@ -328,6 +338,7 @@ func (w *Worker) keepAlive(ctx context.Context) {
 				w.logger.Error(fmt.Errorf("failed to update worker keep-alive: %w", err))
 			}
 		case <-w.done:
+			w.logger.Debug("keepAlive: done")
 			return
 		}
 	}
@@ -350,15 +361,14 @@ func (w *Worker) rebalance(ctx context.Context, activeWorkers []string) {
 		w.logger.Debug("rebalance: no jobs to rebalance")
 		return
 	}
-	cherrs := make(map[string]chan error, total)
 	for key, job := range rebalanced {
 		if err := w.handler.Stop(key); err != nil {
 			w.logger.Error(fmt.Errorf("rebalance: failed to stop job: %w", err), "job", key)
 			continue
 		}
 		w.logger.Debug("stopped job", "job", key)
 		w.jobs.Delete(key)
-		cherr, err := w.node.requeueJob(ctx, w.ID, job)
+		err := w.node.dispatchJob(ctx, key, marshalJob(job), true)
 		if err != nil {
 			w.logger.Error(fmt.Errorf("rebalance: failed to requeue job: %w", err), "job", key)
 			if err := w.handler.Start(job); err != nil {
@@ -367,9 +377,7 @@ func (w *Worker) rebalance(ctx context.Context, activeWorkers []string) {
 			continue
 		}
 		delete(rebalanced, key)
-		cherrs[key] = cherr
 	}
-	pulse.Go(ctx, func() { w.node.processRequeuedJobs(ctx, w.ID, cherrs, false) })
 }
 
 // requeueJobs requeues the jobs handled by the worker.
@@ -432,7 +440,7 @@ func (w *Worker) attemptRequeue(ctx context.Context, jobsToRequeue map[string]*J
 
 	wg.Add(len(jobsToRequeue))
 	for key, job := range jobsToRequeue {
-		pulse.Go(ctx, func() {
+		pulse.Go(w.logger, func() {
 			defer wg.Done()
 			err := w.requeueJob(ctx, job)
 			if err != nil {
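
The newWorker hunk above carries the leak fix: worker goroutines run on a context detached from the caller, log.WithContext only carries over the caller's logging state, and shutdown is driven by the worker's done channel. A small standalone illustration of why the detachment matters, using only the standard library (toy code, not the pulse API):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	callerCtx, cancel := context.WithCancel(context.Background())

	// Tied to the caller: this goroutine stops as soon as the caller cancels,
	// which is how workers used to be torn down mid-flight.
	go func(ctx context.Context) {
		<-ctx.Done()
		fmt.Println("caller-bound goroutine stopped:", ctx.Err())
	}(callerCtx)

	// Detached: lifecycle is controlled by an explicit done channel instead,
	// mirroring the worker's done channel in pool/worker.go.
	done := make(chan struct{})
	go func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println("detached goroutine stopped by context (never happens here)")
		case <-done:
			fmt.Println("detached goroutine stopped by explicit shutdown")
		}
	}(context.Background())

	cancel() // the caller goes away
	time.Sleep(100 * time.Millisecond)
	close(done) // the worker shuts down only when told to
	time.Sleep(100 * time.Millisecond)
}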

pulse/goroutine.go

Lines changed: 6 additions & 9 deletions
@@ -1,11 +1,8 @@
 package pulse
 
 import (
-	"context"
 	"fmt"
 	"runtime/debug"
-
-	"goa.design/clue/log"
 )
 
 // Go runs the given function in a separate goroutine and recovers from any panic,
@@ -16,14 +13,14 @@ import (
 //	Go(ctx, func() {
 //	    // Your code here
 //	})
-func Go(ctx context.Context, f func()) {
-	go func() {
-		defer func(ctx context.Context) {
+func Go(logger Logger, f func()) {
+	go func(logger Logger) {
+		defer func() {
 			if r := recover(); r != nil {
 				panicErr := fmt.Errorf("Panic recovered: %v\n%s", r, debug.Stack())
-				log.Error(ctx, panicErr)
+				logger.Error(panicErr)
 			}
-		}(ctx)
+		}()
 		f()
-	}()
+	}(logger)
 }
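
With this change pulse.Go takes a logger instead of a context, so panic recovery no longer depends on a clue logger being attached to the context. The sketch below reproduces the new function together with a minimal Logger interface inferred from the calls in this commit (the real pulse.Logger may define more methods) to show a recovered panic being reported:

package main

import (
	"errors"
	"fmt"
	"runtime/debug"
	"time"
)

// Logger is a minimal interface inferred from the calls in the diff above;
// the real pulse.Logger likely has more methods.
type Logger interface {
	Error(err error, keyvals ...interface{})
}

type stdLogger struct{}

func (stdLogger) Error(err error, keyvals ...interface{}) { fmt.Println("ERROR:", err, keyvals) }

// Go mirrors the new pulse.Go from the diff: panics are recovered and reported
// through the supplied logger rather than a context-scoped clue logger.
func Go(logger Logger, f func()) {
	go func(logger Logger) {
		defer func() {
			if r := recover(); r != nil {
				panicErr := fmt.Errorf("Panic recovered: %v\n%s", r, debug.Stack())
				logger.Error(panicErr)
			}
		}()
		f()
	}(logger)
}

func main() {
	Go(stdLogger{}, func() {
		panic(errors.New("boom")) // recovered and logged, the process keeps running
	})
	time.Sleep(100 * time.Millisecond)
}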

0 commit comments
