Description
It is easy to spot this bug if the following two criteria are met:
- The RabbitMQ server is not stable; it goes down many times a day.
- The event handling on the subscriber/deadLetterSink side is slow (the duration from the dispatcher sending out the event to the subscriber/deadLetterSink sending back the response is long).

(Luckily, or unluckily, my testing environment meets both of them.)
Describe the bug
In the beginning, I noticed an issue with the dispatcher. Every time the RabbitMQ server goes down, stays down for a short time, and then comes back up, the dispatcher can't restart consuming. It can reconnect to the RabbitMQ server, but it just can't restart consuming. Checking on the RabbitMQ server side (e.g., in the management UI) shows there is no consumer for the queue.
This issue kept happening in my test env, which drove me to investigate it.
With many debugging logs added, I narrowed the suspicious code down to the following piece in `pkg/dispatcher/dispatcher.go`:
```go
// Register unbuffered close-notification channels on the connection and the channel.
connNotifyChannel, chNotifyChannel := conn.NotifyClose(make(chan *amqp.Error)), channel.NotifyClose(make(chan *amqp.Error))
for {
	select {
	case <-ctx.Done():
		logging.FromContext(ctx).Info("context done, stopping message consumers")
		finishConsuming(wg, workerQueue)
		return ctx.Err()
	case <-connNotifyChannel:
		// The connection to the RabbitMQ server was closed.
		finishConsuming(wg, workerQueue)
		return amqp.ErrClosed
	case <-chNotifyChannel:
		// The AMQP channel was closed.
		finishConsuming(wg, workerQueue)
		return amqp.ErrClosed
	case msg, ok := <-msgs:
		if !ok {
			finishConsuming(wg, workerQueue)
			return amqp.ErrClosed
		}
		workerQueue <- msg
	}
}
```
- After the RabbitMQ server shuts down, the notification channel registered with `Connection.NotifyClose()` or `Channel.NotifyClose()` becomes readable.
- That makes `finishConsuming()` get called, which in turn calls `wg.Wait()` to wait for the dispatch goroutines to finish (see the sketch right after this list).
- The problem is right here: the dispatch goroutine blocks. The function `dispatch()` called in that goroutine gets stuck without returning.
- With further debugging, `dispatch()` turns out to be stuck at the call to `msg.Ack()`.
- At that moment, the current connection to the RabbitMQ server is already lost (sooner or later, another goroutine, `watchRabbitMQConnections()`, would create a new connection). According to the documentation of `amqp091`, the call to `msg.Ack()` is supposed to return an error indicating the connection is lost, rather than block there.
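For reference, here is my simplified reading of `finishConsuming()`, inferred from the behaviour described in this list; it is a sketch, not necessarily the exact code in `pkg/dispatcher/dispatcher.go`:

```go
// Sketch only: finishConsuming() as inferred from the behaviour above.
// It stops feeding the workers and then waits for every in-flight
// dispatch goroutine to call wg.Done().
func finishConsuming(wg *sync.WaitGroup, workerQueue chan amqp.Delivery) {
	close(workerQueue) // no more deliveries are handed to the workers
	wg.Wait()          // blocks until every dispatch() call has returned
}
```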
(What makes `msg.Ack()` block?)
Expected behavior
Every time the RabbitMQ server goes down, stays down for any amount of time, and then comes back up, the dispatcher can restart consuming.
To Reproduce
- First, create a temporary slow event handler (subscriber/deadLetterSink), like the fragment below (`api` is the web framework's app/router object):

```python
from time import sleep

@api.post("/")
def event_handler():
    print("-=start=-")
    sleep(120)  # keep the request open long enough to shut down RabbitMQ
    return 200, ""
```
- Trigger the event. The event goes through the RabbitMQ queue and the dispatcher, and finally gets to the slow event handler.
- Immediately shut down the RabbitMQ server once we see "-=start=-" printed out.
- Keep the RabbitMQ server down for a couple of seconds. Make sure the dispatcher has detected that the server is down -- the `NotifyClose` channel becomes readable and the function `finishConsuming()` is called.
- Bring the RabbitMQ server back up. We will see no more error logs from the dispatcher about attempts to reconnect to the RabbitMQ server, which indicates that the dispatcher has established a new connection to the server. But on the RabbitMQ server side, we will see no consumer for the queue. If we re-trigger the event, it won't be consumed and won't be sent to the event handler.
Knative release version
From 1.10.0 to 1.13.0. (I didn't test older versions; I think they might be affected as well.)
Additional context
In the end, I managed to figure out the root cause, which is a deadlock that blocks `msg.Ack()`. The deadlock occurs this way.
In the dispatcher, in the dispatch goroutine:
- The dispatcher is waiting at the call to `ceClient.Request()` for the slow event handler to respond.

The RabbitMQ server, the remote peer, shuts down.
In the `amqp091` library:
- The connection-lost event is detected by the `amqp091` package first, and `Connection.shutdown()` gets called (the caller is `Connection.send()`).
- `Connection.shutdown()` sends the error to the `Connection.NotifyClose` channel to notify the consumer (the dispatcher in this case) that the connection to the remote server is closed.
In the dispatcher, in the main goroutine, the piece of code shown above is triggered:
- The dispatcher receives the notification from the `Connection.NotifyClose` channel.
- It calls `finishConsuming()`, which in turn calls `wg.Wait()` to wait for all the dispatch goroutines to call `wg.Done()`. (But right now, one of the dispatch goroutines is still stuck at the call to `ceClient.Request()`.)
In the `amqp091` library:
- `Connection.shutdown()` continues by calling `Channel.shutdown()`.
- `Channel.shutdown()` first calls `ch.m.Lock()`, then does a similar thing -- it sends the error to the `Channel.NotifyClose` channel to notify the consumer (the dispatcher in this case) that the RabbitMQ channel is closed. (A simplified sketch of that shape follows right after this list.)
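This is only the rough shape implied by the description above, not the library's actual code; the types and the `closes` field name are stand-ins I made up for illustration:

```go
package amqpsketch

import "sync"

// Minimal stand-ins; not the real amqp091 types.
type Error struct{ Reason string }

type Channel struct {
	m      sync.Mutex    // the mutex the text above calls ch.m
	closes []chan *Error // channels registered via Channel.NotifyClose()
}

// Rough shape of Channel.shutdown() as described above (illustrative only).
func (ch *Channel) shutdown(err *Error) {
	ch.m.Lock()
	defer ch.m.Unlock() // never runs if a send below blocks forever
	for _, c := range ch.closes {
		c <- err // blocks when the registered channel is unbuffered and unread
	}
}
```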
The first deadlock occurs here.

**Sending the error to the `Channel.NotifyClose` channel blocks**, because there is no receiver on the `Channel.NotifyClose` channel.

Why is there no receiver on the `Channel.NotifyClose` channel? See the piece of code above again: the receiver is the `case <-chNotifyChannel:` branch. That branch will never be reached, because another receiver, `case <-connNotifyChannel:`, has already been executed, and as a result the main goroutine is either blocked in `finishConsuming()` or has returned from the current function.
(BTW, this deadlock blocks one of the sub-goroutines, the one calling the shutdown functions. Since it doesn't block the main goroutine, it doesn't look that harmful by itself.)
The final consequence of this part is:
- `Channel.shutdown()` cannot return, because of the deadlock.
- The mutex locked by `ch.m.Lock()` cannot be unlocked.
Back in the dispatcher, in the dispatch goroutine:
- The response of `ceClient.Request()` is finally received, and `msg.Ack()` gets called.
- `msg.Ack()` eventually calls `ch.m.Lock()` to lock the same mutex, which is still being held.

So obviously, this is the second deadlock.
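To make the two deadlocks concrete, here is a minimal, self-contained Go program (no amqp091 involved; the names are only illustrative) that reproduces the same pattern: a "shutdown" goroutine locks a mutex and then blocks sending on an unbuffered notify channel that nobody reads any more, and an "Ack" goroutine then blocks trying to lock the same mutex:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

func main() {
	var chMutex sync.Mutex     // plays the role of amqp091's ch.m
	notify := make(chan error) // unbuffered, like NotifyClose(make(chan *amqp.Error))
	locked := make(chan struct{})

	// "Channel.shutdown()": lock the mutex, then notify the consumer.
	go func() {
		chMutex.Lock()
		defer chMutex.Unlock() // never runs: the send below blocks forever
		close(locked)
		notify <- errors.New("channel closed") // deadlock 1: nobody reads `notify` any more
	}()
	<-locked // make sure the "shutdown" goroutine holds the mutex first

	// The dispatcher's main goroutine has already taken the connection-level
	// branch of the select and stopped reading `notify`, so it is not here.

	// "dispatch goroutine / msg.Ack()": tries to take the same mutex.
	ackDone := make(chan struct{})
	go func() {
		chMutex.Lock() // deadlock 2: the mutex is held by the blocked "shutdown" goroutine
		chMutex.Unlock()
		close(ackDone)
	}()

	select {
	case <-ackDone:
		fmt.Println("Ack returned")
	case <-time.After(2 * time.Second):
		fmt.Println("Ack is still blocked: both goroutines are stuck for good")
	}
}
```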
To sum up, the whole deadlock chain looks like this:
- The `amqp091` library sends the error to the unbuffered channels registered by `Connection.NotifyClose()` and `Channel.NotifyClose()`.
- Once the dispatcher receives the error from the `Connection.NotifyClose` channel, it no longer reads the `Channel.NotifyClose` channel, which blocks the sender, `Channel.shutdown()`.
- `Channel.shutdown()` being blocked keeps the mutex taken by `ch.m.Lock()` locked.
- The mutex staying locked blocks `msg.Ack()`.
- `msg.Ack()` blocks the dispatch goroutine, so `wg.Done()` is never called.
- That blocks the function `finishConsuming()` at the call to `wg.Wait()`.
- The function `finishConsuming()`, in turn, blocks its caller, `ConsumeFromQueue()`, which eventually leaves the dispatcher unable to restart consuming.
Referring to the `amqp091` documentation -- in particular the section "Best practises for Connection and Channel notifications" -- I believe the deadlock comes from an improper use of the `NotifyClose` channels.
So I have come up with my own fix. A PR will be created soon.
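For illustration only -- this is one possible direction suggested by the amqp091 best-practices note, not necessarily what my PR does -- registering buffered notification channels keeps the library's single close notification from ever blocking, even after the dispatcher has stopped selecting on one of them:

```go
// Illustrative sketch, not necessarily the actual fix in the PR:
// buffer the close-notification channels (the library sends at most one
// error on each before closing it), so Channel.shutdown() never blocks,
// ch.m gets released, and msg.Ack() returns an error instead of hanging.
connNotifyChannel := conn.NotifyClose(make(chan *amqp.Error, 1))
chNotifyChannel := channel.NotifyClose(make(chan *amqp.Error, 1))
```

An alternative with the same effect would be to keep draining both notification channels until they are closed, even after `finishConsuming()` has started.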