Skip to content

Commit b665342

Browse files
authored
Merge branch 'develop-v1' into improve-docs-configuration
2 parents b0d3a47 + 83b5fc8 commit b665342

File tree

1 file changed

+152
-10
lines changed

1 file changed

+152
-10
lines changed

Documentation/integration/rabbitmq.md

Lines changed: 152 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,161 @@ parent: Integration
55
nav_order: 2
66
---
77

8-
RabbitMQ
9-
========
8+
# RabbitMQ
109

11-
To setup EventFlow's [RabbitMQ](https://www.rabbitmq.com/) integration, install the NuGet package
12-
`EventFlow.RabbitMQ` and add this to your EventFlow setup.
10+
EventFlow ships with a RabbitMQ integration that fans every persisted domain event out to an exchange. This is
11+
useful when downstream systems (read models, legacy services, analytics pipelines, and so on) must react to
12+
aggregate changes without being tightly coupled to the write model.
13+
14+
The integration focuses on **publishing**. It does not create queues or start consumers for you—topology remains
15+
an infrastructure concern so you can keep the messaging contract explicit.
16+
17+
## Prerequisites
18+
19+
- RabbitMQ 3.8 or newer (older versions work, but automatic recovery and federation are more reliable on ≥3.8).
20+
- The [`EventFlow.RabbitMQ`](https://www.nuget.org/packages/EventFlow.RabbitMQ) package alongside the core EventFlow packages.
21+
- A pre-provisioned exchange (typically a durable topic exchange) plus the queues/bindings you want to consume from.
22+
EventFlow does **not** declare exchanges or queues automatically.
23+
24+
## Install and register the publisher
25+
26+
```bash
27+
dotnet add package EventFlow.RabbitMQ
28+
```
29+
30+
Enable the publisher when you build your `EventFlowOptions`.
1331

1432
```csharp
15-
var uri = new Uri("amqp://localhost");
16-
// ...
17-
.PublishToRabbitMq(RabbitMqConfiguration.With(uri))
18-
// ...
33+
using EventFlow.RabbitMQ;
34+
using EventFlow.RabbitMQ.Extensions;
35+
36+
var rabbitUri = new Uri("amqp://guest:guest@localhost:5672/");
37+
38+
services.AddEventFlow(options => options
39+
// ... register aggregates, commands, read models, etc.
40+
.PublishToRabbitMq(
41+
RabbitMqConfiguration.With(
42+
rabbitUri,
43+
persistent: true, // mark messages as durable
44+
modelsPrConnection: 5, // pooled channels per connection
45+
exchange: "eventflow"))); // topic exchange to publish to
1946
```
2047

21-
After setting up RabbitMQ support in EventFlow, you can continue to configure it.
48+
`RabbitMqConfiguration.With` exposes the following knobs:
49+
50+
- `uri` – The AMQP URI, including credentials and vhost. Use `amqps://` when TLS is required.
51+
- `persistent` – Whether RabbitMQ should persist messages to disk (`true` by default). Set this to `false` for
52+
transient data.
53+
- `modelsPrConnection` – How many channels (models) the integration pools per connection. Increase the value if you
54+
have a high write rate and observe channel contention.
55+
- `exchange` – Name of the exchange EventFlow publishes to. The exchange must already exist.
56+
57+
Once configured, EventFlow registers an `ISubscribeSynchronousToAll` subscriber that ships each domain event to
58+
RabbitMQ right after the event is committed to the event store. The command is considered complete only after the
59+
publish succeeds (or ultimately fails), so RabbitMQ errors surface to the caller.
60+
61+
## Exchange and routing conventions
62+
63+
By default messages are published with:
64+
65+
- **Exchange** – The value supplied via `RabbitMqConfiguration.With` (defaults to `eventflow`).
66+
- **Routing key**`eventflow.domainevent.{aggregate-name}.{event-name}.{event-version}` where each segment is
67+
slugified (lowercase, dashes for PascalCase).
68+
69+
For example, an event named `UserRegistered` version `1` from `CustomerAggregate` produces:
70+
71+
```
72+
eventflow.domainevent.customer.user-registered.1
73+
```
74+
75+
### Creating queues and bindings
76+
77+
EventFlow does not create queues. Bind your own queues to the configured exchange using the routing keys that matter
78+
to a consumer. With the default topic exchange, you can subscribe to an entire aggregate or event family:
79+
80+
- `eventflow.domainevent.customer.*` – All events from `CustomerAggregate`.
81+
- `eventflow.domainevent.*.user-registered.*` – Any `UserRegistered` event regardless of aggregate.
82+
83+
```csharp
84+
using var connection = factory.CreateConnection();
85+
using var channel = connection.CreateModel();
86+
87+
channel.ExchangeDeclare("eventflow", ExchangeType.Topic, durable: true);
88+
channel.QueueDeclare("customer-updates", durable: true, exclusive: false, autoDelete: false);
89+
channel.QueueBind("customer-updates", "eventflow", "eventflow.domainevent.customer.#");
90+
```
91+
92+
Run similar provisioning code (or infrastructure as code) before your service starts or during deployment.
93+
94+
## Message payload and headers
95+
96+
The integration serializes the aggregate event using EventFlow’s regular JSON serializer. Metadata is sent alongside
97+
the message in two places:
98+
99+
- **Body** – JSON payload with the actual event data. This is identical to what the event store persists.
100+
- **Headers** – A `Dictionary<string,string>` containing EventFlow metadata such as:
101+
- `event_name`, `event_version`
102+
- `aggregate_id`, `aggregate_name`, `aggregate_sequence_number`
103+
- `event_id`, `batch_id`, `timestamp`, `timestamp_epoch`
104+
- `source_id` when available
105+
106+
Example body:
107+
108+
```json
109+
{
110+
"UserId": "dcd3f2e1-6f9b-4fcb-8901-9a5f6f2f4c0a",
111+
"Email": "customer@example.com",
112+
"RegisteredAt": "2025-09-20T17:53:41.197842Z"
113+
}
114+
```
115+
116+
Example headers:
117+
118+
| Header | Example value |
119+
| --- | --- |
120+
| `event_name` | `user-registered` |
121+
| `event_version` | `1` |
122+
| `aggregate_name` | `customer` |
123+
| `aggregate_id` | `customer-5b0d9af0` |
124+
| `aggregate_sequence_number` | `42` |
125+
| `event_id` | `01JF2ZNKX1QZS5CJ1V6AQ13RPT` |
126+
| `timestamp` | `2025-09-20T17:53:41.2012129Z` |
127+
128+
Leverage these headers to enrich logs, implement idempotency, or drive filtering logic in consumers.
129+
130+
## Reliability characteristics
131+
132+
- **Persistent messages** – Enabled by default via `basicProperties.Persistent = true` when configured.
133+
- **Connection pooling** – A connection is opened per URI and keeps a pool of AMQP channels (models) to avoid throttling.
134+
Tune `modelsPrConnection` for your throughput profile.
135+
- **Automatic recovery** – The RabbitMQ client enables topology and automatic connection recovery so brief network blips
136+
self-heal.
137+
- **Retry strategy** – Transient `BrokerUnreachableException`, `OperationInterruptedException`, and `EndOfStreamException`
138+
are retried up to three times with a 25 ms backoff. Replace `IRabbitMqRetryStrategy` in the container if you need custom
139+
retry logic.
140+
141+
Failures that propagate after retries cause the current command to fail; the publish will be retried the next time the
142+
command is executed or when the aggregate emits subsequent events.
143+
144+
## Customizing the integration
145+
146+
- **Alternate exchange or routing key** – Replace the registered `IRabbitMqMessageFactory` with your own implementation
147+
to target different exchanges, enrich headers, or transform the payload.
148+
- **Custom publish mechanics** – Override `IRabbitMqPublisher` if you need publisher confirms, tracing, or batch semantics.
149+
- **Asynchronous publishing** – If you prefer to publish outside the command execution pipeline, register your own
150+
`ISubscribeAsynchronousToAll` implementation and publish from there instead of relying on the built-in synchronous publisher.
151+
152+
```csharp
153+
services.TryAddSingleton<IRabbitMqMessageFactory, CustomRabbitMqMessageFactory>();
154+
```
155+
156+
## Troubleshooting
157+
158+
- `NOT_FOUND - no exchange` – The exchange name does not exist. Create it manually or update the configuration.
159+
- `NO_ROUTE` warnings – Nothing is bound to the routing key. Check your queue bindings.
160+
- **Channel busy or blocked** – Increase `modelsPrConnection` or scale out publishers.
161+
- **Silent drops** – Inspect consumer acknowledgements and dead-letter queues; EventFlow only publishes and cannot observe
162+
downstream failures.
22163

23-
- [Publish all domain events](../basics/subscribers.md)
164+
For general guidance on subscribers and out-of-order delivery considerations, review the
165+
[subscribers documentation](../basics/subscribers.md).

0 commit comments

Comments
 (0)