Skip to content

Commit 2635a90

Browse files
committed
More contexts...
There were cases where ^C wouldn't stop omq or it'd take a long time to stop. Hopefully most of them fixed now
1 parent 400d515 commit 2635a90

File tree

8 files changed

+97
-73
lines changed

8 files changed

+97
-73
lines changed

cmd/root.go

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -341,36 +341,32 @@ func start(cfg config.Config) {
341341
}()
342342

343343
var wg sync.WaitGroup
344-
wg.Add(cfg.Publishers + cfg.Consumers)
345344

346345
// TODO
347346
// refactor; make consumer startup delay more accurate
348347
// clarfiy when queues are declared
349348

350349
mgmt.DeclareQueues(cfg)
351-
// if --consumer-startup-delay is not set, we want to start
352-
// all the consumers before we start any publishers
353-
if cfg.ConsumerStartupDelay == 0 {
354-
startConsumers(ctx, cfg.ConsumerProto, &wg)
355-
} else {
356-
go func() {
357-
time.Sleep(cfg.ConsumerStartupDelay)
358-
startConsumers(ctx, cfg.ConsumerProto, &wg)
359-
}()
360-
}
350+
startConsumers(ctx, cfg.ConsumerProto, &wg)
361351

362352
if cfg.Publishers > 0 {
363353
for i := 1; i <= cfg.Publishers; i++ {
364-
n := i
365-
go func() {
366-
defer wg.Done()
367-
p, err := common.NewPublisher(cfg.PublisherProto, cfg, n)
368-
if err != nil {
369-
log.Error("Error creating publisher: ", "error", err)
370-
os.Exit(1)
371-
}
372-
p.Start(ctx)
373-
}()
354+
select {
355+
case <-ctx.Done():
356+
return
357+
default:
358+
n := i
359+
wg.Add(1)
360+
go func() {
361+
defer wg.Done()
362+
p, err := common.NewPublisher(ctx, cfg.PublisherProto, cfg, n)
363+
if err != nil {
364+
log.Error("Error creating publisher: ", "error", err)
365+
os.Exit(1)
366+
}
367+
p.Start(ctx)
368+
}()
369+
}
374370
}
375371
}
376372

@@ -465,20 +461,29 @@ func start_metrics(cfg config.Config) {
465461
}
466462
func startConsumers(ctx context.Context, consumerProto config.Protocol, wg *sync.WaitGroup) {
467463
for i := 1; i <= cfg.Consumers; i++ {
468-
subscribed := make(chan bool)
469-
n := i
470-
go func() {
471-
defer wg.Done()
472-
c, err := common.NewConsumer(consumerProto, cfg, n)
473-
if err != nil {
474-
log.Error("Error creating consumer: ", "error", err)
475-
os.Exit(1)
464+
select {
465+
case <-ctx.Done():
466+
return
467+
default:
468+
subscribed := make(chan bool)
469+
n := i
470+
wg.Add(1)
471+
go func() {
472+
defer wg.Done()
473+
time.Sleep(cfg.ConsumerStartupDelay)
474+
c, err := common.NewConsumer(ctx, consumerProto, cfg, n)
475+
if err != nil {
476+
log.Error("Error creating consumer: ", "error", err)
477+
os.Exit(1)
478+
}
479+
c.Start(ctx, subscribed)
480+
}()
481+
// consumers are started one by one and synchronously,
482+
// unless a startup delay is set - then we just fire them and hope for the best
483+
if cfg.ConsumerStartupDelay == 0 {
484+
<-subscribed
476485
}
477-
c.Start(ctx, subscribed)
478-
}()
479-
480-
// wait until we know the receiver has subscribed
481-
<-subscribed
486+
}
482487
}
483488
}
484489

pkg/amqp10_client/consumer.go

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ type Amqp10Consumer struct {
2727
Terminus string
2828
Config config.Config
2929
whichUri int
30+
ctx context.Context
3031
}
3132

32-
func NewConsumer(cfg config.Config, id int) *Amqp10Consumer {
33+
func NewConsumer(ctx context.Context, cfg config.Config, id int) *Amqp10Consumer {
3334
consumer := &Amqp10Consumer{
3435
Id: id,
3536
Connection: nil,
@@ -38,14 +39,15 @@ func NewConsumer(cfg config.Config, id int) *Amqp10Consumer {
3839
Terminus: utils.InjectId(cfg.ConsumeFrom, id),
3940
Config: cfg,
4041
whichUri: 0,
42+
ctx: ctx,
4143
}
4244

4345
if cfg.SpreadConnections {
4446
consumer.whichUri = (id - 1) % len(cfg.ConsumerUri)
4547
}
4648

4749
// TODO: context?
48-
consumer.Connect(context.TODO())
50+
consumer.Connect(ctx)
4951

5052
return consumer
5153
}
@@ -81,7 +83,12 @@ func (c *Amqp10Consumer) Connect(ctx context.Context) {
8183
})
8284
if err != nil {
8385
log.Error("consumer failed to connect", "id", c.Id, "error", err.Error())
84-
time.Sleep(1 * time.Second)
86+
select {
87+
case <-ctx.Done():
88+
return
89+
case <-time.After(1 * time.Second):
90+
continue
91+
}
8592
} else {
8693
log.Debug("consumer connected", "id", c.Id, "uri", uri)
8794
c.Connection = conn
@@ -110,23 +117,28 @@ func (c *Amqp10Consumer) CreateReceiver(ctx context.Context) {
110117
durability = amqp.DurabilityUnsettledState
111118
}
112119

113-
for c.Receiver == nil {
114-
receiver, err := c.Session.NewReceiver(ctx,
115-
c.Terminus,
116-
&amqp.ReceiverOptions{
117-
SourceDurability: durability,
118-
Credit: int32(c.Config.ConsumerCredits),
119-
Properties: buildLinkProperties(c.Config),
120-
Filters: buildLinkFilters(c.Config),
121-
})
122-
if err != nil {
123-
if err == context.Canceled {
124-
return
120+
for c.Receiver == nil && c.Session != nil {
121+
select {
122+
case <-ctx.Done():
123+
return
124+
default:
125+
receiver, err := c.Session.NewReceiver(context.TODO(),
126+
c.Terminus,
127+
&amqp.ReceiverOptions{
128+
SourceDurability: durability,
129+
Credit: int32(c.Config.ConsumerCredits),
130+
Properties: buildLinkProperties(c.Config),
131+
Filters: buildLinkFilters(c.Config),
132+
})
133+
if err != nil {
134+
if err == context.Canceled {
135+
return
136+
}
137+
log.Error("consumer failed to create a receiver", "id", c.Id, "error", err.Error())
138+
time.Sleep(1 * time.Second)
139+
} else {
140+
c.Receiver = receiver
125141
}
126-
log.Error("consumer failed to create a receiver", "id", c.Id, "error", err.Error())
127-
time.Sleep(1 * time.Second)
128-
} else {
129-
c.Receiver = receiver
130142
}
131143
}
132144
}
@@ -232,10 +244,11 @@ func pastTense(outcome string) string {
232244
}
233245

234246
func (c *Amqp10Consumer) Stop(reason string) {
235-
_ = c.Session.Close(context.Background())
236-
err := c.Connection.Close()
237-
if err != nil {
238-
log.Info("consumer stopped with an error", "id", c.Id, "error", err.Error())
247+
if c.Session != nil {
248+
_ = c.Session.Close(context.Background())
249+
}
250+
if c.Connection != nil {
251+
_ = c.Connection.Close()
239252
}
240253
log.Debug("consumer stopped", "id", c.Id, "reason", reason)
241254
}

pkg/amqp10_client/publisher.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,18 @@ type Amqp10Publisher struct {
2727
Config config.Config
2828
msg []byte
2929
whichUri int
30+
ctx context.Context
3031
}
3132

32-
func NewPublisher(cfg config.Config, id int) *Amqp10Publisher {
33+
func NewPublisher(ctx context.Context, cfg config.Config, id int) *Amqp10Publisher {
3334
publisher := &Amqp10Publisher{
3435
Id: id,
3536
Connection: nil,
3637
Sender: nil,
3738
Config: cfg,
3839
Terminus: utils.InjectId(cfg.PublishTo, id),
3940
whichUri: 0,
41+
ctx: ctx,
4042
}
4143

4244
if cfg.SpreadConnections {
@@ -81,7 +83,12 @@ func (p *Amqp10Publisher) Connect() {
8183

8284
if err != nil {
8385
log.Error("connection failed", "id", p.Id, "error", err.Error())
84-
time.Sleep(1 * time.Second)
86+
select {
87+
case <-time.After(1 * time.Second):
88+
continue
89+
case <-p.ctx.Done():
90+
return
91+
}
8592
} else {
8693
log.Debug("connection established", "id", p.Id, "uri", uri)
8794
p.Connection = conn

pkg/common/common.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@ type Consumer interface {
1818
Start(context.Context, chan bool)
1919
}
2020

21-
func NewPublisher(protocol config.Protocol, cfg config.Config, id int) (Publisher, error) {
21+
func NewPublisher(ctx context.Context, protocol config.Protocol, cfg config.Config, id int) (Publisher, error) {
2222
switch protocol {
2323
case config.AMQP:
24-
p := amqp10_client.NewPublisher(cfg, id)
24+
p := amqp10_client.NewPublisher(ctx, cfg, id)
2525
if p == nil {
2626
return nil, fmt.Errorf("failed to create an AMQP-1.0 publisher")
2727
}
2828
return p, nil
2929
case config.STOMP:
30-
p := stomp_client.NewPublisher(cfg, id)
30+
p := stomp_client.NewPublisher(ctx, cfg, id)
3131
if p == nil {
3232
return nil, fmt.Errorf("failed to create a STOMP publisher")
3333
}
3434
return p, nil
3535
case config.MQTT:
36-
p := mqtt_client.NewPublisher(cfg, id)
36+
p := mqtt_client.NewPublisher(ctx, cfg, id)
3737
if p == nil {
3838
return nil, fmt.Errorf("failed to create an MQTT publisher")
3939
}
@@ -43,22 +43,22 @@ func NewPublisher(protocol config.Protocol, cfg config.Config, id int) (Publishe
4343
return nil, fmt.Errorf("unknown protocol")
4444
}
4545

46-
func NewConsumer(protocol config.Protocol, cfg config.Config, id int) (Consumer, error) {
46+
func NewConsumer(ctx context.Context, protocol config.Protocol, cfg config.Config, id int) (Consumer, error) {
4747
switch protocol {
4848
case config.AMQP:
49-
c := amqp10_client.NewConsumer(cfg, id)
49+
c := amqp10_client.NewConsumer(ctx, cfg, id)
5050
if c == nil {
5151
return nil, fmt.Errorf("failed to create an AMQP-1.0 consumer")
5252
}
5353
return c, nil
5454
case config.STOMP:
55-
c := stomp_client.NewConsumer(cfg, id)
55+
c := stomp_client.NewConsumer(ctx, cfg, id)
5656
if c == nil {
5757
return nil, fmt.Errorf("failed to create a STOMP consumer")
5858
}
5959
return c, nil
6060
case config.MQTT:
61-
c := mqtt_client.NewConsumer(cfg, id)
61+
c := mqtt_client.NewConsumer(ctx, cfg, id)
6262
if c == nil {
6363
return nil, fmt.Errorf("failed to create an MQTT consumer")
6464
}

pkg/mqtt_client/common.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type Publisher interface {
2222
Start(context.Context)
2323
}
2424

25-
func NewConsumer(cfg config.Config, id int) Consumer {
25+
func NewConsumer(ctx context.Context, cfg config.Config, id int) Consumer {
2626

2727
topic := utils.InjectId(cfg.ConsumeFrom, id)
2828
topic = strings.TrimPrefix(topic, "/exchange/amq.topic/")
@@ -46,7 +46,7 @@ func NewConsumer(cfg config.Config, id int) Consumer {
4646

4747
}
4848

49-
func NewPublisher(cfg config.Config, id int) Publisher {
49+
func NewPublisher(ctx context.Context, cfg config.Config, id int) Publisher {
5050
topic := utils.InjectId(cfg.PublishTo, id)
5151
// AMQP-1.0 and STOMP allow /exchange/amq.topic/ prefix
5252
// since MQTT has no concept of exchanges, we need to remove it

pkg/stomp_client/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type StompConsumer struct {
2323
whichUri int
2424
}
2525

26-
func NewConsumer(cfg config.Config, id int) *StompConsumer {
26+
func NewConsumer(ctx context.Context, cfg config.Config, id int) *StompConsumer {
2727

2828
consumer := &StompConsumer{
2929
Id: id,

pkg/stomp_client/publisher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type StompPublisher struct {
2424
whichUri int
2525
}
2626

27-
func NewPublisher(cfg config.Config, id int) *StompPublisher {
27+
func NewPublisher(ctx context.Context, cfg config.Config, id int) *StompPublisher {
2828
publisher := &StompPublisher{
2929
Id: id,
3030
Connection: nil,

pkg/utils/kubernetes.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func GetEndpoints(serviceName string) ([]string, error) {
3131
log.Error("Can't read the Kubernetes token", "error", err.Error())
3232
os.Exit(1)
3333
}
34-
var bearer = "Bearer " + string(token)
34+
bearer := "Bearer " + string(token)
3535

3636
req, err := http.NewRequest("GET", url, nil)
3737
if err != nil {
@@ -56,7 +56,6 @@ func GetEndpoints(serviceName string) ([]string, error) {
5656

5757
client := &http.Client{Transport: tr}
5858
resp, err := client.Do(req)
59-
6059
if err != nil {
6160
log.Error("Can't connect to the Kubernetes API", "error", err.Error())
6261
os.Exit(1)

0 commit comments

Comments
 (0)