Skip to content

Commit cbb95fb

Browse files
New command group: queues
1 parent 45ff427 commit cbb95fb

File tree

4 files changed

+151
-1
lines changed

4 files changed

+151
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
* `global_parameters` is a new command group for operations on [global runtime parameters](https://www.rabbitmq.com/docs/parameters)
88
* `nodes` is a new command group for operations on nodes
99
* `parameters` is a new command group for operations on [runtime parameters](https://www.rabbitmq.com/docs/parameters)
10+
* `queues` is a new command group for operations on queues
1011
* `users` is a new command group for operations on users
1112
* `vhosts` is a new command group for operations on virtual hosts
1213
* Command groups are now ordered alphabetically

src/cli.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,12 @@ pub fn parser(pre_flight_settings: PreFlightSettings) -> Command {
201201
.infer_long_args(pre_flight_settings.infer_long_options)
202202
.subcommand_value_name("queue")
203203
.subcommands(purge_subcommands(pre_flight_settings.clone()));
204+
let queues_group = Command::new("queues")
205+
.about("Operations on queues")
206+
.infer_subcommands(pre_flight_settings.infer_subcommands)
207+
.infer_long_args(pre_flight_settings.infer_long_options)
208+
.subcommand_value_name("queue")
209+
.subcommands(queues_subcommands(pre_flight_settings.clone()));
204210
let rebalance_group = Command::new("rebalance")
205211
.about("Rebalancing of leader replicas")
206212
.infer_subcommands(pre_flight_settings.infer_subcommands)
@@ -275,6 +281,7 @@ pub fn parser(pre_flight_settings: PreFlightSettings) -> Command {
275281
policies_group,
276282
publish_group,
277283
purge_group,
284+
queues_group,
278285
rebalance_group,
279286
show_group,
280287
shovels_group,
@@ -1227,6 +1234,78 @@ fn purge_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 1] {
12271234
[queue_cmd].map(|cmd| cmd.infer_long_args(pre_flight_settings.infer_long_options))
12281235
}
12291236

1237+
fn queues_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 5] {
1238+
let declare_cmd = Command::new("declare")
1239+
.about("Declares a queue or a stream")
1240+
.after_help(color_print::cformat!(
1241+
"<bold>Doc guide:</bold>: {}",
1242+
QUEUE_GUIDE_URL
1243+
))
1244+
.arg(Arg::new("name").long("name").required(true).help("name"))
1245+
.arg(
1246+
Arg::new("type")
1247+
.long("type")
1248+
.help("queue type")
1249+
.value_parser(value_parser!(QueueType))
1250+
.required(false)
1251+
.default_value("classic"),
1252+
)
1253+
.arg(
1254+
Arg::new("durable")
1255+
.long("durable")
1256+
.help("should it persist after a restart")
1257+
.required(false)
1258+
.value_parser(value_parser!(bool)),
1259+
)
1260+
.arg(
1261+
Arg::new("auto_delete")
1262+
.long("auto-delete")
1263+
.help("should it be deleted when the last consumer disconnects")
1264+
.required(false)
1265+
.value_parser(value_parser!(bool)),
1266+
)
1267+
.arg(
1268+
Arg::new("arguments")
1269+
.long("arguments")
1270+
.help("additional exchange arguments")
1271+
.required(false)
1272+
.default_value("{}")
1273+
.value_parser(value_parser!(String)),
1274+
);
1275+
let idempotently_arg = Arg::new("idempotently")
1276+
.long("idempotently")
1277+
.value_parser(value_parser!(bool))
1278+
.action(ArgAction::SetTrue)
1279+
.help("do not consider 404 Not Found API responses to be errors")
1280+
.required(false);
1281+
let delete_cmd = Command::new("delete")
1282+
.about("Deletes a queue")
1283+
.arg(
1284+
Arg::new("name")
1285+
.long("name")
1286+
.help("queue name")
1287+
.required(true),
1288+
)
1289+
.arg(idempotently_arg.clone());
1290+
let list_cmd = Command::new("list")
1291+
.long_about("Lists queues and streams")
1292+
.after_help(color_print::cformat!(
1293+
"<bold>Doc guide</bold>: {}",
1294+
QUEUE_GUIDE_URL
1295+
));
1296+
let purge_cmd = Command::new("purge")
1297+
.long_about("Purges (permanently removes unacknowledged messages from) a queue")
1298+
.arg(
1299+
Arg::new("name")
1300+
.long("name")
1301+
.help("name of the queue to purge")
1302+
.required(true),
1303+
);
1304+
let rebalance_cmd = Command::new("rebalance").about("Rebalances queue leaders");
1305+
[declare_cmd, delete_cmd, list_cmd, purge_cmd, rebalance_cmd]
1306+
.map(|cmd| cmd.infer_long_args(pre_flight_settings.infer_long_options))
1307+
}
1308+
12301309
fn parameters_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 3] {
12311310
let list_cmd = Command::new("list")
12321311
.arg(

src/main.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,26 @@ fn dispatch_common_subcommand(
641641
let result = commands::purge_queue(client, &vhost, second_level_args);
642642
res_handler.no_output_on_success(result);
643643
}
644+
("queues", "declare") => {
645+
let result = commands::declare_queue(client, &vhost, second_level_args);
646+
res_handler.no_output_on_success(result);
647+
}
648+
("queues", "delete") => {
649+
let result = commands::delete_queue(client, &vhost, second_level_args);
650+
res_handler.delete_operation_result(result);
651+
}
652+
("queues", "list") => {
653+
let result = commands::list_queues(client, &vhost);
654+
res_handler.tabular_result(result)
655+
}
656+
("queues", "purge") => {
657+
let result = commands::purge_queue(client, &vhost, second_level_args);
658+
res_handler.no_output_on_success(result);
659+
}
660+
("queues", "rebalance") => {
661+
let result = commands::rebalance_queues(client);
662+
res_handler.no_output_on_success(result);
663+
}
644664
("rebalance", "queues") => {
645665
let result = commands::rebalance_queues(client);
646666
res_handler.no_output_on_success(result);

tests/queues_tests.rs

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ fn list_queues() -> Result<(), Box<dyn std::error::Error>> {
4949
run_succeeds(["-V", vh1, "list", "queues"])
5050
.stdout(predicate::str::contains(q1).and(predicate::str::contains("new_queue2").not()));
5151

52-
// delete the queue in vhost 1
52+
// purge a queue in vhost 1
53+
run_succeeds(["-V", vh1, "purge", "queue", "--name", q1]);
54+
55+
// delete a queue in vhost 1
5356
run_succeeds(["-V", vh1, "delete", "queue", "--name", q1]);
5457

5558
// list queues in vhost 1
@@ -60,3 +63,50 @@ fn list_queues() -> Result<(), Box<dyn std::error::Error>> {
6063

6164
Ok(())
6265
}
66+
67+
#[test]
68+
fn queues_lists() -> Result<(), Box<dyn std::error::Error>> {
69+
let vh1 = "queue_vhost_3";
70+
let vh2 = "queue_vhost_4";
71+
let q1 = "new_queue1";
72+
let q2 = "new_queue2";
73+
74+
delete_vhost(vh1).expect("failed to delete a virtual host");
75+
delete_vhost(vh2).expect("failed to delete a virtual host");
76+
77+
// declare vhost 1
78+
run_succeeds(["vhosts", "declare", "--name", vh1]);
79+
80+
// declare vhost 2
81+
run_succeeds(["vhosts", "declare", "--name", vh2]);
82+
83+
// declare a new queue in vhost 1
84+
run_succeeds([
85+
"-V", vh1, "queues", "declare", "--name", q1, "--type", "classic",
86+
]);
87+
88+
// declare new queue in vhost 2
89+
run_succeeds([
90+
"-V", vh2, "queues", "declare", "--name", q2, "--type", "quorum",
91+
]);
92+
93+
await_queue_metric_emission();
94+
95+
// list queues in vhost 1
96+
run_succeeds(["-V", vh1, "queues", "list"])
97+
.stdout(predicate::str::contains(q1).and(predicate::str::contains("new_queue2").not()));
98+
99+
// purge a queue in vhost 1
100+
run_succeeds(["-V", vh1, "queues", "purge", "--name", q1]);
101+
102+
// delete a queue in vhost 1
103+
run_succeeds(["-V", vh1, "queues", "delete", "--name", q1]);
104+
105+
// list queues in vhost 1
106+
run_succeeds(["-V", vh1, "queues", "list"]).stdout(predicate::str::contains(q1).not());
107+
108+
delete_vhost(vh1).expect("failed to delete a virtual host");
109+
delete_vhost(vh2).expect("failed to delete a virtual host");
110+
111+
Ok(())
112+
}

0 commit comments

Comments
 (0)