Skip to content

Commit 871d990

Browse files
authored
Timeout DispatchJob if no dispatch return is received (#42)
This is to avoid blocking in case of error.
1 parent fb0e73f commit 871d990

File tree

3 files changed

+20
-7
lines changed

3 files changed

+20
-7
lines changed

pool/node.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type (
4242
workerTTL time.Duration // Worker considered dead if keep-alive not updated after this duration
4343
workerShutdownTTL time.Duration // Worker considered dead if not shutdown after this duration
4444
pendingJobTTL time.Duration // Job lease expires if not acked after this duration
45+
ackGracePeriod time.Duration // Wait for return status up to this duration
4546
logger pulse.Logger
4647
h hasher
4748
stop chan struct{} // closed when node is stopped
@@ -191,6 +192,7 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp
191192
workerTTL: o.workerTTL,
192193
workerShutdownTTL: o.workerShutdownTTL,
193194
pendingJobTTL: o.pendingJobTTL,
195+
ackGracePeriod: o.ackGracePeriod,
194196
h: jumpHash{crc64.New(crc64.MakeTable(crc64.ECMA))},
195197
stop: make(chan struct{}),
196198
rdb: rdb,
@@ -313,16 +315,26 @@ func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) e
313315
node.pendingJobs[eventID] = cherr
314316
node.lock.Unlock()
315317

316-
// Wait for return status.
318+
// Wait for return status up to ack grace period.
319+
timer := time.NewTimer(2 * node.ackGracePeriod)
320+
defer timer.Stop()
321+
317322
select {
318323
case err = <-cherr:
324+
case <-timer.C:
325+
err = fmt.Errorf("DispatchJob: job %q timed out, TTL: %v", key, 2*node.ackGracePeriod)
319326
case <-ctx.Done():
320327
err = ctx.Err()
321328
}
322329

330+
node.lock.Lock()
331+
delete(node.pendingJobs, eventID)
323332
close(cherr)
333+
node.lock.Unlock()
324334
if err == nil {
325335
node.logger.Info("dispatched", "key", key)
336+
} else {
337+
node.logger.Error(fmt.Errorf("DispatchJob: failed to dispatch job: %w", err), "key", key)
326338
}
327339
return err
328340
}
@@ -636,9 +648,9 @@ func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) {
636648
return
637649
}
638650
node.logger.Debug("dispatch return", "event", ev.EventName, "id", ev.ID, "ack-id", ack.EventID)
639-
delete(node.pendingJobs, ack.EventID)
640651
if cherr == nil {
641652
// Event was requeued.
653+
delete(node.pendingJobs, ack.EventID)
642654
return
643655
}
644656
var err error

pool/scheduler.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,11 @@ func (sched *scheduler) startJobs(ctx context.Context, jobs []*JobParam) error {
134134
for _, job := range jobs {
135135
err := sched.node.DispatchJob(ctx, job.Key, job.Payload)
136136
if err != nil {
137-
sched.logger.Error(err, "failed to dispatch job", "job", job.Key)
137+
sched.logger.Error(fmt.Errorf("failed to dispatch job: %w", err), "job", job.Key)
138138
continue
139139
}
140140
if _, err := sched.jobMap.Set(ctx, job.Key, time.Now().String()); err != nil {
141-
sched.logger.Error(err, "failed to store job", "job", job.Key)
141+
sched.logger.Error(fmt.Errorf("failed to store job: %w", err), "job", job.Key)
142142
continue
143143
}
144144
}
@@ -164,11 +164,11 @@ func (sched *scheduler) stopJobs(ctx context.Context, plan *JobPlan) error {
164164
for _, key := range toStop {
165165
err := sched.node.StopJob(ctx, key)
166166
if err != nil {
167-
sched.logger.Error(err, "failed to stop job", "job", key)
167+
sched.logger.Error(fmt.Errorf("failed to stop job: %w", err), "job", key)
168168
continue
169169
}
170170
if _, err := sched.jobMap.Delete(ctx, key); err != nil {
171-
sched.logger.Error(err, "failed to delete job", "job", key)
171+
sched.logger.Error(fmt.Errorf("failed to delete job: %w", err), "job", key)
172172
}
173173
}
174174
return nil

streaming/sink.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,12 +410,13 @@ func (s *Sink) periodicKeepAlive() {
410410
for {
411411
select {
412412
case <-ticker.C:
413+
s.lock.Lock()
413414
now := time.Now().UnixNano()
414415
if _, err := s.consumersKeepAliveMap.Set(ctx, s.consumer, strconv.FormatInt(now, 10)); err != nil {
415416
s.logger.Error(fmt.Errorf("failed to update sink keep-alive: %v", err))
417+
s.lock.Unlock()
416418
continue
417419
}
418-
s.lock.Lock()
419420
s.lastKeepAlive = now
420421
s.lock.Unlock()
421422

0 commit comments

Comments
 (0)