consumer: sendRDY error not propagating #199

@armcknight

Hello,

First of all, apologies if my terminology is a bit off; I'm not a regular Go programmer :)

We run a process that reads from NSQ servers over an SSH tunnel. While debugging an issue that occurs when this connection breaks, we found a potential problem: an error from sendRDY does not fully propagate.

sendRDY can return an error (go-nsq/consumer.go, lines 950 to 964 in d71fb89):

func (r *Consumer) sendRDY(c *Conn, count int64) error {
	if count == 0 && c.LastRDY() == 0 {
		// no need to send. It's already that RDY count
		return nil
	}
	atomic.AddInt64(&r.totalRdyCount, -c.RDY()+count)
	c.SetRDY(count)
	err := c.WriteCommand(Ready(int(count)))
	if err != nil {
		r.log(LogLevelError, "(%s) error sending RDY %d - %s", c.String(), count, err)
		return err
	}
	return nil
}

updateRDY, which calls sendRDY, can also return an error:

func (r *Consumer) updateRDY(c *Conn, count int64) error

But that error is discarded when updateRDY calls itself again from its retry timer:

r.rdyRetryTimers[c.String()] = time.AfterFunc(5*time.Second,
	func() {
		r.updateRDY(c, count)
	})

We suspect that, because the error does not fully propagate, our process never learns that the connection was lost and so doesn't know to attempt a mitigation.

We also found two other invocations of updateRDY that don't appear to handle errors. Both are in startStopContinueBackoff, which has no error return value:

func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal)
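As a rough sketch of what handling those errors could look like in a function that has no error return, the example below hands each failure to a callback. Everything in it is an assumption for illustration: `fakeConn`, `applyRDY`, and `onErr` are hypothetical and do not exist in go-nsq.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeConn is a hypothetical stand-in for a connection; it is not go-nsq's Conn.
type fakeConn struct {
	name   string
	broken bool
}

// updateRDY mimics an update that fails when the connection is broken.
func (c *fakeConn) updateRDY(count int64) error {
	if c.broken {
		return errors.New("write failed")
	}
	return nil
}

// applyRDY has no error return, so it hands each failure to onErr rather than
// discarding it. Both the function and the callback are illustrative assumptions.
func applyRDY(conns []*fakeConn, count int64, onErr func(c *fakeConn, err error)) {
	for _, c := range conns {
		if err := c.updateRDY(count); err != nil {
			onErr(c, err)
		}
	}
}

func main() {
	conns := []*fakeConn{{name: "a"}, {name: "b", broken: true}}
	applyRDY(conns, 1, func(c *fakeConn, err error) {
		fmt.Printf("(%s) updateRDY failed: %s\n", c.name, err)
	})
}
```

The callback could log the error, close the connection, or signal the application; the sketch only shows that the failure is visible to something.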
