Commit be67ad4
Fix/lost job on requeue (#45)
* Do not remove requeued jobs from Redis
* Simplify usage of the pool package: remove options that should not be changed. Fix a potential race condition on requeue where a job payload may get deleted too early.
* Fix race condition in test
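The race fixed here is an ordering problem: if a requeued job's payload is deleted before the job has been durably re-added, the job can be lost. As a hedged illustration only — this is not Pulse's actual internals, just the general Redis Streams pattern, with hypothetical stream and group names:

```go
package sketch

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// requeue re-adds a pending job to a destination worker stream and only then
// acknowledges the original entry. Acking (or deleting the payload) before the
// new entry exists is the window in which the job could be lost.
func requeue(ctx context.Context, rdb *redis.Client, src, dst, group, msgID string, payload []byte) error {
	// 1. Durably enqueue the job on the destination stream.
	if err := rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: dst,
		Values: map[string]interface{}{"payload": payload},
	}).Err(); err != nil {
		return err
	}
	// 2. Only now ack the original entry so it becomes eligible for cleanup.
	return rdb.XAck(ctx, src, group, msgID).Err()
}
```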

9 files changed: +163, -282 lines changed

examples/pool/scheduler/main.go

Lines changed: 1 addition & 4 deletions

```diff
@@ -28,10 +28,7 @@ func main() {
     }
 
     // Create node for pool "example".
-    node, err := pool.AddNode(ctx, "example", rdb,
-        pool.WithJobSinkBlockDuration(100*time.Millisecond), // Shutdown faster
-        pool.WithLogger(logger),
-    )
+    node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
     if err != nil {
         panic(err)
     }
```

examples/pool/worker/main.go

Lines changed: 1 addition & 4 deletions

```diff
@@ -51,10 +51,7 @@ func main() {
     }
 
     // Create node for pool "example".
-    node, err := pool.AddNode(ctx, "example", rdb,
-        pool.WithJobSinkBlockDuration(100*time.Millisecond), // Shutdown faster
-        pool.WithLogger(logger),
-    )
+    node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
     if err != nil {
         panic(err)
     }
```

pool/README.md

Lines changed: 61 additions & 193 deletions

````diff
@@ -48,22 +48,48 @@ flowchart LR
 
 ## Usage
 
-Pulse dedicated worker pools are generally valuable when workers require
-state which depends on the jobs they perform.
-
-To illustrate, let's consider the scenario of a multitenant system that requires
-managing a collection of background tasks for each tenant. In this case,
-utilizing a Pulse worker pool proves to be highly beneficial. The system can
-create a dedicated worker pool and create one job per tenant, utilizing the
-unique tenant identifier as the job key. This approach ensures that only one
-worker handles the background task for a specific tenant at any given time. As
-new tenants are added or old ones are removed, jobs can be started or stopped
-accordingly. Similarly, workers can be added or removed based on performance
-requirements.
-
-Pulse dedicated worker pools are not needed when workers are stateless and can
-be scaled horizontally. In such cases, any standard load balancing solution can
-be used.
+Job producer:
+```go
+rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
+node, err := pool.AddNode(ctx, "example", rdb, pool.WithClientOnly())
+if err != nil {
+    panic(err)
+}
+if err := node.DispatchJob(ctx, "key", []byte("payload")); err != nil {
+    panic(err)
+}
+```
+
+Worker:
+```go
+rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
+node, err := pool.AddNode(ctx, "example", rdb)
+if err != nil {
+    panic(err)
+}
+handler := &JobHandler{}
+_, err = node.AddWorker(context.Background(), handler)
+if err != nil {
+    panic(err)
+}
+```
+
+Job handler:
+```go
+type JobHandler struct {
+    // ...
+}
+
+// Pulse calls this method to start a job that was assigned to this worker.
+func (h *JobHandler) Start(ctx context.Context, key string, payload []byte) error {
+    // ...
+}
+
+// Pulse calls this method to stop a job that was assigned to this worker.
+func (h *JobHandler) Stop(ctx context.Context, key string) error {
+    // ...
+}
+```
 
 ### Creating A Pool
 
````
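For orientation, the three README snippets above can be combined into one runnable program. The following is a minimal sketch, not part of the commit: the `goa.design/pulse/pool` import path and the final `Shutdown` call are assumptions based on the rest of the README, and `EchoHandler` is a made-up handler.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
	"goa.design/pulse/pool"
)

// EchoHandler is a toy job handler that just prints job starts and stops.
type EchoHandler struct{}

func (h *EchoHandler) Start(ctx context.Context, key string, payload []byte) error {
	fmt.Printf("job %q started with payload %q\n", key, payload)
	return nil
}

func (h *EchoHandler) Stop(ctx context.Context, key string) error {
	fmt.Printf("job %q stopped\n", key)
	return nil
}

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// A single node can both run a worker and dispatch jobs.
	node, err := pool.AddNode(ctx, "example", rdb)
	if err != nil {
		panic(err)
	}
	if _, err := node.AddWorker(ctx, &EchoHandler{}); err != nil {
		panic(err)
	}
	if err := node.DispatchJob(ctx, "alpha", []byte("hello")); err != nil {
		panic(err)
	}

	time.Sleep(time.Second) // give the worker a moment to pick up the job
	if err := node.Shutdown(ctx); err != nil {
		panic(err)
	}
}
```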
````diff
@@ -78,24 +104,28 @@ should be closed when it is no longer needed (see below).
 The options are used to configure the pool node. The following options are
 available:
 
+* `WithClientOnly` - specifies that this node will only be used to dispatch jobs to
+  workers in other nodes, and will not run any workers itself.
 * `WithLogger` - sets the logger to be used by the pool node.
-* `WithWorkerTTL` - sets the worker time-to-live (TTL) in seconds. The TTL
-  defines the maximum delay between two health-checks before a worker is removed
-  from the pool. The default value is 10 seconds.
-* `WithPendingJobTTL` - sets the pending job time-to-live (TTL) in seconds. The
-  TTL defines the maximum delay between a worker picking up the job and
-  successfully starting it. The default value is 20 seconds.
+* `WithWorkerTTL` - sets the worker time-to-live (TTL). This is the maximum duration
+  a worker can go without sending a health check before it's considered inactive
+  and removed from the pool. If a worker doesn't report its status within this
+  time frame, it will be removed, allowing the pool to reassign its jobs to other
+  active workers. The default value is 30 seconds.
 * `WithWorkerShutdownTTL` - specifies the maximum time to wait for a worker to
-  shutdown gracefully. The default value is 2 minutes.
+  shut down gracefully. This is the duration the pool will wait for a worker to
+  finish its current job and perform any cleanup operations before forcefully
+  terminating it. If the worker doesn't shut down within this time, it will be
+  forcefully stopped. The default value is 2 minutes.
 * `WithMaxQueuedJobs` - sets the maximum number of jobs that can be queued
-  before the pool starts rejecting new jobs. The default value is 1000.
-* `WithClientOnly` - specifies that the pool node should not starts
-  background goroutines to manage the pool and thus not allow creating workers.
-  This option is useful when the pool is used only to dispatch jobs to workers
-  that are created in other nodes.
-* `WithJobSinkBlockDuration` - sets the max poll duration for new jobs. This
-  value is mostly used by tests to accelerate the pool shutdown process. The
-  default value is 5 seconds.
+  before the pool starts rejecting new jobs. This limit applies to the entire
+  pool across all nodes. When this limit is reached, any attempt to dispatch
+  new jobs will result in an error. The default value is 1000 jobs.
+* `WithAckGracePeriod` - sets the grace period for job acknowledgment. If a
+  worker doesn't acknowledge starting a job within this duration, the job
+  becomes available for other workers to claim. This prevents jobs from being
+  stuck if a worker fails to start processing them. The default value is 20
+  seconds.
 
 ### Closing A Node
 
````
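Put together, a worker node tuned with the options above might be created as in the sketch below. The values are illustrative rather than defaults, and the option signatures (`time.Duration` for the TTLs and grace period, `int` for the queue limit) are assumptions consistent with the snippets in this diff.

```go
node, err := pool.AddNode(ctx, "example", rdb,
	pool.WithLogger(logger),
	pool.WithWorkerTTL(30*time.Second),        // evict workers silent for 30s
	pool.WithWorkerShutdownTTL(2*time.Minute), // wait up to 2m for graceful stop
	pool.WithMaxQueuedJobs(1000),              // pool-wide queued job limit
	pool.WithAckGracePeriod(20*time.Second),   // unacked jobs become claimable
)
if err != nil {
	panic(err)
}
```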
````diff
@@ -168,165 +198,3 @@ a list of jobs to be started and stopped.
 
 `Schedule` makes it possible to maintain a pool of jobs for example in a
 multi-tenant system. See the [examples](../examples/pool) for more details.
-
-## Data Flows
-
-The following sections provide additional details on the internal data flows
-involved in creating and using a Pulse worker pool. They are provided for
-informational purposes only and are not required reading for simply using the
-package.
-
-### Adding A New Job
-
-The following diagram illustrates the data flow involved in adding a new job to
-a Pulse worker pool:
-
-* The producer calls `DispatchJob` which adds an event to the pool job stream.
-* The pool job stream is read by the pool sink running in one of the pool nodes.
-  The routing node records the event so it can ack it later and routes the event
-  to the proper worker stream using a consistent hashing algorithm.
-* The dedicated worker stream is read by the worker which starts the job by
-  calling the `Start` method on the worker job handler. Once `Start` returns
-  successfully the worker sends an event back to the original pool node.
-* Upon getting the event, the pool node acks the job with the
-  pool job stream and removes it from its pending jobs map.
-
-
-```mermaid
-%%{ init: { 'flowchart': { 'curve': 'basis' } } }%%
-%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
-flowchart TD
-    subgraph w[Worker Node]
-        r[Reader]
-        u[User code]
-    end
-    subgraph rdb[Redis]
-        js(["Pool Job Stream (shared)"])
-        ws(["Worker Stream (dedicated)"])
-        rs(["Routing Node Stream (dedicated)"])
-    end
-    subgraph p[Producer Node]
-        pr[User code]
-        no[Client Node]
-    end
-    subgraph ro[Routing Node]
-        ps[Pool Sink]
-        nr[Routing Node Reader]
-    end
-    pr --1. DispatchJob--> no
-    no --2. Add Job--> js
-    js --3. Job--> ps
-    ps --4. Add Job--> ws
-    ws --5. Job--> r
-    r --6. Start Job--> u
-    r --7. Add Ack--> rs
-    rs --7. Ack--> nr
-    nr --8. Ack Add Job Event--> js
-
-    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
-    classDef producer fill:#2C5A9A, stroke:#6B96C1, stroke-width:2px, color:#CCE0FF;
-    classDef redis fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;
-    classDef background fill:#7A7A7A, color:#F2F2F2;
-
-    class pr,u userCode;
-    class pj,js,ws,rs redis;
-    class no,ps,r,c,nr producer;
-    class p,w,rdb,ro background;
-
-    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 7 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-```
-
-The worker pool uses a job stream so that jobs that do not get acknowledged in time
-are automatically re-queued. This is useful in case of worker failure or
-network partitioning. The pool sink applies the consistent hashing algorithm
-to the job key to determine which worker stream the job should be added to. This
-ensures that unhealthy workers are properly ignored when requeuing jobs.
-
-### Shutdown and Cleanup
-
-The following diagram illustrates the data flow involved in shutting down a
-Pulse worker pool:
-
-* The producer calls `Shutdown` which adds a shutdown event to the pool stream.
-* Upon receving the shutdown event the pool node closes the pool stream to avoid
-  accepting new jobs and sets a flag in the pool shutdown replicated map.
-* The pool nodes get notified and stop accepting new jobs (`DispatchJob`
-  returns an error if called).
-* The pool nodes add a stop event to the worker streams for all the workers
-  they own.
-* Upon receiving the event, the workers remove themselves from the pool
-  workers replicated map, destroy their stream and exit. Note that any job that
-  was enqueued before the shutdown event still gets processed.
-* Once the workers have stopped, the producer that initiated the
-  shutdown cleans up the pool resources (jobs sink, jobs stream, replicated maps)
-  and the pool nodes exit.
-
-```mermaid
-%%{ init: { 'flowchart': { 'curve': 'basis' } } }%%
-%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
-
-flowchart TD
-    subgraph pn1[Pool Node 1]
-        u[User code]
-        po1[Pool 1]
-        w1[Worker 1]
-    end
-    subgraph pn2[Pool Node 2]
-        po2[Pool 2]
-        w2[Worker 2]
-    end
-    subgraph rdb[Redis]
-        sr[(Shutdown <br/> Replicated Map)]
-        wr[(Worker <br/> Replicated Map)]
-        ws1(["Worker 1 Stream"])
-        ws2(["Worker 2 Stream"])
-    end
-    u[User code] --1. Shutdown--> po1[Pool 1]
-    po1 --2. Set Shutdown Flag--> sr[(Shutdown <br/> Replicated Map)]
-    sr --3. Shutdown Flag--> po1
-    sr --3. Shutdown Flag--> po2
-    po1 --4. Add Stop--> ws1
-    po2 --4. Add Stop--> ws2
-    ws1 --5. Stop--> w1
-    ws2 --5. Stop--> w2
-    w1 --6. Remove Worker--> wr
-    w2 --6. Remove Worker--> wr
-    w1 --7. Delete--> ws1
-    w2 --7. Delete--> ws2
-    wr --8. Workers Empty--> po1
-    po1 --9. Delete --> sr
-    po1 --10. Delete --> wr
-
-    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
-    classDef producer fill:#2C5A9A, stroke:#6B96C1, stroke-width:2px, color:#CCE0FF;
-    classDef redis fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;
-    classDef background fill:#7A7A7A, color:#F2F2F2;
-
-    class u userCode;
-    class wr,sr,ws1,ws2 redis;
-    class po1,po2,w1,w2 producer;
-    class rdb,pn1,pn2 background;
-
-    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 7 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 8 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 9 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 10 stroke:#FF8888,color:#FF8888,stroke-width:3px;
-    linkStyle 11 stroke:#FF8888,color:#FF8888,stroke-width:3px;
-    linkStyle 12 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
-    linkStyle 13 stroke:#FF8888,color:#FF8888,stroke-width:3px;
-    linkStyle 14 stroke:#FF8888,color:#FF8888,stroke-width:3px;
-```
````
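The removed section above still documents the shutdown flow, so a brief sketch of triggering it from a producer may help. Only the `Shutdown` call is taken from the description above; the signal handling around it is illustrative, and the `goa.design/pulse/pool` import path is an assumption.

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"

	"github.com/redis/go-redis/v9"
	"goa.design/pulse/pool"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	node, err := pool.AddNode(ctx, "example", rdb)
	if err != nil {
		log.Fatal(err)
	}

	// Wait for Ctrl-C, then shut down the whole pool: workers receive a stop
	// event, finish within the shutdown TTL, and the initiating node cleans
	// up the shared pool resources.
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, os.Interrupt)
	<-sigc
	if err := node.Shutdown(ctx); err != nil {
		log.Fatal(err)
	}
}
```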
