diff --git a/streaming/reader.go b/streaming/reader.go index d60c6a4..3cb5fae 100644 --- a/streaming/reader.go +++ b/streaming/reader.go @@ -42,6 +42,8 @@ type ( bufferSize int // channels to send notifications chans []chan *Event + // startOnce is used to ensure the reader is started only once. + startOnce sync.Once // donechan is the reader donechan channel. donechan chan struct{} // streamschan notifies the reader when streams are added or @@ -114,9 +116,6 @@ func newReader(stream *Stream, opts ...options.Reader) (*Reader, error) { rdb: stream.rdb, } - reader.wait.Add(1) - pulse.Go(reader.logger, reader.read) - return reader, nil } @@ -127,6 +126,7 @@ func (r *Reader) Subscribe() <-chan *Event { r.lock.Lock() defer r.lock.Unlock() r.chans = append(r.chans, c) + r.start() return c } @@ -209,6 +209,14 @@ func (r *Reader) IsClosed() bool { return r.closed } +// start starts the reader's read goroutine if it is not already running. +func (r *Reader) start() { + r.startOnce.Do(func() { + r.wait.Add(1) + pulse.Go(r.logger, r.read) + }) +} + // read reads events from the streams and sends them to the reader channel. func (r *Reader) read() { ctx := context.Background()