Skip to content

Commit 97cf3ec

Browse files
committed
Handle context cancellation in AMQP handleErrors
1 parent 21615d0 commit 97cf3ec

File tree

1 file changed

+24
-14
lines changed

1 file changed

+24
-14
lines changed

pkg/amqp10/publisher.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func (p *Amqp10Publisher) SendAsync(ctx context.Context) error {
229229
case <-ctx.Done():
230230
return nil
231231
default:
232-
err = p.handleSendErrors(err)
232+
err = p.handleSendErrors(ctx, err)
233233
if err != nil {
234234
return err
235235
}
@@ -249,7 +249,7 @@ func (p *Amqp10Publisher) SendSync(ctx context.Context) error {
249249
err := p.Sender.Send(context.TODO(), msg, nil)
250250
latency := time.Since(startTime)
251251
log.Debug("message sent", "id", p.Id, "destination", p.Terminus, "latency", latency, "appProps", msg.ApplicationProperties)
252-
err = p.handleSendErrors(err)
252+
err = p.handleSendErrors(ctx, err)
253253
if err != nil {
254254
return err
255255
}
@@ -262,19 +262,29 @@ func (p *Amqp10Publisher) SendSync(ctx context.Context) error {
262262
// handleSendErrors returns an error if the error suggests we should reconnect
263263
// (this is native, but amqp-go-client should handle this better in the future)
264264
// otherwise we log an error but return nil to keep publishing
265-
func (p *Amqp10Publisher) handleSendErrors(err error) error {
266-
var connErr *amqp.ConnError
267-
var linkErr *amqp.LinkError
268-
if errors.As(err, &connErr) {
269-
log.Error("publisher connection failure; reconnecting...", "id", p.Id, "error", connErr.Error())
270-
return err
271-
} else if errors.As(err, &linkErr) {
272-
log.Error("publisher link failure; reconnecting...", "id", p.Id, "error", connErr.Error())
273-
return err
274-
} else if err != nil {
275-
log.Error("message sending failure", "id", p.Id, "error", err)
265+
func (p *Amqp10Publisher) handleSendErrors(ctx context.Context, err error) error {
266+
select {
267+
case <-ctx.Done():
268+
return nil
269+
default:
270+
var connErr *amqp.ConnError
271+
var linkErr *amqp.LinkError
272+
if errors.As(err, &connErr) {
273+
log.Error("publisher connection failure; reconnecting...", "id", p.Id, "error", connErr.Error())
274+
return err
275+
}
276+
277+
if errors.As(err, &linkErr) {
278+
log.Error("publisher link failure; reconnecting...", "id", p.Id, "error", connErr.Error())
279+
return err
280+
}
281+
282+
if err != nil {
283+
log.Error("message sending failure", "id", p.Id, "error", err)
284+
}
285+
286+
return nil
276287
}
277-
return nil
278288
}
279289

280290
func (p *Amqp10Publisher) handleSent(receipt *amqp.SendReceipt, published time.Time) {

0 commit comments

Comments
 (0)