Skip to content

Commit e8429a4

Browse files
authored
AMQP 0.9.1 publisher (#55)
1 parent e732193 commit e8429a4

File tree

16 files changed

+399
-120
lines changed

16 files changed

+399
-120
lines changed

README.md

Lines changed: 11 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
## omq
22

3-
`omq` is a messaging system client for testing purposes. It currently supports AMQP-1.0, STOMP and MQTT 3.1/3.1.1/5.0.
4-
It is developed mostly for RabbitMQ but might be useful for other brokers as well (some tests against ActiveMQ
5-
were performed).
3+
`omq` is a messaging system client for testing purposes. It currently supports AMQP-1.0, STOMP and MQTT 3.1/3.1.1/5.0
4+
and partially AMQP 0.9.1 (only for publishing). It is developed mostly for RabbitMQ but might be useful for other brokers
5+
as well (some tests against ActiveMQ were performed).
66

77
`omq` starts a group of publishers and a group of consumers, in both cases all publishers/consumers are identical,
88
except for the target terminus/queue/routing key, which may be slightly different. The publishers can use
@@ -65,6 +65,12 @@ For convenience, if either `--publish-to` or `--consume-from` starts with `/exch
6565
will remove that prefix. RabbitMQ only allows using a single topic exchange with MQTT (`amq.topic` by default), so this prefix doesn't make
6666
much sense. Removing it makes it easier to use the same parameters across protocols.
6767

68+
AMQP 0.9.1 publishers use the same target address as AMQP 1.0:
69+
* `/queues/foo` will publish to the default exchange with the routing key `foo`
70+
* `/exchange/bar` will publish to the `bar` exchange with an empty routing key
71+
* `/exchange/bar/baz` will publish to the `bar` exchange with the routing key `baz`
72+
* any `--publish-to` value that doesn't match any of the above formats is treated as a routing key for the default exchange
73+
6874
Read more about how RabbitMQ handles sources and targets in different protocols:
6975
* [AMQP 1.0](https://www.rabbitmq.com/docs/amqp#address-v1) format used by RabbitMQ 3.x
7076
* [AMQP 1.0](https://www.rabbitmq.com/docs/amqp#address-v2) format used by RabbitMQ 4.0+ (the old format is still supported but deprecated)
@@ -127,53 +133,5 @@ messages published with perf-test can be consumed by `omq` or vice versa, and th
127133

128134
### Options
129135

130-
```
131-
--amqp-app-property stringArray AMQP application properties, eg. key1=val1,val2
132-
--amqp-app-property-filter stringArray AMQP application property filters, eg. key1=&p:prefix
133-
--binding-key string AMQP 1.0 consumer binding key
134-
--amqp-property-filter stringArray AMQP property filters, eg. subject=foo
135-
--amqp-reject-rate int Rate of messages to reject (0-100%)
136-
--amqp-release-rate int Rate of messages to release without accepting (0-100%)
137-
--amqp-subject strings AMQP 1.0 message subject(s), eg. foo,bar,baz
138-
--amqp-to strings AMQP 1.0 message To field (required for the anonymous terminus)
139-
--mqtt-consumer-version int MQTT consumer protocol version (3, 4 or 5; default=5) (default 5)
140-
--mqtt-publisher-clean-session MQTT publisher clean session (default true)
141-
--mqtt-publisher-qos int MQTT publisher QoS level (0, 1 or 2; default=0)
142-
--mqtt-publisher-version int MQTT consumer protocol version (3, 4 or 5; default=5) (default 5)
143-
--binding-key string Binding key for queue declarations
144-
--cleanup-queues Delete the queues at the end (omq only deletes the queues it explicitly declared)
145-
-D, --cmessages int The number of messages to consume per consumer (default=MaxInt) (default 9223372036854775807)
146-
-T, --consume-from string The queue/topic/terminus to consume from (%d will be replaced with the consumer's id) (default "/queues/omq-%d")
147-
--consumer-credits int AMQP-1.0 consumer credits / STOMP prefetch count (default 1)
148-
--consumer-id string Client ID for AMQP and MQTT consumers (%d => consumer's id, %r => random) (default "omq-consumer-%d")
149-
-L, --consumer-latency duration consumer latency (time to accept message)
150-
--consumer-priority int32 Consumer priority
151-
--consumer-startup-delay duration Delay consumer startup to allow a backlog of messages to build up (eg. 10s)
152-
--consumer-uri strings URI for consuming
153-
-y, --consumers int The number of consumers to start (default 1)
154-
--expected-instances int The number of instances to synchronize (default 1)
155-
--expected-instances-endpoint string The DNS name that will return members to synchronize with
156-
-h, --help help for omq
157-
-l, --log-level log-level Log level (debug, info, error) (default info)
158-
--log-out-of-order-messages Print a log line when a message is received that is older than the previously received message
159-
-c, --max-in-flight int Maximum number of in-flight messages per publisher (default 1)
160-
-d, --message-durability Mark messages as durable (default true)
161-
--message-priority string Message priority (0-255, default=unset)
162-
--message-ttl duration Message TTL (not set by default)
163-
--metric-tags strings Prometheus label-value pairs, eg. l1=v1,l2=v2
164-
-C, --pmessages int The number of messages to send per publisher (default 9223372036854775807)
165-
--print-all-metrics Print all metrics before exiting
166-
-t, --publish-to string The topic/terminus to publish to (%d will be replaced with the publisher's id) (default "/queues/omq-%d")
167-
--publisher-id string Client ID for AMQP and MQTT publishers (%d => consumer's id, %r => random) (default "omq-publisher-%d")
168-
--publisher-uri strings URI for publishing
169-
-x, --publishers int The number of publishers to start (default 1)
170-
--queue-durability queue-durability Queue durability (default: configuration - the queue definition is durable) (default configuration)
171-
--queues predeclared Type of queues to declare (or predeclared to use existing queues) (default predeclared)
172-
-r, --rate float32 Messages per second (-1 = unlimited) (default -1)
173-
-s, --size int Message payload size in bytes (default 12)
174-
--spread-connections Spread connections across URIs (default true)
175-
--stream-offset string Stream consumer offset specification (default=next)
176-
-z, --time duration Run duration (eg. 10s, 5m, 2h)
177-
--uri strings URI for both publishers and consumers
178-
-m, --use-millis Use milliseconds for timestamps (automatically enabled when no publishers or no consumers)
179-
```
136+
Use `omq --help` for the full list of options. Keep in mind that some options are protocol-specific and therefore will only
137+
be printed with the corresponding subcommand. For example `omq mqtt --help` will additionally show MQTT-specific options.

cmd/root.go

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,20 @@ import (
3232

3333
var cfg config.Config
3434
var (
35-
amqp_amqp = &cobra.Command{}
36-
amqp_stomp = &cobra.Command{}
37-
amqp_mqtt = &cobra.Command{}
38-
stomp_stomp = &cobra.Command{}
39-
stomp_amqp = &cobra.Command{}
40-
stomp_mqtt = &cobra.Command{}
41-
mqtt_mqtt = &cobra.Command{}
42-
mqtt_amqp = &cobra.Command{}
43-
mqtt_stomp = &cobra.Command{}
44-
versionCmd = &cobra.Command{}
35+
amqp_amqp = &cobra.Command{}
36+
amqp_stomp = &cobra.Command{}
37+
amqp_mqtt = &cobra.Command{}
38+
stomp_stomp = &cobra.Command{}
39+
stomp_amqp = &cobra.Command{}
40+
stomp_mqtt = &cobra.Command{}
41+
mqtt_mqtt = &cobra.Command{}
42+
mqtt_amqp = &cobra.Command{}
43+
mqtt_stomp = &cobra.Command{}
44+
amqp091_amqp091 = &cobra.Command{}
45+
amqp091_amqp = &cobra.Command{}
46+
amqp091_mqtt = &cobra.Command{}
47+
amqp091_stomp = &cobra.Command{}
48+
versionCmd = &cobra.Command{}
4549
)
4650

4751
var (
@@ -206,6 +210,43 @@ func RootCmd() *cobra.Command {
206210
}
207211
mqtt_stomp.Flags().AddFlagSet(mqttPublisherFlags)
208212

213+
amqp091_amqp091 = &cobra.Command{
214+
Use: "amqp091-amqp091",
215+
Aliases: []string{"amqp091"},
216+
Run: func(cmd *cobra.Command, args []string) {
217+
cfg.PublisherProto = config.AMQP091
218+
cfg.ConsumerProto = config.AMQP091
219+
start(cfg)
220+
},
221+
}
222+
223+
amqp091_amqp = &cobra.Command{
224+
Use: "amqp091-amqp",
225+
Run: func(cmd *cobra.Command, args []string) {
226+
cfg.PublisherProto = config.AMQP091
227+
cfg.ConsumerProto = config.AMQP
228+
start(cfg)
229+
},
230+
}
231+
232+
amqp091_mqtt = &cobra.Command{
233+
Use: "amqp091-mqtt",
234+
Run: func(cmd *cobra.Command, args []string) {
235+
cfg.PublisherProto = config.AMQP091
236+
cfg.ConsumerProto = config.MQTT
237+
start(cfg)
238+
},
239+
}
240+
241+
amqp091_stomp = &cobra.Command{
242+
Use: "amqp091-stomp",
243+
Run: func(cmd *cobra.Command, args []string) {
244+
cfg.PublisherProto = config.AMQP091
245+
cfg.ConsumerProto = config.STOMP
246+
start(cfg)
247+
},
248+
}
249+
209250
versionCmd = &cobra.Command{
210251
Use: "version",
211252
Run: func(cmd *cobra.Command, args []string) {
@@ -320,6 +361,10 @@ func RootCmd() *cobra.Command {
320361
rootCmd.AddCommand(mqtt_mqtt)
321362
rootCmd.AddCommand(mqtt_amqp)
322363
rootCmd.AddCommand(mqtt_stomp)
364+
rootCmd.AddCommand(amqp091_amqp091)
365+
rootCmd.AddCommand(amqp091_amqp)
366+
rootCmd.AddCommand(amqp091_mqtt)
367+
rootCmd.AddCommand(amqp091_stomp)
323368
rootCmd.AddCommand(versionCmd)
324369

325370
return rootCmd
@@ -502,7 +547,7 @@ func startPublishers(ctx context.Context, wg *sync.WaitGroup, startPublishing ch
502547
// then we close the channel to allow all of them to start publishing "at once"
503548
// but each publisher sleeps for a random sub-second time, to avoid burts of
504549
// messages being published
505-
p.Start(ctx, publisherReady, startPublishing)
550+
p.Start(publisherReady, startPublishing)
506551
}()
507552
<-publisherReady
508553
}
@@ -552,6 +597,8 @@ func defaultUri(proto string) string {
552597
switch proto {
553598
case "amqp":
554599
uri = "amqp://localhost/"
600+
case "amqp091":
601+
uri = "amqp://localhost/"
555602
case "stomp":
556603
uri = "stomp://localhost:61613"
557604
case "mqtt":

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ require (
5151
github.com/nxadm/tail v1.4.11 // indirect
5252
github.com/onsi/ginkgo v1.16.5 // indirect
5353
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
54+
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
5455
github.com/rivo/uniseg v0.4.7 // indirect
5556
github.com/rogpeppe/go-internal v1.12.0 // indirect
5657
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8b
195195
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
196196
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
197197
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
198+
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
199+
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
198200
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241121093408-1a6679a20123 h1:vwJUkrY81ekNGCh2xN+ii16ZSRfsmClJkJhHGimiocA=
199201
github.com/rabbitmq/rabbitmq-amqp-go-client v0.0.0-20241121093408-1a6679a20123/go.mod h1:Km231GyOZAw9I3SZIqkfB9VVzCsu8jvFWYdghmnwueM=
200202
github.com/relvacode/iso8601 v1.6.0 h1:eFXUhMJN3Gz8Rcq82f9DTMW0svjtAVuIEULglM7QHTU=

main_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ var _ = Describe("OMQ CLI", func() {
108108
Eventually(session.Err).Should(gbytes.Say(`TOTAL CONSUMED messages=1`))
109109
Eventually(session).Should(gbytes.Say(`omq_messages_consumed_total{priority="normal"} 1`))
110110
},
111-
Entry("amqp -> amqp", "amqp", "/queues/", "amqp", "/queues/"), // https://github.com/Azure/go-amqp/issues/313
111+
Entry("amqp -> amqp", "amqp", "/queues/", "amqp", "/queues/"),
112112
Entry("stomp -> amqp", "stomp", "/topic/", "amqp", "/queues/"),
113113
Entry("mqtt -> amqp", "mqtt", "/topic/", "amqp", "/queues/"),
114114
Entry("amqp -> stomp", "amqp", "/exchanges/amq.topic/", "stomp", "/topic/"),
@@ -117,6 +117,9 @@ var _ = Describe("OMQ CLI", func() {
117117
Entry("stomp -> mqtt", "stomp", "/topic/", "mqtt", "/topic/"),
118118
Entry("mqtt -> mqtt", "mqtt", "/topic/", "mqtt", "/topic/"),
119119
Entry("mqtt -> stomp", "mqtt", "/topic/", "stomp", "/topic/"),
120+
Entry("amqp091 -> amqp", "amqp091", "/queues/", "amqp", "/queues/"),
121+
Entry("amqp091 -> mqtt", "amqp091", "/exchanges/amq.topic/", "mqtt", "/topic/"),
122+
Entry("amqp091 -> stomp", "amqp091", "/exchanges/amq.topic/", "stomp", "/topic/"),
120123
)
121124

122125
DescribeTable("supports message priorities for AMQP and STOMP",

0 commit comments

Comments
 (0)