Skip to content

Commit 67b2457

Browse files
authored
Fix a few issues (#48)
* Fix a few issues * Make sure that adding a dispatch return event to a node stream creates the stream if needed. This could cause jobs that are added right on startup to fail to be dispatched. * Fix potential panic in cleanup goroutine when the pending jobs map contains nil values * Make sure to close all maps when closing the node. * Properly log event information when discarding stale ack events. * More consistent logging * Fix a few additional issues * Ensure stale node streams are gced * Make close and shutdown code cleaner * Cleanup all maps properly on node close
1 parent 9881791 commit 67b2457

File tree

16 files changed

+626
-355
lines changed

16 files changed

+626
-355
lines changed

examples/streaming/multi-sinks/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func main() {
5454
}
5555

5656
// Don't forget to close the sink when done
57-
defer sink1.Close()
57+
defer sink1.Close(ctx)
5858

5959
// Read and acknowlege event
6060
ev := <-sink1.Subscribe()
@@ -70,7 +70,7 @@ func main() {
7070
if err != nil {
7171
panic(err)
7272
}
73-
defer sink2.Close()
73+
defer sink2.Close(ctx)
7474

7575
// Read second event
7676
ev = <-sink2.Subscribe()

examples/streaming/multi-streams/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func main() {
4040
}
4141

4242
// Don't forget to close the sink when done
43-
defer sink.Close()
43+
defer sink.Close(ctx)
4444

4545
// Subscribe to events
4646
c := sink.Subscribe()

examples/streaming/pub-sub/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func main() {
5656
}
5757

5858
// Don't forget to close the sink when done
59-
defer sink.Close()
59+
defer sink.Close(ctx)
6060

6161
// Read both events
6262
c := sink.Subscribe()

examples/streaming/single-sink/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func main() {
4545
}
4646

4747
// Don't forget to close the sink when done
48-
defer sink.Close()
48+
defer sink.Close(ctx)
4949

5050
// Consume event
5151
ev := <-sink.Subscribe()

0 commit comments

Comments
 (0)