Hi @bmmptlgc,

// Returns true when the header `headerKey` exists and its UTF-8 value equals `headerValue`.
var predicateHeader = (Headers headers, string headerKey, string headerValue) =>
{
    var messageTypeHeader = headers.FirstOrDefault(h => h.Key == headerKey)?.GetValueBytes();
    if (messageTypeHeader == null)
        return false;
    var messageType = Encoding.UTF8.GetString(messageTypeHeader);
    return messageType.Equals(headerValue);
};

var sourceStream = builder.Stream<string, GenericRecord>("input");
var branches = sourceStream
    .Branch(
        (k, v) => predicateHeader(StreamizMetadata.GetCurrentHeadersMetadata(), "message-type", "orders"),
        (k, v) => predicateHeader(StreamizMetadata.GetCurrentHeadersMetadata(), "message-type", "sales"),
        (k, v) => predicateHeader(StreamizMetadata.GetCurrentHeadersMetadata(), "message-type", "test"),
        (s, record) => true);

// only messages with the header message-type = orders
branches[0].To(...);
// only messages with the header message-type = sales
branches[1].To(...);
// only messages with the header message-type = test
branches[2].To(...);
// dlq
branches[3].To("dlq-topic");

Btw, I plan to add the context to the Filter and Branch DSL processors in the near term.
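
To make the routing logic in this reply easier to reason about outside a running topology, here is a minimal standalone sketch. It assumes Branch sends each record to the first predicate that matches (the usual Kafka Streams behaviour) and uses a plain dictionary as a stand-in for the Kafka headers. `HeaderEquals` and `BranchIndex` are hypothetical names for illustration only, not part of Streamiz.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text;

// Stand-in for Kafka headers: header key -> raw bytes (an assumption for this sketch).
bool HeaderEquals(IReadOnlyDictionary<string, byte[]> headers, string key, string expected) =>
    headers.TryGetValue(key, out var raw)
    && raw != null
    && string.Equals(Encoding.UTF8.GetString(raw), expected, StringComparison.Ordinal);

// Mirrors first-match-wins: index of the first matching message type,
// or the catch-all index when nothing matches.
int BranchIndex(IReadOnlyDictionary<string, byte[]> headers)
{
    string[] types = { "orders", "sales", "test" };
    for (var i = 0; i < types.Length; i++)
        if (HeaderEquals(headers, "message-type", types[i]))
            return i;
    return types.Length; // catch-all branch (the DLQ in the reply above)
}

var sales = new Dictionary<string, byte[]> { ["message-type"] = Encoding.UTF8.GetBytes("sales") };
var noHeader = new Dictionary<string, byte[]>();

Console.WriteLine(BranchIndex(sales));    // 1
Console.WriteLine(BranchIndex(noHeader)); // 3
```

A record with no message-type header, or with an unlisted value, lands in the last branch, which is why the catch-all predicate has to come last.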
Hi @LGouellec,
I am new to Streamiz.Kafka.Net and I am trying to write a streaming application that will consume messages from a multi-schema Avro Kafka topic, filter them on a message-type header, and then publish each non-filtered message to a topic dedicated to its message type.
For example, let's imagine source-topic has message types A, B and C (the header message-type contains the value). If we want to filter out messages of type B, the end result would be all messages with message-type A published to destination-topic-A and all messages with message-type C published to destination-topic-C.
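
The routing rule in this example can be written down as a small standalone function, independent of any Streamiz API. This is only a sketch of the desired behaviour: `ResolveDestination` is a hypothetical helper name, and returning null to mean "drop the message" is an assumption of the sketch.

```csharp
#nullable enable
using System;

// Hypothetical helper: maps a message-type header value to its destination topic.
// Returns null for messages that should be filtered out.
string? ResolveDestination(string? messageType) =>
    messageType switch
    {
        "A" => "destination-topic-A",
        "C" => "destination-topic-C",
        _ => null // type B, unknown types, or a missing header: drop
    };

Console.WriteLine(ResolveDestination("A"));                // destination-topic-A
Console.WriteLine(ResolveDestination("B") ?? "(dropped)"); // (dropped)
Console.WriteLine(ResolveDestination("C"));                // destination-topic-C
```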
Something like that wouldn't work, because Filter's Func parameter doesn't have a context parameter. Is there another way to achieve my goal?