File tree Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Original file line number Diff line number Diff line change @@ -91,11 +91,12 @@ func publish(sessions chan chan session, messages <-chan message) {
91
91
ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
92
92
defer cancel ()
93
93
94
+ pending := make (chan message , 1 )
95
+
94
96
for session := range sessions {
95
97
var (
96
98
running bool
97
99
reading = messages
98
- pending = make (chan message , 1 )
99
100
confirm = make (chan amqp.Confirmation , 1 )
100
101
)
101
102
@@ -117,6 +118,7 @@ func publish(sessions chan chan session, messages <-chan message) {
117
118
select {
118
119
case confirmed , ok := <- confirm :
119
120
if ! ok {
121
+ pub .Close ()
120
122
break Publish
121
123
}
122
124
if ! confirmed .Ack {
@@ -190,6 +192,7 @@ func subscribe(sessions chan chan session, messages chan<- message) {
190
192
messages <- msg .Body
191
193
sub .Ack (msg .DeliveryTag , false )
192
194
}
195
+ sub .Close ()
193
196
}
194
197
}
195
198
You can’t perform that action at this time.
0 commit comments