Skip to content

Commit f01bf50

Browse files
authored
Add back option to set reading block duration to node options (#46)
1 parent be67ad4 commit f01bf50

File tree

3 files changed

+24
-20
lines changed

3 files changed

+24
-20
lines changed

pool/node.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,6 @@ const (
8484
evDispatchReturn string = "d"
8585
)
8686

87-
// jobSinkBlockDuration is the max duration to block when reading from the job stream.
88-
var jobSinkBlockDuration = 5 * time.Second
89-
9087
// pendingEventTTL is the TTL for pending events.
9188
var pendingEventTTL = 2 * time.Minute
9289

@@ -160,7 +157,7 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No
160157
return nil, fmt.Errorf("AddNode: failed to join pool ticker replicated map %q: %w", tickerMapName(poolName), err)
161158
}
162159
poolSink, err = poolStream.NewSink(ctx, "events",
163-
soptions.WithSinkBlockDuration(jobSinkBlockDuration),
160+
soptions.WithSinkBlockDuration(o.jobSinkBlockDuration),
164161
soptions.WithSinkAckGracePeriod(o.ackGracePeriod))
165162
if err != nil {
166163
return nil, fmt.Errorf("AddNode: failed to create events sink for stream %q: %w", poolStreamName(poolName), err)
@@ -170,7 +167,7 @@ func AddNode(ctx context.Context, poolName string, rdb *redis.Client, opts ...No
170167
if err != nil {
171168
return nil, fmt.Errorf("AddNode: failed to create node event stream %q: %w", nodeStreamName(poolName, nodeID), err)
172169
}
173-
nodeReader, err = nodeStream.NewReader(ctx, soptions.WithReaderBlockDuration(jobSinkBlockDuration), soptions.WithReaderStartAtOldest())
170+
nodeReader, err = nodeStream.NewReader(ctx, soptions.WithReaderBlockDuration(o.jobSinkBlockDuration), soptions.WithReaderStartAtOldest())
174171
if err != nil {
175172
return nil, fmt.Errorf("AddNode: failed to create node event reader for stream %q: %w", nodeStreamName(poolName, nodeID), err)
176173
}

pool/node_options.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@ type (
1111
NodeOption func(*nodeOptions)
1212

1313
nodeOptions struct {
14-
workerTTL time.Duration
15-
workerShutdownTTL time.Duration
16-
maxQueuedJobs int
17-
clientOnly bool
18-
ackGracePeriod time.Duration
19-
logger pulse.Logger
14+
workerTTL time.Duration
15+
workerShutdownTTL time.Duration
16+
maxQueuedJobs int
17+
clientOnly bool
18+
jobSinkBlockDuration time.Duration
19+
ackGracePeriod time.Duration
20+
logger pulse.Logger
2021
}
2122
)
2223

@@ -37,6 +38,14 @@ func WithWorkerShutdownTTL(ttl time.Duration) NodeOption {
3738
}
3839
}
3940

41+
// WithJobSinkBlockDuration sets the duration to block when reading from the
42+
// job stream. The default is 5s. This option is mostly useful for testing.
43+
func WithJobSinkBlockDuration(d time.Duration) NodeOption {
44+
return func(o *nodeOptions) {
45+
o.jobSinkBlockDuration = d
46+
}
47+
}
48+
4049
// WithMaxQueuedJobs sets the maximum number of jobs that can be queued in the pool.
4150
// The default is 1000.
4251
func WithMaxQueuedJobs(max int) NodeOption {
@@ -82,10 +91,11 @@ func parseOptions(opts ...NodeOption) *nodeOptions {
8291
// defaultPoolOptions returns the default options.
8392
func defaultPoolOptions() *nodeOptions {
8493
return &nodeOptions{
85-
workerTTL: 30 * time.Second,
86-
workerShutdownTTL: 2 * time.Minute,
87-
maxQueuedJobs: 1000,
88-
ackGracePeriod: 20 * time.Second,
89-
logger: pulse.NoopLogger(),
94+
workerTTL: 30 * time.Second,
95+
workerShutdownTTL: 2 * time.Minute,
96+
jobSinkBlockDuration: 5 * time.Second,
97+
maxQueuedJobs: 1000,
98+
ackGracePeriod: 20 * time.Second,
99+
logger: pulse.NoopLogger(),
90100
}
91101
}

pool/testing.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,6 @@ const (
3030
testAckGracePeriod = 50 * time.Millisecond
3131
)
3232

33-
func init() {
34-
jobSinkBlockDuration = 100 * time.Millisecond
35-
}
36-
3733
// newTestNode creates a new Node instance for testing purposes.
3834
// It configures the node with specific TTL and block duration settings
3935
// suitable for testing, and uses the provided Redis client and name.
@@ -42,6 +38,7 @@ func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name stri
4238
node, err := AddNode(ctx, name, rdb,
4339
WithLogger(pulse.ClueLogger(ctx)),
4440
WithWorkerShutdownTTL(testWorkerShutdownTTL),
41+
WithJobSinkBlockDuration(testJobSinkBlockDuration),
4542
WithWorkerTTL(testWorkerTTL),
4643
WithAckGracePeriod(testAckGracePeriod))
4744
require.NoError(t, err)

0 commit comments

Comments
 (0)