A question about the headers exchange routing #13934
-
Describe the bugProblemThe header exchange does not route messages as advertised. Since criteria for routing are associated with a queue-binding, I expect only messages to be queued to this queue when the message headers match the criteria stored with the queue binding. This, however, is not the reality. When configuring an exchange with 2 bound queues, each with their own criteria, and publishing 2 messages to the exchange, each with an argument that matches one of the binding criteria, I would expect that the message that has arguments matching the binding criteria for queue one, is only routed to queue one, and that the message that matches the criteria of the binding for queue two, would be routed to queue two only. Both messages, however, are routed to both queues, comletely ignoring the criteria. Below a unit test (C#) that produces this error. `
` Reproduction steps
Expected behaviorI expect message 1 only to appear in queue-one Additional contextNo response |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 4 replies
-
@everttimmer1963 as advertised where? I cannot reproduce against RabbitMQ 4.1.0 using the following example from one of the client library doc guides. Documentation Example#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require "bunny"
puts "=> Headers exchange routing"
puts
conn = Bunny.new
conn.start
ch = conn.create_channel
x = ch.headers("headers")
q1 = ch.queue("rabbitmq-server.13934.q1.linux", exclusive: true).bind(x, arguments: {"os" => "linux", "cores" => 8, "x-match" => "all"})
q2 = ch.queue("rabbitmq-server.13934.q2.macos", exclusive: true).bind(x, arguments: {"os" => "macos", "cores" => 4, "x-match" => "any"})
q1.subscribe do |delivery_info, properties, content|
puts "#{q1.name} received #{content}"
end
q2.subscribe do |delivery_info, properties, content|
puts "#{q2.name} received #{content}"
end
x.publish("8 cores/Linux", :headers => {"os" => "linux", "cores" => 8})
x.publish("8 cores/OS X", :headers => {"os" => "macos", "cores" => 8})
x.publish("4 cores/Linux", :headers => {"os" => "linux", "cores" => 4})
sleep 0.5
conn.close Which outputs
As is very common to see with the (rare) headers exchange questions, it likely comes down to
Existing Tests That Use the Headers ExchangeRabbitMQ has tests for the headers exchange behavior, such as %% Test that headers exchange's x-match binding argument set to all-with-x and any-with-x
%% works as expected. The use case being tested here:
%% Route dead-letter messages to different target queues
%% according to first death reason and first death queue.
dead_letter_headers_first_death_route(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName1 = ?config(queue_name, Config),
QName2 = <<"dead_letter_headers_first_death_route_source_queue_2">>,
DLXExpiredQName = ?config(queue_name_dlx, Config),
DLXRejectedQName = ?config(queue_name_dlx_2, Config),
Args = ?config(queue_args, Config),
Durable = ?config(queue_durable, Config),
DLXExchange = ?config(dlx_exchange, Config),
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange,
type = <<"headers">>}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName1,
arguments = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange} | Args],
durable = Durable}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName2,
arguments = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange} | Args],
durable = Durable}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXExpiredQName,
durable = Durable}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXRejectedQName,
durable = Durable}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXExpiredQName,
exchange = DLXExchange,
arguments = [{<<"x-match">>, longstr, <<"all-with-x">>},
{<<"x-first-death-reason">>, longstr, <<"expired">>},
{<<"x-first-death-queue">>, longstr, QName1}]
}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXRejectedQName,
exchange = DLXExchange,
arguments = [{<<"x-match">>, longstr, <<"any-with-x">>},
{<<"x-first-death-reason">>, longstr, <<"rejected">>}]
}),
%% Send 1st message to 1st source queue and let it expire.
P1 = <<"msg1">>,
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName1},
#amqp_msg{payload = P1,
props = #'P_basic'{expiration = <<"0">>}}),
%% The 1st message gets dead-lettered to DLXExpiredQName.
wait_for_messages(Config, [[DLXExpiredQName, <<"1">>, <<"1">>, <<"0">>]]),
_ = consume(Ch, DLXExpiredQName, [P1]),
consume_empty(Ch, DLXExpiredQName),
wait_for_messages(Config, [[QName1, <<"0">>, <<"0">>, <<"0">>]]),
%% Send 2nd message to 2nd source queue and let it expire.
P2 = <<"msg2">>,
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName2},
#amqp_msg{payload = P2,
props = #'P_basic'{expiration = <<"0">>}}),
%% Send 2nd message should not be routed by the dead letter headers exchange.
rabbit_ct_helpers:consistently(?_assertEqual(#'basic.get_empty'{},
amqp_channel:call(Ch, #'basic.get'{queue = DLXExpiredQName}))),
%% Send and reject the 3rd message.
P3 = <<"msg3">>,
publish(Ch, QName2, [P3]),
case group_name(Config) of
at_most_once ->
wait_for_messages(Config, [[QName2, <<"1">>, <<"1">>, <<"0">>]]);
at_least_once ->
wait_for_messages(Config, [[QName2, <<"2">>, <<"1">>, <<"0">>]])
end,
[DTag] = consume(Ch, QName2, [P3]),
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag,
requeue = false}),
%% The 3rd message gets dead-lettered to DLXRejectedQName.
wait_for_messages(Config, [[DLXRejectedQName, <<"1">>, <<"1">>, <<"0">>]]),
_ = consume(Ch, DLXRejectedQName, [P3]),
consume_empty(Ch, DLXRejectedQName),
_ = amqp_channel:call(Ch, #'queue.delete'{queue = QName2}),
ok. Or this: @Test
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_10)
public void headersWithXRouting() throws Exception {
Map<String, Object> spec = new HashMap<String, Object>();
spec.put("x-key-1", "value-1");
spec.put("x-key-2", "value-2");
spec.put("x-match", "all-with-x");
channel.queueBind(Q1, "amq.match", "", spec);
spec.put("x-match", "any-with-x");
channel.queueBind(Q2, "amq.match", "", spec);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
channel.basicPublish("amq.match", "", props.build(), "0".getBytes());
Map<String, Object> map = new HashMap<String, Object>();
props.headers(map);
map.clear();
map.put("x-key-1", "value-1");
channel.basicPublish("amq.match", "", props.build(), "1".getBytes());
map.clear();
map.put("x-key-1", "value-1");
map.put("x-key-2", "value-2");
channel.basicPublish("amq.match", "", props.build(), "2".getBytes());
map.clear();
map.put("x-key-1", "value-1");
map.put("x-key-2", "value-2");
map.put("x-key-3", "value-3");
channel.basicPublish("amq.match", "", props.build(), "3".getBytes());
checkGet(Q1, true); // 2
checkGet(Q1, true); // 3
checkGet(Q1, false);
checkGet(Q2, true); // 1
checkGet(Q2, true); // 2
checkGet(Q2, true); // 3
checkGet(Q2, false);
}
private void checkGet(String queue, boolean messageExpected)
throws IOException
{
GetResponse r = channel.basicGet(queue, true);
if (messageExpected) {
assertNotNull(r);
} else {
assertNull(r);
}
} |
Beta Was this translation helpful? Give feedback.
-
Finally, headers exchanges are very rarely used because they usually can be swapped for topic exchanges using a basic convention. "a = 1 AND b = 2" (two header conditions) can be expressed as a topic, We don't have neither RabbitMQ nor .NET client details, so I have provided two tests and one doc example that demonstrate the "selective routing" you are looking for. |
Beta Was this translation helpful? Give feedback.
-
You need to use value
|
Beta Was this translation helpful? Give feedback.
You need to use value
all-with-x
as explained in https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-headers :