Skip to content

Commit 893fe9f

Browse files
committed
gofumpt
1 parent 2635a90 commit 893fe9f

File tree

9 files changed

+71
-51
lines changed

9 files changed

+71
-51
lines changed

cmd/root.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@ var (
4343
versionCmd = &cobra.Command{}
4444
)
4545

46-
var metricTags []string
47-
var amqpAppProperties []string
48-
var amqpAppPropertyFilters []string
49-
var amqpPropertyFilters []string
46+
var (
47+
metricTags []string
48+
amqpAppProperties []string
49+
amqpAppPropertyFilters []string
50+
amqpPropertyFilters []string
51+
)
5052

5153
func Execute() {
5254
rootCmd := RootCmd()
@@ -457,8 +459,8 @@ func start_metrics(cfg config.Config) {
457459
metrics.RegisterCommandLineMetric(cfg, cfg.MetricTags)
458460
metricsServer := metrics.GetMetricsServer()
459461
metricsServer.Start()
460-
461462
}
463+
462464
func startConsumers(ctx context.Context, consumerProto config.Protocol, wg *sync.WaitGroup) {
463465
for i := 1; i <= cfg.Consumers; i++ {
464466
select {

main_test.go

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ var _ = Describe("OMQ CLI", func() {
3232
func(publishProto string, publishToPrefix string, consumeProto string, consumeFromPrefix string) {
3333
publishTo := publishToPrefix + publishProto + consumeProto
3434
consumeFrom := consumeFromPrefix + publishProto + consumeProto
35-
args := []string{publishProto + "-" + consumeProto,
35+
args := []string{
36+
publishProto + "-" + consumeProto,
3637
"--pmessages=1",
3738
"--cmessages=1",
3839
"-t", publishTo,
@@ -67,7 +68,8 @@ var _ = Describe("OMQ CLI", func() {
6768
func(publishProto string, publishToPrefix string, consumeProto string, consumeFromPrefix string) {
6869
publishTo := publishToPrefix + publishProto + consumeProto
6970
consumeFrom := consumeFromPrefix + publishProto + consumeProto
70-
args := []string{publishProto + "-" + consumeProto,
71+
args := []string{
72+
publishProto + "-" + consumeProto,
7173
"-C", "1",
7274
"-D", "1",
7375
"-t", publishTo,
@@ -96,7 +98,8 @@ var _ = Describe("OMQ CLI", func() {
9698

9799
Describe("supports AMQP Stream Application Property Filters", func() {
98100
It("should filter messages based on app properties", func() {
99-
args := []string{"amqp",
101+
args := []string{
102+
"amqp",
100103
"--pmessages=6",
101104
"--publish-to=/queues/stream-with-app-property-filters",
102105
"--consume-from=/queues/stream-with-app-property-filters",
@@ -116,7 +119,8 @@ var _ = Describe("OMQ CLI", func() {
116119

117120
Describe("supports AMQP Stream Property Filters", func() {
118121
It("should filter messages based on app properties", func() {
119-
args := []string{"amqp",
122+
args := []string{
123+
"amqp",
120124
"--pmessages=3",
121125
"--publish-to=/queues/stream-with-property-filters",
122126
"--consume-from=/queues/stream-with-property-filters",
@@ -136,7 +140,8 @@ var _ = Describe("OMQ CLI", func() {
136140

137141
Describe("supports Fan-In from MQTT to AMQP", func() {
138142
It("should fan-in messages from MQTT to AMQP", func() {
139-
args := []string{"mqtt-amqp",
143+
args := []string{
144+
"mqtt-amqp",
140145
"--publishers=3",
141146
"--consumers=1",
142147
"--pmessages=5",
@@ -155,35 +160,43 @@ var _ = Describe("OMQ CLI", func() {
155160
})
156161
})
157162

158-
Describe("supports --consumer-startup-delay", func() {
163+
FDescribe("supports --consumer-startup-delay", func() {
159164
It("should start consumers after the configured delay", func() {
160-
args := []string{"amqp",
165+
args := []string{
166+
"amqp",
161167
"-C", "1",
162168
"-D", "1",
163169
"--consumer-startup-delay=3s",
164170
"-t", "/queues/consumer-startup-delay",
165171
"-T", "/queues/consumer-startup-delay",
166172
"--queues", "classic",
167173
"--cleanup-queues=true",
168-
"--print-all-metrics"}
174+
"--print-all-metrics",
175+
}
169176

170177
session := omq(args)
171178
Eventually(session).WithTimeout(5 * time.Second).Should(gexec.Exit(0))
172-
Expect(metricValue(session.Out, `omq_end_to_end_latency_seconds{quantile="0.99"}`)).Should(BeNumerically(">", 2))
179+
output, _ := io.ReadAll(session.Out)
180+
buf := bytes.NewReader(output)
181+
Expect(metricValue(buf, `omq_messages_consumed_total{priority="normal"}`)).Should(Equal(1.0))
182+
buf.Reset(output)
183+
Expect(metricValue(buf, `omq_end_to_end_latency_seconds{quantile="0.99"}`)).Should(BeNumerically(">", 2))
173184
})
174185
})
175186

176187
Describe("supports `--max-in-flight` in AMQP", func() {
177188
It("Higher --max-in-flight value should lead to higher publishing rate", func() {
178189
publishWithMaxInFlight := func(maxInFlight string) *gexec.Session {
179-
args := []string{"amqp",
190+
args := []string{
191+
"amqp",
180192
"-z", "3s",
181193
"-t", "/queues/amqp-max-in-flight",
182194
"-T", "/queues/amqp-max-in-flight",
183195
"--queues", "stream",
184196
"--cleanup-queues=true",
185197
"--print-all-metrics",
186-
"--max-in-flight", maxInFlight}
198+
"--max-in-flight", maxInFlight,
199+
}
187200

188201
session := omq(args)
189202
Eventually(session).WithTimeout(5 * time.Second).Should(gexec.Exit(0))
@@ -207,7 +220,8 @@ var _ = Describe("OMQ CLI", func() {
207220
func(versionFlag string, connectionVersion string) {
208221
rmqc, err := rabbithole.NewClient("http://127.0.0.1:15672", "guest", "guest")
209222
Expect(err).ShouldNot(HaveOccurred())
210-
args := []string{"mqtt",
223+
args := []string{
224+
"mqtt",
211225
"--time=6s",
212226
"--publish-to=/topic/foo",
213227
"--consume-from=/topic/foo",
@@ -246,7 +260,8 @@ var _ = Describe("OMQ CLI", func() {
246260

247261
Describe("declares queues for AMQP and STOMP clients", func() {
248262
It("declares queues for AMQP consumers with /queues/ address", func() {
249-
args := []string{"amqp",
263+
args := []string{
264+
"amqp",
250265
"-y", "2",
251266
"-x", "0",
252267
"-T", "/queues/declare-without-publishers-%d",
@@ -277,7 +292,8 @@ var _ = Describe("OMQ CLI", func() {
277292
})
278293

279294
It("declares queues for AMQP publishers with /queues/... address", func() {
280-
args := []string{"amqp",
295+
args := []string{
296+
"amqp",
281297
"-y", "0",
282298
"-r", "1",
283299
"-t", "/queues/declare-without-consumers",
@@ -304,7 +320,8 @@ var _ = Describe("OMQ CLI", func() {
304320
})
305321

306322
It("declares queues for STOMP publishers and consumers with /amq/queue/... addresses", func() {
307-
args := []string{"stomp",
323+
args := []string{
324+
"stomp",
308325
"-r", "1",
309326
"-t", "/amq/queue/stomp-declare-for-publisher",
310327
"-T", "/amq/queue/stomp-declare-for-consumer",
@@ -337,7 +354,8 @@ var _ = Describe("OMQ CLI", func() {
337354

338355
Describe("exposes command line flags as a omq_args metric", func() {
339356
It("should print omq_args", func() {
340-
args := []string{"amqp",
357+
args := []string{
358+
"amqp",
341359
"-t", "/queues/omq-args",
342360
"-T", "/queues/omq-args",
343361
"-C", "0",

omq_suite_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ func TestOmq(t *testing.T) {
1313
RunSpecs(t, "OMQ Suite")
1414
}
1515

16-
var omqPath string
17-
var _ = BeforeSuite(func() {
18-
var err error
19-
omqPath, err = gexec.Build("github.com/rabbitmq/omq")
20-
Expect(err).NotTo(HaveOccurred())
21-
DeferCleanup(gexec.CleanupBuildArtifacts)
22-
})
16+
var (
17+
omqPath string
18+
_ = BeforeSuite(func() {
19+
var err error
20+
omqPath, err = gexec.Build("github.com/rabbitmq/omq")
21+
Expect(err).NotTo(HaveOccurred())
22+
DeferCleanup(gexec.CleanupBuildArtifacts)
23+
})
24+
)

pkg/amqp10_client/consumer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,15 +282,17 @@ func buildLinkFilters(cfg config.Config) []amqp.LinkFilter {
282282
filters = append(filters, amqp.NewLinkFilter("amqp:application-properties-filter",
283283
0,
284284
map[string]any{
285-
appProperty: filterExpression}))
285+
appProperty: filterExpression,
286+
}))
286287
}
287288

288289
for property, filterExpression := range cfg.Amqp.PropertyFilters {
289290
filters = append(filters,
290291
amqp.NewLinkFilter("amqp:properties-filter",
291292
0,
292293
map[amqp.Symbol]any{
293-
amqp.Symbol(property): filterExpression}))
294+
amqp.Symbol(property): filterExpression,
295+
}))
294296
}
295297
return filters
296298
}

pkg/amqp10_client/publisher.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (p *Amqp10Publisher) Connect() {
9090
return
9191
}
9292
} else {
93-
log.Debug("connection established", "id", p.Id, "uri", uri)
93+
log.Debug("publisher connected", "id", p.Id, "uri", uri)
9494
p.Connection = conn
9595
}
9696
}
@@ -129,7 +129,8 @@ func (p *Amqp10Publisher) CreateSender() {
129129
for p.Sender == nil {
130130
sender, err := p.Session.NewSender(context.TODO(), p.Terminus, &amqp.SenderOptions{
131131
SettlementMode: settleMode,
132-
TargetDurability: durability})
132+
TargetDurability: durability,
133+
})
133134
if err != nil {
134135
log.Error("publisher failed to create a sender", "id", p.Id, "error", err.Error())
135136
time.Sleep(1 * time.Second)

pkg/metrics/metrics.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,11 @@ func GetMetricsServer() *MetricsServer {
121121
vmetrics.WritePrometheus(w, true)
122122
})
123123

124-
metricsServer =
125-
&MetricsServer{
126-
httpServer: &http.Server{
127-
Addr: get_metrics_ip() + ":8080",
128-
},
129-
}
124+
metricsServer = &MetricsServer{
125+
httpServer: &http.Server{
126+
Addr: get_metrics_ip() + ":8080",
127+
},
128+
}
130129
}
131130

132131
return metricsServer
@@ -155,8 +154,10 @@ func (m *MetricsServer) Start() {
155154
}()
156155
}
157156

158-
var previouslyPublished uint64
159-
var previouslyConsumed uint64
157+
var (
158+
previouslyPublished uint64
159+
previouslyConsumed uint64
160+
)
160161

161162
func (m *MetricsServer) PrintMessageRates(ctx context.Context) {
162163
go func() {
@@ -176,7 +177,6 @@ func (m *MetricsServer) PrintMessageRates(ctx context.Context) {
176177
previouslyConsumed = consumed
177178

178179
}
179-
180180
}
181181
}()
182182
}

pkg/mgmt/mgmt.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ import (
1313
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
1414
)
1515

16-
var mgmtConn *rmq.IConnection
17-
var declaredQueues []string
18-
var mgmtUri string
16+
var (
17+
mgmtConn *rmq.IConnection
18+
declaredQueues []string
19+
mgmtUri string
20+
)
1921

2022
func Get() rmq.IManagement {
2123
var conn rmq.IConnection

pkg/mqtt_client/common.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ type Publisher interface {
2323
}
2424

2525
func NewConsumer(ctx context.Context, cfg config.Config, id int) Consumer {
26-
2726
topic := utils.InjectId(cfg.ConsumeFrom, id)
2827
topic = strings.TrimPrefix(topic, "/exchange/amq.topic/")
2928
topic = strings.TrimPrefix(topic, "/topic/")
@@ -43,7 +42,6 @@ func NewConsumer(ctx context.Context, cfg config.Config, id int) Consumer {
4342
Config: cfg,
4443
}
4544
}
46-
4745
}
4846

4947
func NewPublisher(ctx context.Context, cfg config.Config, id int) Publisher {
@@ -71,7 +69,6 @@ func NewPublisher(ctx context.Context, cfg config.Config, id int) Publisher {
7169
Config: cfg,
7270
}
7371
}
74-
7572
}
7673

7774
func newMqtt34Connection(cfg config.Config, id int) mqtt.Client {

pkg/stomp_client/consumer.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ type StompConsumer struct {
2424
}
2525

2626
func NewConsumer(ctx context.Context, cfg config.Config, id int) *StompConsumer {
27-
2827
consumer := &StompConsumer{
2928
Id: id,
3029
Connection: nil,
@@ -76,7 +75,6 @@ func (c *StompConsumer) Connect() {
7675
c.Connection = conn
7776
}
7877
}
79-
8078
}
8179

8280
func (c *StompConsumer) Subscribe() {
@@ -132,7 +130,6 @@ func (c *StompConsumer) Start(ctx context.Context, subscribed chan bool) {
132130
err := c.Connection.Ack(msg)
133131
if err != nil {
134132
log.Error("message NOT acknowledged", "id", c.Id, "destination", c.Topic)
135-
136133
} else {
137134
metrics.MessagesConsumedMetric(priority).Inc()
138135
i++
@@ -147,7 +144,6 @@ func (c *StompConsumer) Start(ctx context.Context, subscribed chan bool) {
147144

148145
c.Stop("--cmessages value reached")
149146
log.Debug("consumer finished", "id", c.Id)
150-
151147
}
152148

153149
func (c *StompConsumer) Stop(reason string) {

0 commit comments

Comments
 (0)