Skip to content

Commit d4caa87

Browse files
authored
AMQP-0.9.1 consumer (#57)
1 parent e8429a4 commit d4caa87

File tree

10 files changed

+334
-47
lines changed

10 files changed

+334
-47
lines changed

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
## omq
22

3-
`omq` is a messaging system client for testing purposes. It currently supports AMQP-1.0, STOMP and MQTT 3.1/3.1.1/5.0
4-
and partially AMQP 0.9.1 (only for publishing). It is developed mostly for RabbitMQ but might be useful for other brokers
3+
`omq` is a messaging system client for testing purposes. It currently supports AMQP 1.0, AMQP 0.9.1, STOMP and MQTT 3.1/3.1.1/5.0. It is developed mostly for RabbitMQ but might be useful for other brokers
54
as well (some tests against ActiveMQ were performed).
65

76
`omq` starts a group of publishers and a group of consumers, in both cases all publishers/consumers are identical,

cmd/root.go

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,16 @@ import (
3333
var cfg config.Config
3434
var (
3535
amqp_amqp = &cobra.Command{}
36+
amqp_amqp091 = &cobra.Command{}
3637
amqp_stomp = &cobra.Command{}
3738
amqp_mqtt = &cobra.Command{}
3839
stomp_stomp = &cobra.Command{}
3940
stomp_amqp = &cobra.Command{}
41+
stomp_amqp091 = &cobra.Command{}
4042
stomp_mqtt = &cobra.Command{}
4143
mqtt_mqtt = &cobra.Command{}
4244
mqtt_amqp = &cobra.Command{}
45+
mqtt_amqp091 = &cobra.Command{}
4346
mqtt_stomp = &cobra.Command{}
4447
amqp091_amqp091 = &cobra.Command{}
4548
amqp091_amqp = &cobra.Command{}
@@ -53,6 +56,7 @@ var (
5356
amqpAppProperties []string
5457
amqpAppPropertyFilters []string
5558
amqpPropertyFilters []string
59+
streamOffset string
5660
)
5761

5862
var (
@@ -114,6 +118,9 @@ func RootCmd() *cobra.Command {
114118
amqpConsumerFlags.StringArrayVar(&amqpPropertyFilters, "amqp-property-filter", []string{},
115119
"AMQP property filters, eg. key1=&p:prefix")
116120

121+
amqp091PublisherFlags := pflag.NewFlagSet("amqp091-publisher", pflag.ContinueOnError)
122+
amqp091ConsumerFlags := pflag.NewFlagSet("amqp091-consumer", pflag.ContinueOnError)
123+
117124
amqp_amqp = &cobra.Command{
118125
Use: "amqp-amqp",
119126
Aliases: []string{"amqp"},
@@ -126,6 +133,17 @@ func RootCmd() *cobra.Command {
126133
amqp_amqp.Flags().AddFlagSet(amqpPublisherFlags)
127134
amqp_amqp.Flags().AddFlagSet(amqpConsumerFlags)
128135

136+
amqp_amqp091 = &cobra.Command{
137+
Use: "amqp-amqp091",
138+
Run: func(cmd *cobra.Command, args []string) {
139+
cfg.PublisherProto = config.AMQP
140+
cfg.ConsumerProto = config.AMQP091
141+
start(cfg)
142+
},
143+
}
144+
amqp_amqp.Flags().AddFlagSet(amqpPublisherFlags)
145+
amqp_amqp.Flags().AddFlagSet(amqp091ConsumerFlags)
146+
129147
amqp_stomp = &cobra.Command{
130148
Use: "amqp-stomp",
131149
Run: func(cmd *cobra.Command, args []string) {
@@ -167,6 +185,16 @@ func RootCmd() *cobra.Command {
167185
}
168186
stomp_amqp.Flags().AddFlagSet(amqpConsumerFlags)
169187

188+
stomp_amqp091 = &cobra.Command{
189+
Use: "stomp-amqp091",
190+
Run: func(cmd *cobra.Command, args []string) {
191+
cfg.PublisherProto = config.STOMP
192+
cfg.ConsumerProto = config.AMQP091
193+
start(cfg)
194+
},
195+
}
196+
stomp_amqp.Flags().AddFlagSet(amqp091ConsumerFlags)
197+
170198
stomp_mqtt = &cobra.Command{
171199
Use: "stomp-mqtt",
172200
Run: func(cmd *cobra.Command, args []string) {
@@ -200,6 +228,17 @@ func RootCmd() *cobra.Command {
200228
mqtt_amqp.Flags().AddFlagSet(mqttPublisherFlags)
201229
mqtt_amqp.Flags().AddFlagSet(amqpConsumerFlags)
202230

231+
mqtt_amqp091 = &cobra.Command{
232+
Use: "mqtt-amqp091",
233+
Run: func(cmd *cobra.Command, args []string) {
234+
cfg.PublisherProto = config.MQTT
235+
cfg.ConsumerProto = config.AMQP091
236+
start(cfg)
237+
},
238+
}
239+
mqtt_amqp.Flags().AddFlagSet(mqttPublisherFlags)
240+
mqtt_amqp.Flags().AddFlagSet(amqp091ConsumerFlags)
241+
203242
mqtt_stomp = &cobra.Command{
204243
Use: "mqtt-stomp",
205244
Run: func(cmd *cobra.Command, args []string) {
@@ -219,6 +258,8 @@ func RootCmd() *cobra.Command {
219258
start(cfg)
220259
},
221260
}
261+
amqp091_amqp091.Flags().AddFlagSet(amqp091PublisherFlags)
262+
amqp091_amqp091.Flags().AddFlagSet(amqp091ConsumerFlags)
222263

223264
amqp091_amqp = &cobra.Command{
224265
Use: "amqp091-amqp",
@@ -228,6 +269,8 @@ func RootCmd() *cobra.Command {
228269
start(cfg)
229270
},
230271
}
272+
amqp091_amqp.Flags().AddFlagSet(amqp091PublisherFlags)
273+
amqp091_amqp.Flags().AddFlagSet(amqpConsumerFlags)
231274

232275
amqp091_mqtt = &cobra.Command{
233276
Use: "amqp091-mqtt",
@@ -237,6 +280,7 @@ func RootCmd() *cobra.Command {
237280
start(cfg)
238281
},
239282
}
283+
amqp091_mqtt.Flags().AddFlagSet(amqp091PublisherFlags)
240284

241285
amqp091_stomp = &cobra.Command{
242286
Use: "amqp091-stomp",
@@ -246,6 +290,7 @@ func RootCmd() *cobra.Command {
246290
start(cfg)
247291
},
248292
}
293+
amqp091_stomp.Flags().AddFlagSet(amqp091PublisherFlags)
249294

250295
versionCmd = &cobra.Command{
251296
Use: "version",
@@ -304,7 +349,7 @@ func RootCmd() *cobra.Command {
304349
"The queue/topic/terminus to consume from (%d will be replaced with the consumer's id)")
305350
rootCmd.PersistentFlags().StringVar(&cfg.ConsumerId, "consumer-id", "omq-consumer-%d",
306351
"Client ID for AMQP and MQTT consumers (%d => consumer's id, %r => random)")
307-
rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "",
352+
rootCmd.PersistentFlags().StringVar(&streamOffset, "stream-offset", "",
308353
"Stream consumer offset specification (default=next)")
309354
rootCmd.PersistentFlags().Int32Var(&cfg.ConsumerPriority, "consumer-priority", 0, "Consumer priority")
310355
rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1,
@@ -353,13 +398,16 @@ func RootCmd() *cobra.Command {
353398
"The DNS name that will return members to synchronize with")
354399

355400
rootCmd.AddCommand(amqp_amqp)
401+
rootCmd.AddCommand(amqp_amqp091)
356402
rootCmd.AddCommand(amqp_stomp)
357403
rootCmd.AddCommand(amqp_mqtt)
358404
rootCmd.AddCommand(stomp_stomp)
359405
rootCmd.AddCommand(stomp_amqp)
406+
rootCmd.AddCommand(stomp_amqp091)
360407
rootCmd.AddCommand(stomp_mqtt)
361408
rootCmd.AddCommand(mqtt_mqtt)
362409
rootCmd.AddCommand(mqtt_amqp)
410+
rootCmd.AddCommand(mqtt_amqp091)
363411
rootCmd.AddCommand(mqtt_stomp)
364412
rootCmd.AddCommand(amqp091_amqp091)
365413
rootCmd.AddCommand(amqp091_amqp)
@@ -645,6 +693,12 @@ func sanitizeConfig(cfg *config.Config) error {
645693
}
646694
}
647695

696+
offset, err := parseStreamOffset(streamOffset)
697+
if err != nil {
698+
return fmt.Errorf("invalid stream offset value")
699+
}
700+
cfg.StreamOffset = offset
701+
648702
// AMQP application properties
649703
cfg.Amqp.AppProperties = make(map[string][]string)
650704
for _, val := range amqpAppProperties {
@@ -688,6 +742,26 @@ func sanitizeConfig(cfg *config.Config) error {
688742
return nil
689743
}
690744

745+
func parseStreamOffset(offset string) (any, error) {
746+
switch offset {
747+
case "":
748+
return "", nil
749+
case "next", "first", "last":
750+
return offset, nil
751+
default:
752+
// check if streamOffset can be parsed as unsigned integer (chunkID)
753+
if chunkID, err := strconv.ParseInt(offset, 10, 64); err == nil {
754+
return chunkID, nil
755+
}
756+
// check if streamOffset can be parsed as an ISO 8601 timestamp
757+
if timestamp, err := time.Parse(time.RFC3339, offset); err == nil {
758+
return timestamp, nil
759+
}
760+
}
761+
// return "", fmt.Errorf("invalid stream offset: %s", offset)
762+
return offset, nil //, fmt.Errorf("invalid stream offset: %s", offset)
763+
}
764+
691765
func handleInterupt(ctx context.Context, cancel context.CancelFunc) {
692766
go func() {
693767
c := make(chan os.Signal, 1)

main_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,17 +109,21 @@ var _ = Describe("OMQ CLI", func() {
109109
Eventually(session).Should(gbytes.Say(`omq_messages_consumed_total{priority="normal"} 1`))
110110
},
111111
Entry("amqp -> amqp", "amqp", "/queues/", "amqp", "/queues/"),
112-
Entry("stomp -> amqp", "stomp", "/topic/", "amqp", "/queues/"),
113-
Entry("mqtt -> amqp", "mqtt", "/topic/", "amqp", "/queues/"),
112+
Entry("amqp -> amqp091", "amqp", "/queues/", "amqp", "/queues/"),
114113
Entry("amqp -> stomp", "amqp", "/exchanges/amq.topic/", "stomp", "/topic/"),
115114
Entry("amqp -> mqtt", "amqp", "/exchanges/amq.topic/", "mqtt", "/topic/"),
116-
Entry("stomp -> stomp", "stomp", "/topic/", "stomp", "/topic/"),
117-
Entry("stomp -> mqtt", "stomp", "/topic/", "mqtt", "/topic/"),
118-
Entry("mqtt -> mqtt", "mqtt", "/topic/", "mqtt", "/topic/"),
119-
Entry("mqtt -> stomp", "mqtt", "/topic/", "stomp", "/topic/"),
120115
Entry("amqp091 -> amqp", "amqp091", "/queues/", "amqp", "/queues/"),
116+
Entry("amqp091 -> amqp091", "amqp091", "/queues/", "amqp", "/queues/"),
121117
Entry("amqp091 -> mqtt", "amqp091", "/exchanges/amq.topic/", "mqtt", "/topic/"),
122118
Entry("amqp091 -> stomp", "amqp091", "/exchanges/amq.topic/", "stomp", "/topic/"),
119+
Entry("mqtt -> amqp", "mqtt", "/topic/", "amqp", "/queues/"),
120+
Entry("mqtt -> amqp091", "mqtt", "/topic/", "amqp", "/queues/"),
121+
Entry("mqtt -> mqtt", "mqtt", "/topic/", "mqtt", "/topic/"),
122+
Entry("mqtt -> stomp", "mqtt", "/topic/", "stomp", "/topic/"),
123+
Entry("stomp -> amqp", "stomp", "/topic/", "amqp", "/queues/"),
124+
Entry("stomp -> amqp091", "stomp", "/topic/", "amqp", "/queues/"),
125+
Entry("stomp -> stomp", "stomp", "/topic/", "stomp", "/topic/"),
126+
Entry("stomp -> mqtt", "stomp", "/topic/", "mqtt", "/topic/"),
123127
)
124128

125129
DescribeTable("supports message priorities for AMQP and STOMP",

0 commit comments

Comments
 (0)