Description
Hello,
First of all, apologies if my terminology is a bit off; I'm not a regular Go programmer :)
We run a process that reads from NSQ servers over an SSH tunnel. While debugging an issue where this connection breaks, we found a potential problem: an error from sendRDY does not fully propagate.
sendRDY can return an error (lines 950 to 964 in d71fb89):

```go
func (r *Consumer) sendRDY(c *Conn, count int64) error {
	if count == 0 && c.LastRDY() == 0 {
		// no need to send. It's already that RDY count
		return nil
	}
	atomic.AddInt64(&r.totalRdyCount, -c.RDY()+count)
	c.SetRDY(count)
	err := c.WriteCommand(Ready(int(count)))
	if err != nil {
		r.log(LogLevelError, "(%s) error sending RDY %d - %s", c.String(), count, err)
		return err
	}
	return nil
}
```
updateRDY, which calls sendRDY, can also return an error (line 907 in d71fb89):

```go
func (r *Consumer) updateRDY(c *Conn, count int64) error {
```
But that error isn't handled in its own recursive call here (line 940 in d71fb89):

```go
r.rdyRetryTimers[c.String()] = time.AfterFunc(5*time.Second,
	func() {
		r.updateRDY(c, count)
	})
```
We believe that because the error fails to fully propagate, our process never learns that the connection was lost and doesn't know to attempt a mitigation.
We also found a few other invocations of updateRDY whose errors don't appear to be handled; both are in startStopContinueBackoff, which itself doesn't return an error (line 761 in d71fb89):

```go
func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal) {
```