
Commit 3143e8c

Add ability to list jobs and workers (#39)
Fix a couple of potential panics:
1. If `rmap.RemoveValues` doesn't remove a value.
2. When acking a non-existent event.
1 parent 2c189b7 commit 3143e8c

File tree

4 files changed: +257 -8 lines changed


pool/node.go

Lines changed: 56 additions & 6 deletions

@@ -7,8 +7,10 @@ import (
 	"hash"
 	"hash/crc64"
 	"io"
+	"slices"
 	"sort"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -261,13 +263,39 @@ func (node *Node) Workers() []*Worker {
 	node.lock.Lock()
 	defer node.lock.Unlock()
 	workers := make([]*Worker, len(node.localWorkers))
-	copy(workers, node.localWorkers)
+	for i, w := range node.localWorkers {
+		workers[i] = &Worker{
+			ID:        w.ID,
+			CreatedAt: w.CreatedAt,
+			Node:      node,
+		}
+	}
 	return workers
 }
 
+// PoolWorkers returns the list of workers running in the pool.
+func (node *Node) PoolWorkers() []*Worker {
+	workers := node.workerMap.Map()
+	poolWorkers := make([]*Worker, 0, len(workers))
+	for id, createdAt := range workers {
+		cat, err := strconv.ParseInt(createdAt, 10, 64)
+		if err != nil {
+			node.logger.Error(fmt.Errorf("PoolWorkers: failed to parse createdAt %q for worker %q: %w", createdAt, id, err))
+			continue
+		}
+		poolWorkers = append(poolWorkers, &Worker{ID: id, CreatedAt: time.Unix(0, cat), Node: node})
+	}
+	return poolWorkers
+}
+
 // DispatchJob dispatches a job to the proper worker in the pool.
-// It returns the error returned by the worker's start handler if any.
-// If the context is done before the job is dispatched, the context error is returned.
+// It returns:
+// - nil if the job is successfully dispatched and started by a worker
+// - an error returned by the worker's start handler if the job fails to start
+// - the context error if the context is canceled before the job is started
+// - an error if the pool is closed or if there's a failure in adding the job
+//
+// The method blocks until one of the above conditions is met.
 func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) error {
 	// Send job to pool stream.
 	node.lock.Lock()
@@ -313,6 +341,31 @@ func (node *Node) StopJob(ctx context.Context, key string) error {
 	return nil
 }
 
+// JobKeys returns the list of keys of the jobs running in the pool.
+func (node *Node) JobKeys() []string {
+	var jobKeys []string
+	jobByNodes := node.jobsMap.Map()
+	for _, jobs := range jobByNodes {
+		jobKeys = append(jobKeys, strings.Split(jobs, ",")...)
+	}
+	return jobKeys
+}
+
+// JobPayload returns the payload of the job with the given key.
+// It returns:
+// - (payload, true) if the job exists and has a payload
+// - (nil, true) if the job exists but has no payload (empty payload)
+// - (nil, false) if the job does not exist
+func (node *Node) JobPayload(key string) ([]byte, bool) {
+	payload, ok := node.jobPayloadsMap.Get(key)
+	if ok {
+		return []byte(payload), true
+	}
+	keys := node.JobKeys()
+	return nil, slices.Contains(keys, key)
+}
+
 // NotifyWorker notifies the worker that handles the job with the given key.
 func (node *Node) NotifyWorker(ctx context.Context, key string, payload []byte) error {
 	node.lock.Lock()
@@ -534,9 +587,6 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
 	pending, ok := node.pendingEvents[key]
 	if !ok {
 		node.logger.Error(fmt.Errorf("ackWorkerEvent: received unknown event %s from worker %s", ack.EventID, workerID))
-		if err := node.poolSink.Ack(ctx, pending); err != nil {
-			node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to ack unknown event: %w", err), "event", pending.EventName, "id", pending.ID)
-		}
 		return
 	}

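Taken together, the new Workers, PoolWorkers, JobKeys, and JobPayload methods give callers a snapshot view of pool state. Below is a minimal usage sketch; the goa.design/pulse/pool import path and the pre-built node value are assumptions (node construction is not part of this diff):

// Package example sketches the listing APIs added above; illustrative only.
package example

import (
	"fmt"

	"goa.design/pulse/pool" // import path assumed, not shown in this diff
)

// inspectPool prints the workers and jobs currently known to the pool,
// using only methods added or documented by this commit.
func inspectPool(node *pool.Node) {
	// Workers across all nodes, rebuilt from the replicated worker map.
	for _, w := range node.PoolWorkers() {
		fmt.Printf("worker %s created at %s\n", w.ID, w.CreatedAt)
	}

	// Keys of jobs running anywhere in the pool.
	for _, key := range node.JobKeys() {
		payload, ok := node.JobPayload(key)
		if !ok {
			continue // job completed between the two calls
		}
		fmt.Printf("job %s: %d-byte payload\n", key, len(payload))
	}
}

Note also the ackWorkerEvent hunk: when ok is false, pending is the map's zero value, so the removed branch that acked it and logged pending.EventName and pending.ID operated on an event that was never found, which could panic; dropping that branch is the "acking a non-existent event" fix from the commit message.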
pool/node_test.go

Lines changed: 192 additions & 0 deletions

@@ -22,6 +22,170 @@ const (
 	max = time.Second
 )
 
+func TestWorkers(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx := ptesting.NewTestContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	node := newTestNode(t, ctx, rdb, testName)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	// Create a few workers
+	worker1 := newTestWorker(t, ctx, node)
+	worker2 := newTestWorker(t, ctx, node)
+	worker3 := newTestWorker(t, ctx, node)
+
+	// Get the list of workers
+	workers := node.Workers()
+
+	// Check if the number of workers is correct
+	assert.Equal(t, 3, len(workers), "Expected 3 workers")
+
+	// Check if all created workers are in the list
+	expectedWorkers := []string{worker1.ID, worker2.ID, worker3.ID}
+	actualWorkers := make([]string, len(workers))
+	for i, w := range workers {
+		actualWorkers[i] = w.ID
+	}
+	assert.ElementsMatch(t, expectedWorkers, actualWorkers, "The list of workers should contain all created workers")
+
+	// Shutdown node
+	assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
+}
+
+func TestPoolWorkers(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx := ptesting.NewTestContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	node := newTestNode(t, ctx, rdb, testName)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	// Create workers on the current node
+	worker1 := newTestWorker(t, ctx, node)
+	worker2 := newTestWorker(t, ctx, node)
+
+	// Create a worker on a different node
+	otherNode := newTestNode(t, ctx, rdb, testName)
+	worker3 := newTestWorker(t, ctx, otherNode)
+	defer func() { assert.NoError(t, otherNode.Shutdown(ctx)) }()
+
+	// Check if the number of workers is correct (should include workers from all nodes)
+	assert.Eventually(t, func() bool {
+		return len(node.PoolWorkers()) == 3
+	}, max, delay, "Expected 3 workers in the pool")
+
+	// Check if all created workers are in the list
+	poolWorkers := node.PoolWorkers()
+	workerIDs := make([]string, len(poolWorkers))
+	for i, w := range poolWorkers {
+		workerIDs[i] = w.ID
+	}
+
+	expectedWorkerIDs := []string{worker1.ID, worker2.ID, worker3.ID}
+	assert.ElementsMatch(t, expectedWorkerIDs, workerIDs, "Not all expected workers were found in the pool")
+
+	// Shutdown nodes
+	assert.NoError(t, node.Shutdown(ctx), "Failed to shutdown node")
+}
+
+func TestJobKeys(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx := ptesting.NewTestContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	node1 := newTestNode(t, ctx, rdb, testName)
+	node2 := newTestNode(t, ctx, rdb, testName)
+	newTestWorker(t, ctx, node1)
+	newTestWorker(t, ctx, node2)
+	defer func() {
+		assert.NoError(t, node1.Shutdown(ctx))
+		assert.NoError(t, node2.Shutdown(ctx))
+	}()
+
+	// Configure nodes to send jobs to specific workers
+	node1.h, node2.h = &ptesting.Hasher{Index: 0}, &ptesting.Hasher{Index: 1}
+
+	jobs := []struct {
+		key     string
+		payload []byte
+	}{
+		{key: "job1", payload: []byte("payload1")},
+		{key: "job2", payload: []byte("payload2")},
+		{key: "job3", payload: []byte("payload3")},
+		{key: "job4", payload: []byte("payload4")},
+	}
+
+	for _, job := range jobs {
+		assert.NoError(t, node1.DispatchJob(ctx, job.key, job.payload), fmt.Sprintf("Failed to dispatch job: %s", job.key))
+	}
+
+	// Get job keys from the pool and check if all dispatched job keys are present
+	var allJobKeys []string
+	assert.Eventually(t, func() bool {
+		allJobKeys = node1.JobKeys()
+		return len(jobs) == len(allJobKeys)
+	}, max, delay, fmt.Sprintf("Number of job keys doesn't match the number of dispatched jobs: %d != %d", len(jobs), len(allJobKeys)))
+	for _, job := range jobs {
+		assert.Contains(t, allJobKeys, job.key, fmt.Sprintf("Job key %s not found in JobKeys", job.key))
+	}
+
+	// Dispatch a job with an existing key to node1
+	assert.NoError(t, node1.DispatchJob(ctx, "job1", []byte("updated payload")), "Failed to dispatch job with existing key")
+
+	// Check that the number of job keys hasn't changed
+	updatedAllJobKeys := node1.JobKeys()
+	assert.Equal(t, len(allJobKeys), len(updatedAllJobKeys), "Number of job keys shouldn't change when updating an existing job")
+}
+
+func TestJobPayload(t *testing.T) {
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	ctx := ptesting.NewTestContext(t)
+	rdb := ptesting.NewRedisClient(t)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+
+	node := newTestNode(t, ctx, rdb, testName)
+	newTestWorker(t, ctx, node)
+	defer func() { assert.NoError(t, node.Shutdown(ctx)) }()
+
+	tests := []struct {
+		name    string
+		key     string
+		payload []byte
+	}{
+		{"job with payload", "job1", []byte("payload1")},
+		{"job without payload", "job2", nil},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.NoError(t, node.DispatchJob(ctx, tt.key, tt.payload), "Failed to dispatch job")
+
+			// Check if job payload is correct
+			assert.Eventually(t, func() bool {
+				payload, ok := node.JobPayload(tt.key)
+				fmt.Println(payload, ok)
+				fmt.Println(tt.payload)
+				return ok && assert.Equal(t, tt.payload, payload)
+			}, max, delay, fmt.Sprintf("Failed to get correct payload for job %s", tt.key))
+		})
+	}
+
+	// Test non-existent job
+	payload, ok := node.JobPayload("non-existent-job")
+	assert.False(t, ok, "Expected false for non-existent job")
+	assert.Nil(t, payload, "Expected nil payload for non-existent job")
+
+	// Update existing job
+	updatedPayload := []byte("updated payload")
+	assert.NoError(t, node.DispatchJob(ctx, "job1", updatedPayload), "Failed to update existing job")
+
+	// Check if the payload was updated
+	assert.Eventually(t, func() bool {
+		payload, ok := node.JobPayload("job1")
+		return ok && assert.Equal(t, updatedPayload, payload, "Payload was not updated correctly")
+	}, max, delay, "Failed to get updated payload for job")
+}
+
 func TestDispatchJobOneWorker(t *testing.T) {
 	testName := strings.Replace(t.Name(), "/", "_", -1)
 	ctx := ptesting.NewTestContext(t)
@@ -306,6 +470,34 @@ func TestNodeCloseAndRequeue(t *testing.T) {
 	require.NoError(t, node2.Shutdown(ctx), "Failed to shutdown node2")
 }
 
+func TestAckWorkerEventWithMissingPendingEvent(t *testing.T) {
+	// Setup
+	ctx := ptesting.NewTestContext(t)
+	testName := strings.Replace(t.Name(), "/", "_", -1)
+	rdb := ptesting.NewRedisClient(t)
+	defer ptesting.CleanupRedis(t, rdb, true, testName)
+	node := newTestNode(t, ctx, rdb, testName)
+	defer func() { assert.NoError(t, node.Shutdown(ctx)) }()
+
+	// Create a mock event with a non-existent pending event ID
+	mockEvent := &streaming.Event{
+		ID:        "non-existent-event-id",
+		EventName: evAck,
+		Payload:   marshalEnvelope("worker", marshalAck(&ack{EventID: "non-existent-event-id"})),
+		Acker: &mockAcker{
+			XAckFunc: func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
+				return redis.NewIntCmd(ctx, 0)
+			},
+		},
+	}
+
+	// Call ackWorkerEvent with the mock event
+	node.ackWorkerEvent(ctx, mockEvent)
+
+	// Verify that no panic occurred and the function completed successfully
+	assert.True(t, true, "ackWorkerEvent should complete without panic")
+}
+
 func TestStaleEventsAreRemoved(t *testing.T) {
 	// Setup
 	ctx := ptesting.NewTestContext(t)

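TestAckWorkerEventWithMissingPendingEvent depends on a mockAcker helper that is not included in this diff. A sketch consistent with how the test uses it follows; the type and method set are inferred from the XAckFunc field above and from go-redis's XAck signature, so treat them as assumptions rather than the repository's actual helper:

// mockAcker stubs the acker dependency of streaming.Event by delegating
// XAck to a caller-supplied function (names inferred from the test above).
import (
	"context"

	redis "github.com/redis/go-redis/v9" // client library assumed from the test
)

type mockAcker struct {
	XAckFunc func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd
}

func (m *mockAcker) XAck(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
	return m.XAckFunc(ctx, streamKey, sinkName, ids...)
}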
pool/worker.go

Lines changed: 8 additions & 1 deletion

@@ -144,7 +144,14 @@ func (w *Worker) Jobs() []*Job {
 	sort.Strings(keys)
 	jobs := make([]*Job, 0, len(w.jobs))
 	for _, key := range keys {
-		jobs = append(jobs, w.jobs[key])
+		job := w.jobs[key]
+		jobs = append(jobs, &Job{
+			Key:       key,
+			Payload:   job.Payload,
+			CreatedAt: job.CreatedAt,
+			Worker:    &Worker{ID: w.ID, Node: w.Node, CreatedAt: w.CreatedAt},
+			NodeID:    job.NodeID,
+		})
 	}
 	return jobs
 }

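The rewritten Jobs mirrors the change to Workers in pool/node.go: it returns freshly allocated *Job values rather than the worker's internal pointers, so callers may mutate the result without corrupting pool state. A hypothetical caller (not from this commit) illustrating that property:

// snapshotJobKeys clears payloads on the returned slice to show that
// Jobs hands back copies: the worker's own job records stay untouched.
func snapshotJobKeys(w *pool.Worker) []string {
	jobs := w.Jobs() // fresh *Job values, safe to modify
	keys := make([]string, len(jobs))
	for i, job := range jobs {
		job.Payload = nil // mutates only the copy
		keys[i] = job.Key
	}
	return keys
}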
rmap/map.go

Lines changed: 1 addition & 1 deletion

@@ -532,7 +532,7 @@ func (sm *Map) RemoveValues(ctx context.Context, key string, items ...string) ([
 	if remaining == "" {
 		return nil, true, nil // All items were removed, key was deleted
 	}
-	removed := result[1].(int64) == 1
+	removed := result[1] != nil && result[1].(int64) == 1
 	return strings.Split(remaining, ","), removed, nil
 }

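The rmap fix guards a type assertion on a Redis script reply: go-redis returns Lua results as []interface{}, and an absent slot is a nil interface value, so the unguarded result[1].(int64) panicked instead of yielding removed == false. A standalone reproduction of the failure mode and the guard (hypothetical values, not taken from the library):

package main

import "fmt"

func main() {
	// Shape of a script reply whose second slot is absent.
	result := []interface{}{"a,b", nil}

	// Unguarded, result[1].(int64) would panic:
	// "interface conversion: interface {} is nil, not int64".

	// Guarded, as in the fix above:
	removed := result[1] != nil && result[1].(int64) == 1
	fmt.Println(removed) // false
}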