Skip to content

Commit 762fb9c

Browse files
authored
Delete stale consumers not associated with sinks (#43)
1 parent 871d990 commit 762fb9c

File tree

6 files changed

+124
-43
lines changed

6 files changed

+124
-43
lines changed

pool/node.go

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import (
2727
type (
2828
// Node is a pool of workers.
2929
Node struct {
30-
Name string
3130
NodeID string
31+
PoolName string
3232
poolStream *streaming.Stream // pool event stream for dispatching jobs
3333
poolSink *streaming.Sink // pool event sink
3434
nodeStream *streaming.Stream // node event stream for receiving worker events
@@ -93,14 +93,14 @@ const (
9393
// The options WithClientOnly can be used to create a node that can only be used
9494
// to dispatch jobs. Such a node does not route or process jobs in the
9595
// background.
96-
func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOption) (*Node, error) {
96+
func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...NodeOption) (*Node, error) {
9797
o := parseOptions(opts...)
9898
logger := o.logger
9999
nodeID := ulid.Make().String()
100100
if logger == nil {
101101
logger = pulse.NoopLogger()
102102
} else {
103-
logger = logger.WithPrefix("pool", name, "node", nodeID)
103+
logger = logger.WithPrefix("pool", poolName, "node", nodeID)
104104
}
105105
logger.Info("options",
106106
"client_only", o.clientOnly,
@@ -110,18 +110,18 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp
110110
"pending_job_ttl", o.pendingJobTTL,
111111
"job_sink_block_duration", o.jobSinkBlockDuration,
112112
"ack_grace_period", o.ackGracePeriod)
113-
wsm, err := rmap.Join(ctx, shutdownMapName(name), rdb, rmap.WithLogger(logger))
113+
wsm, err := rmap.Join(ctx, shutdownMapName(poolName), rdb, rmap.WithLogger(logger))
114114
if err != nil {
115-
return nil, fmt.Errorf("AddNode: failed to join shutdown replicated map %q: %w", shutdownMapName(name), err)
115+
return nil, fmt.Errorf("AddNode: failed to join shutdown replicated map %q: %w", shutdownMapName(poolName), err)
116116
}
117117
if wsm.Len() > 0 {
118-
return nil, fmt.Errorf("AddNode: pool %q is shutting down", name)
118+
return nil, fmt.Errorf("AddNode: pool %q is shutting down", poolName)
119119
}
120-
poolStream, err := streaming.NewStream(poolStreamName(name), rdb,
120+
poolStream, err := streaming.NewStream(poolStreamName(poolName), rdb,
121121
soptions.WithStreamMaxLen(o.maxQueuedJobs),
122122
soptions.WithStreamLogger(logger))
123123
if err != nil {
124-
return nil, fmt.Errorf("AddNode: failed to create pool job stream %q: %w", poolStreamName(name), err)
124+
return nil, fmt.Errorf("AddNode: failed to create pool job stream %q: %w", poolStreamName(poolName), err)
125125
}
126126
var (
127127
wm *rmap.Map
@@ -134,47 +134,47 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp
134134
nodeReader *streaming.Reader
135135
)
136136
if !o.clientOnly {
137-
wm, err = rmap.Join(ctx, workerMapName(name), rdb, rmap.WithLogger(logger))
137+
wm, err = rmap.Join(ctx, workerMapName(poolName), rdb, rmap.WithLogger(logger))
138138
if err != nil {
139-
return nil, fmt.Errorf("AddNode: failed to join pool workers replicated map %q: %w", workerMapName(name), err)
139+
return nil, fmt.Errorf("AddNode: failed to join pool workers replicated map %q: %w", workerMapName(poolName), err)
140140
}
141141
workerIDs := wm.Keys()
142142
logger.Info("joined", "workers", workerIDs)
143-
jm, err = rmap.Join(ctx, jobsMapName(name), rdb, rmap.WithLogger(logger))
143+
jm, err = rmap.Join(ctx, jobsMapName(poolName), rdb, rmap.WithLogger(logger))
144144
if err != nil {
145-
return nil, fmt.Errorf("AddNode: failed to join pool jobs replicated map %q: %w", jobsMapName(name), err)
145+
return nil, fmt.Errorf("AddNode: failed to join pool jobs replicated map %q: %w", jobsMapName(poolName), err)
146146
}
147-
jpm, err = rmap.Join(ctx, jobPayloadsMapName(name), rdb, rmap.WithLogger(logger))
147+
jpm, err = rmap.Join(ctx, jobPayloadsMapName(poolName), rdb, rmap.WithLogger(logger))
148148
if err != nil {
149-
return nil, fmt.Errorf("AddNode: failed to join pool job payloads replicated map %q: %w", jobPayloadsMapName(name), err)
149+
return nil, fmt.Errorf("AddNode: failed to join pool job payloads replicated map %q: %w", jobPayloadsMapName(poolName), err)
150150
}
151-
km, err = rmap.Join(ctx, keepAliveMapName(name), rdb, rmap.WithLogger(logger))
151+
km, err = rmap.Join(ctx, keepAliveMapName(poolName), rdb, rmap.WithLogger(logger))
152152
if err != nil {
153-
return nil, fmt.Errorf("AddNode: failed to join pool keep-alive replicated map %q: %w", keepAliveMapName(name), err)
153+
return nil, fmt.Errorf("AddNode: failed to join pool keep-alive replicated map %q: %w", keepAliveMapName(poolName), err)
154154
}
155-
tm, err = rmap.Join(ctx, tickerMapName(name), rdb, rmap.WithLogger(logger))
155+
tm, err = rmap.Join(ctx, tickerMapName(poolName), rdb, rmap.WithLogger(logger))
156156
if err != nil {
157-
return nil, fmt.Errorf("AddNode: failed to join pool ticker replicated map %q: %w", tickerMapName(name), err)
157+
return nil, fmt.Errorf("AddNode: failed to join pool ticker replicated map %q: %w", tickerMapName(poolName), err)
158158
}
159159
poolSink, err = poolStream.NewSink(ctx, "events",
160160
soptions.WithSinkBlockDuration(o.jobSinkBlockDuration),
161161
soptions.WithSinkAckGracePeriod(o.ackGracePeriod))
162162
if err != nil {
163-
return nil, fmt.Errorf("AddNode: failed to create events sink for stream %q: %w", poolStreamName(name), err)
163+
return nil, fmt.Errorf("AddNode: failed to create events sink for stream %q: %w", poolStreamName(poolName), err)
164164
}
165165
}
166-
nodeStream, err = streaming.NewStream(nodeStreamName(name, nodeID), rdb, soptions.WithStreamLogger(logger))
166+
nodeStream, err = streaming.NewStream(nodeStreamName(poolName, nodeID), rdb, soptions.WithStreamLogger(logger))
167167
if err != nil {
168-
return nil, fmt.Errorf("AddNode: failed to create node event stream %q: %w", nodeStreamName(name, nodeID), err)
168+
return nil, fmt.Errorf("AddNode: failed to create node event stream %q: %w", nodeStreamName(poolName, nodeID), err)
169169
}
170170
nodeReader, err = nodeStream.NewReader(ctx, soptions.WithReaderBlockDuration(o.jobSinkBlockDuration), soptions.WithReaderStartAtOldest())
171171
if err != nil {
172-
return nil, fmt.Errorf("AddNode: failed to create node event reader for stream %q: %w", nodeStreamName(name, nodeID), err)
172+
return nil, fmt.Errorf("AddNode: failed to create node event reader for stream %q: %w", nodeStreamName(poolName, nodeID), err)
173173
}
174174

175175
p := &Node{
176-
Name: name,
177176
NodeID: nodeID,
177+
PoolName: poolName,
178178
keepAliveMap: km,
179179
workerMap: wm,
180180
jobsMap: jm,
@@ -225,10 +225,10 @@ func (node *Node) AddWorker(ctx context.Context, handler JobHandler) (*Worker, e
225225
node.lock.Lock()
226226
defer node.lock.Unlock()
227227
if node.closing {
228-
return nil, fmt.Errorf("AddWorker: pool %q is closed", node.Name)
228+
return nil, fmt.Errorf("AddWorker: pool %q is closed", node.PoolName)
229229
}
230230
if node.clientOnly {
231-
return nil, fmt.Errorf("AddWorker: pool %q is client-only", node.Name)
231+
return nil, fmt.Errorf("AddWorker: pool %q is client-only", node.PoolName)
232232
}
233233
w, err := newWorker(ctx, node, handler)
234234
if err != nil {
@@ -303,7 +303,7 @@ func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) e
303303
node.lock.Lock()
304304
if node.closing {
305305
node.lock.Unlock()
306-
return fmt.Errorf("DispatchJob: pool %q is closed", node.Name)
306+
return fmt.Errorf("DispatchJob: pool %q is closed", node.PoolName)
307307
}
308308
job := marshalJob(&Job{Key: key, Payload: payload, CreatedAt: time.Now(), NodeID: node.NodeID})
309309
eventID, err := node.poolStream.Add(ctx, evStartJob, job)
@@ -344,7 +344,7 @@ func (node *Node) StopJob(ctx context.Context, key string) error {
344344
node.lock.Lock()
345345
defer node.lock.Unlock()
346346
if node.closing {
347-
return fmt.Errorf("StopJob: pool %q is closed", node.Name)
347+
return fmt.Errorf("StopJob: pool %q is closed", node.PoolName)
348348
}
349349
if _, err := node.poolStream.Add(ctx, evStopJob, marshalJobKey(key)); err != nil {
350350
return fmt.Errorf("StopJob: failed to add stop job to stream %q: %w", node.poolStream.Name, err)
@@ -383,7 +383,7 @@ func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte)
383383
node.lock.Lock()
384384
defer node.lock.Unlock()
385385
if node.closing {
386-
return fmt.Errorf("NotifyWorker: pool %q is closed", node.Name)
386+
return fmt.Errorf("NotifyWorker: pool %q is closed", node.PoolName)
387387
}
388388
if _, err := node.poolStream.Add(ctx, evNotify, marshalNotification(key, payload)); err != nil {
389389
return fmt.Errorf("NotifyWorker: failed to add notification to stream %q: %w", node.poolStream.Name, err)
@@ -421,7 +421,7 @@ func (node *Node) Shutdown(ctx context.Context) error {
421421
}
422422

423423
// Now clean up the shutdown replicated map.
424-
wsm, err := rmap.Join(ctx, shutdownMapName(node.Name), node.rdb, rmap.WithLogger(node.logger))
424+
wsm, err := rmap.Join(ctx, shutdownMapName(node.PoolName), node.rdb, rmap.WithLogger(node.logger))
425425
if err != nil {
426426
node.logger.Error(fmt.Errorf("Shutdown: failed to join shutdown map for cleanup: %w", err))
427427
}
@@ -535,7 +535,7 @@ func (node *Node) routeWorkerEvent(ctx context.Context, ev *streaming.Event) err
535535
key := unmarshalJobKey(ev.Payload)
536536
activeWorkers := node.activeWorkers()
537537
if len(activeWorkers) == 0 {
538-
return fmt.Errorf("routeWorkerEvent: no active worker in pool %q", node.Name)
538+
return fmt.Errorf("routeWorkerEvent: no active worker in pool %q", node.PoolName)
539539
}
540540
wid := activeWorkers[node.h.Hash(key, int64(len(activeWorkers)))]
541541

@@ -606,14 +606,14 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
606606
// dispatched the job.
607607
if pending.EventName == evStartJob {
608608
_, nodeID := unmarshalJobKeyAndNodeID(pending.Payload)
609-
stream, err := streaming.NewStream(nodeStreamName(node.Name, nodeID), node.rdb, soptions.WithStreamLogger(node.logger))
609+
stream, err := streaming.NewStream(nodeStreamName(node.PoolName, nodeID), node.rdb, soptions.WithStreamLogger(node.logger))
610610
if err != nil {
611-
node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to create node event stream %q: %w", nodeStreamName(node.Name, nodeID), err))
611+
node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to create node event stream %q: %w", nodeStreamName(node.PoolName, nodeID), err))
612612
return
613613
}
614614
ack.EventID = pending.ID
615615
if _, err := stream.Add(ctx, evDispatchReturn, marshalAck(ack), soptions.WithOnlyIfStreamExists()); err != nil {
616-
node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to dispatch return to stream %q: %w", nodeStreamName(node.Name, nodeID), err))
616+
node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to dispatch return to stream %q: %w", nodeStreamName(node.PoolName, nodeID), err))
617617
}
618618
}
619619

pool/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ var ErrScheduleStop = fmt.Errorf("stop")
7171
// returns ErrScheduleStop. Plan is called on only one of the nodes that
7272
// scheduled the same producer.
7373
func (node *Node) Schedule(ctx context.Context, producer JobProducer, interval time.Duration) error {
74-
name := node.Name + ":" + producer.Name()
74+
name := node.PoolName + ":" + producer.Name()
7575
jobMap, err := rmap.Join(ctx, name, node.rdb, rmap.WithLogger(node.logger))
7676
if err != nil {
7777
return fmt.Errorf("failed to join job map %s: %w", name, err)

pool/ticker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, o
3636
if node.clientOnly {
3737
return nil, fmt.Errorf("cannot create ticker on client-only node")
3838
}
39-
name = node.Name + ":" + name
39+
name = node.PoolName + ":" + name
4040
o := parseTickerOptions(opts...)
4141
logger := o.logger
4242
if logger == nil {

pool/worker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func newWorker(ctx context.Context, node *Node, h JobHandler) (*Worker, error) {
8989
wid := ulid.Make().String()
9090
createdAt := time.Now()
9191
if _, err := node.workerMap.SetAndWait(ctx, wid, strconv.FormatInt(createdAt.UnixNano(), 10)); err != nil {
92-
return nil, fmt.Errorf("failed to add worker %q to pool %q: %w", wid, node.Name, err)
92+
return nil, fmt.Errorf("failed to add worker %q to pool %q: %w", wid, node.PoolName, err)
9393
}
9494
now := strconv.FormatInt(time.Now().UnixNano(), 10)
9595
if _, err := node.keepAliveMap.SetAndWait(ctx, wid, now); err != nil {
@@ -305,7 +305,7 @@ func (w *Worker) ackPoolEvent(ctx context.Context, nodeID, eventID string, acker
305305
stream, ok := w.nodeStreams[nodeID]
306306
if !ok {
307307
var err error
308-
stream, err = streaming.NewStream(nodeStreamName(w.Node.Name, nodeID), w.Node.rdb, soptions.WithStreamLogger(w.logger))
308+
stream, err = streaming.NewStream(nodeStreamName(w.Node.PoolName, nodeID), w.Node.rdb, soptions.WithStreamLogger(w.logger))
309309
if err != nil {
310310
w.logger.Error(fmt.Errorf("failed to create stream for node %q: %w", nodeID, err))
311311
return

pulse/goroutine_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package pulse
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
"goa.design/clue/log"
13+
)
14+
15+
func TestGo(t *testing.T) {
16+
t.Run("executes function without panic", func(t *testing.T) {
17+
ctx := context.Background()
18+
var wg sync.WaitGroup
19+
wg.Add(1)
20+
executed := false
21+
22+
Go(ctx, func() {
23+
defer wg.Done()
24+
executed = true
25+
})
26+
27+
wg.Wait()
28+
assert.True(t, executed, "Function should have been executed")
29+
})
30+
31+
t.Run("recovers from panic and logs error", func(t *testing.T) {
32+
ctx := context.Background()
33+
var wg sync.WaitGroup
34+
wg.Add(1)
35+
36+
var buf strings.Builder
37+
ctx = log.Context(ctx, log.WithOutput(&buf))
38+
39+
Go(ctx, func() {
40+
defer wg.Done()
41+
panic("test panic")
42+
})
43+
44+
wg.Wait()
45+
46+
// Use assert.Eventually to poll for the asynchronously written log output
47+
assert.Eventually(t, func() bool {
48+
logOutput := buf.String()
49+
return strings.Contains(logOutput, "Panic recovered: test panic") &&
50+
strings.Contains(logOutput, "goroutine.go")
51+
}, 100*time.Millisecond, 10*time.Millisecond, "Log should contain panic message and stack trace")
52+
})
53+
54+
t.Run("handles non-string panic values", func(t *testing.T) {
55+
ctx := context.Background()
56+
var wg sync.WaitGroup
57+
wg.Add(1)
58+
59+
var buf strings.Builder
60+
ctx = log.Context(ctx, log.WithOutput(&buf))
61+
62+
Go(ctx, func() {
63+
defer wg.Done()
64+
panic(errors.New("custom error"))
65+
})
66+
67+
wg.Wait()
68+
assert.Eventually(t, func() bool {
69+
logOutput := buf.String()
70+
return strings.Contains(logOutput, "Panic recovered: custom error") &&
71+
strings.Contains(logOutput, "goroutine.go")
72+
}, 100*time.Millisecond, 10*time.Millisecond, "Log should contain panic message and stack trace")
73+
})
74+
}

streaming/sink.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"math/rand"
77
"regexp"
8-
"slices"
98
"strconv"
109
"strings"
1110
"sync"
@@ -493,40 +492,48 @@ func (s *Sink) claimIdleMessages(ctx context.Context) {
493492
// s.lock must be held.
494493
func (s *Sink) deleteStaleConsumers(ctx context.Context) {
495494
keepAlives := s.consumersKeepAliveMap.Map()
496-
var staleConsumers []string
495+
staleConsumers := make(map[string]struct{})
497496
for consumer, timestamp := range keepAlives {
498497
ts, err := strconv.ParseInt(timestamp, 10, 64)
499498
if err != nil {
500499
s.logger.Error(fmt.Errorf("failed to parse timestamp for consumers: %w", err), "consumer", consumer)
501500
continue
502501
}
503502
if time.Since(time.Unix(0, ts)) > 2*s.ackGracePeriod {
504-
staleConsumers = append(staleConsumers, consumer)
505-
s.logger.Debug("stale consumer", "consumer", consumer, "since", time.Since(time.Unix(0, ts)), "grace", 2*s.ackGracePeriod)
503+
staleConsumers[consumer] = struct{}{}
504+
s.logger.Debug("stale consumer", "consumer", consumer, "since", time.Since(time.Unix(0, ts)), "ttl", 2*s.ackGracePeriod)
506505
}
507506
}
507+
// Delete any stale consumers from the consumer group.
508508
for _, stream := range s.streams {
509509
sinks := s.consumersMap[stream.Name]
510510
for _, sink := range sinks.Keys() {
511511
consumers, _ := sinks.GetValues(sink)
512512
for _, consumer := range consumers {
513-
if !slices.Contains(staleConsumers, consumer) {
513+
if _, ok := staleConsumers[consumer]; !ok {
514514
continue
515515
}
516516
if err := s.rdb.XGroupDelConsumer(ctx, stream.key, s.Name, consumer).Err(); err != nil {
517517
s.logger.Error(fmt.Errorf("failed to delete stale consumer %s: %w", consumer, err))
518-
continue
519518
}
520519
if _, err := s.consumersKeepAliveMap.Delete(ctx, consumer); err != nil {
521520
s.logger.Error(fmt.Errorf("failed to delete sink keep-alive for stale consumer %s: %w", consumer, err))
522521
}
523522
if _, _, err := sinks.RemoveValues(ctx, s.Name, consumer); err != nil {
524523
s.logger.Error(fmt.Errorf("failed to remove consumer from map: %w", err), "stream", stream.Name, "consumer", consumer)
525524
}
525+
delete(staleConsumers, consumer)
526526
s.logger.Debug("deleted stale consumer", "consumer", consumer)
527527
}
528528
}
529529
}
530+
// Delete any remaining stale consumers from the keep-alive map.
531+
for consumer := range staleConsumers {
532+
if _, err := s.consumersKeepAliveMap.Delete(ctx, consumer); err != nil {
533+
s.logger.Error(fmt.Errorf("failed to delete sink keep-alive for orphaned consumer %s: %w", consumer, err))
534+
}
535+
s.logger.Debug("deleted orphaned consumer", "consumer", consumer)
536+
}
530537
}
531538

532539
// Helper function to claim messages from a stream used by claimIdleMessages.

0 commit comments

Comments
 (0)