Skip to content

Commit 26be9e5

Browse files
vaibhavtiwari33vigithyhl25
authored
feat: Mvtx short-circuiting using a bypass router struct (#3126)
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com> Signed-off-by: Vigith Maurice <vigith@gmail.com> Signed-off-by: Yashash <yashashhl25@gmail.com> Co-authored-by: Vigith Maurice <vigith@gmail.com> Co-authored-by: Yashash <yashashhl25@gmail.com>
1 parent 81fa198 commit 26be9e5

File tree

17 files changed

+1722
-589
lines changed

17 files changed

+1722
-589
lines changed

rust/numaflow-core/src/config/monovertex.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub(crate) struct MonovertexConfig {
3838
pub(crate) graceful_shutdown_time: Duration,
3939
pub(crate) replica: u16,
4040
pub(crate) source_config: SourceConfig,
41-
pub(crate) bypass_condition: Option<ToSinkCondition>,
41+
pub(crate) bypass_condition: Option<BypassConditions>,
4242
pub(crate) map_config: Option<MapVtxConfig>,
4343
pub(crate) sink_config: SinkConfig,
4444
pub(crate) transformer_config: Option<TransformerConfig>,
@@ -253,16 +253,16 @@ impl MonovertexConfig {
253253
}
254254

255255
#[derive(Debug, Clone, PartialEq)]
256-
pub(crate) struct ToSinkCondition {
256+
pub(crate) struct BypassConditions {
257257
pub(crate) sink: Option<Box<ForwardConditions>>,
258258
pub(crate) fallback: Option<Box<ForwardConditions>>,
259259
pub(crate) on_success: Option<Box<ForwardConditions>>,
260260
}
261261

262-
impl TryFrom<Box<MonoVertexBypassCondition>> for ToSinkCondition {
262+
impl TryFrom<Box<MonoVertexBypassCondition>> for BypassConditions {
263263
type Error = Error;
264264
fn try_from(mvtx_sinker_condition: Box<MonoVertexBypassCondition>) -> Result<Self> {
265-
Ok(ToSinkCondition {
265+
Ok(BypassConditions {
266266
sink: mvtx_sinker_condition.sink,
267267
fallback: mvtx_sinker_condition.fallback,
268268
on_success: mvtx_sinker_condition.on_success,

rust/numaflow-core/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ pub enum Error {
2828
#[error("Forwarder Error - {0}")]
2929
Forwarder(String),
3030

31+
#[error("Bypass Router Error - {0}")]
32+
BypassRouter(String),
33+
3134
#[error("Connection Error - {0}")]
3235
Connection(String),
3336

rust/numaflow-core/src/mapper/map.rs

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub(super) mod batch;
2626
pub(super) mod stream;
2727
pub(super) mod unary;
2828

29+
use crate::monovertex::bypass_router::MvtxBypassRouter;
2930
use batch::UserDefinedBatchMap;
3031
use stream::UserDefinedStreamMap;
3132
use unary::UserDefinedUnaryMap;
@@ -253,6 +254,7 @@ impl MapHandle {
253254
mut self,
254255
input_stream: ReceiverStream<Message>,
255256
cln_token: CancellationToken,
257+
bypass_router: Option<MvtxBypassRouter>,
256258
) -> error::Result<(ReceiverStream<Message>, JoinHandle<error::Result<()>>)> {
257259
let (output_tx, output_rx) = mpsc::channel(self.batch_size);
258260
let (error_tx, mut error_rx) = mpsc::channel(self.batch_size);
@@ -322,6 +324,7 @@ impl MapHandle {
322324
self.tracker.clone(),
323325
error_tx.clone(),
324326
hard_shutdown_token.clone(),
327+
bypass_router.clone(),
325328
).await;
326329
}
327330
},
@@ -358,6 +361,7 @@ impl MapHandle {
358361
batch,
359362
output_tx.clone(),
360363
self.tracker.clone(),
364+
bypass_router.clone(),
361365
)
362366
.await
363367
{
@@ -415,6 +419,7 @@ impl MapHandle {
415419
self.tracker.clone(),
416420
error_tx,
417421
cln_token.clone(),
422+
bypass_router.clone(),
418423
).await;
419424
}
420425
},
@@ -458,6 +463,7 @@ impl MapHandle {
458463
tracker: Tracker,
459464
error_tx: mpsc::Sender<Error>,
460465
cln_token: CancellationToken,
466+
bypass_router: Option<MvtxBypassRouter>,
461467
) {
462468
let output_tx = output_tx.clone();
463469

@@ -485,10 +491,15 @@ impl MapHandle {
485491

486492
// send messages downstream
487493
for mapped_message in mapped_messages {
488-
output_tx
494+
if let Some(ref bypass_router) = bypass_router && bypass_router.try_bypass(mapped_message.clone()).await.expect("failed to send message to bypass channel")
495+
{
496+
continue
497+
} else {
498+
output_tx
489499
.send(mapped_message)
490500
.await
491501
.expect("failed to send response");
502+
}
492503
}
493504
}
494505
Ok(Err(map_err)) => {
@@ -537,6 +548,7 @@ impl MapHandle {
537548
batch: Vec<Message>,
538549
output_tx: mpsc::Sender<Message>,
539550
tracker: Tracker,
551+
bypass_router: Option<MvtxBypassRouter>,
540552
) -> error::Result<()> {
541553
let (senders, receivers): (Vec<_>, Vec<_>) =
542554
batch.iter().map(|_| oneshot::channel()).unzip();
@@ -561,11 +573,21 @@ impl MapHandle {
561573
)
562574
.await?;
563575
}
576+
564577
for mapped_message in mapped_messages {
565-
output_tx
566-
.send(mapped_message)
567-
.await
568-
.expect("failed to send response");
578+
if let Some(ref bypass_router) = bypass_router
579+
&& bypass_router
580+
.try_bypass(mapped_message.clone())
581+
.await
582+
.expect("failed to send message to bypass channel")
583+
{
584+
continue;
585+
} else {
586+
output_tx
587+
.send(mapped_message)
588+
.await
589+
.expect("failed to send response");
590+
}
569591
}
570592
}
571593
Ok(Err(_map_err)) => {
@@ -597,6 +619,7 @@ impl MapHandle {
597619
tracker: Tracker,
598620
error_tx: mpsc::Sender<Error>,
599621
cln_token: CancellationToken,
622+
bypass_router: Option<MvtxBypassRouter>,
600623
) {
601624
let output_tx = output_tx.clone();
602625
tokio::spawn(async move {
@@ -621,7 +644,13 @@ impl MapHandle {
621644
.serving_append(mapped_message.offset.clone(), mapped_message.tags.clone())
622645
.await
623646
.expect("failed to update tracker");
624-
output_tx.send(mapped_message).await.expect("failed to send response");
647+
648+
if let Some(ref bypass_router) = bypass_router && bypass_router.try_bypass(mapped_message.clone()).await.expect("failed to send message to bypass channel")
649+
{
650+
continue
651+
} else {
652+
output_tx.send(mapped_message).await.expect("failed to send response");
653+
}
625654
}
626655
Some(Err(e)) => {
627656
error!(?e, "failed to map message");
@@ -756,6 +785,7 @@ mod tests {
756785
tracker,
757786
error_tx,
758787
CancellationToken::new(),
788+
None,
759789
)
760790
.await;
761791

@@ -878,7 +908,7 @@ mod tests {
878908
drop(input_tx);
879909

880910
let (output_stream, map_handle) = mapper
881-
.streaming_map(input_stream, CancellationToken::new())
911+
.streaming_map(input_stream, CancellationToken::new(), None)
882912
.await?;
883913
let mut output_rx = output_stream.into_inner();
884914

@@ -983,7 +1013,7 @@ mod tests {
9831013
drop(input_tx);
9841014

9851015
let (mut output_stream, map_handle) = mapper
986-
.streaming_map(input_stream, CancellationToken::new())
1016+
.streaming_map(input_stream, CancellationToken::new(), None)
9871017
.await?;
9881018

9891019
let mut responses = vec![];
@@ -1057,7 +1087,7 @@ mod tests {
10571087
let input_stream = ReceiverStream::new(input_rx);
10581088
let cln_token = CancellationToken::new();
10591089
let (_output_stream, map_handle) = mapper
1060-
.streaming_map(input_stream, cln_token.clone())
1090+
.streaming_map(input_stream, cln_token.clone(), None)
10611091
.await?;
10621092
let mut ack_rxs = vec![];
10631093
// send 10 requests to the mapper
@@ -1201,7 +1231,7 @@ mod tests {
12011231
}
12021232

12031233
let (_output_stream, map_handle) = mapper
1204-
.streaming_map(input_stream, cln_token.clone())
1234+
.streaming_map(input_stream, cln_token.clone(), None)
12051235
.await?;
12061236

12071237
drop(input_tx);
@@ -1277,7 +1307,7 @@ mod tests {
12771307
let input_stream = ReceiverStream::new(input_rx);
12781308

12791309
let (_output_stream, map_handle) = mapper
1280-
.streaming_map(input_stream, cln_token.clone())
1310+
.streaming_map(input_stream, cln_token.clone(), None)
12811311
.await?;
12821312

12831313
let mut ack_rxs = vec![];

rust/numaflow-core/src/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ pub(crate) enum ReadAck {
285285
}
286286

287287
/// Message ID which is used to uniquely identify a message. It cheap to clone this.
288-
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
288+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
289289
pub(crate) struct MessageID {
290290
pub(crate) vertex_name: Bytes,
291291
pub(crate) offset: Bytes,

rust/numaflow-core/src/monovertex.rs

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::config::monovertex::MonovertexConfig;
66
use crate::error::{self};
77
use crate::mapper::map::MapHandle;
88
use crate::metrics::{LagReader, PendingReaderTasks};
9+
use crate::monovertex::bypass_router::BypassRouterConfig;
910
use crate::shared::create_components;
1011
use crate::sinker::sink::SinkWriter;
1112
use crate::source::Source;
@@ -23,7 +24,8 @@ use crate::{metrics, shared};
2324
/// - Calls the Sinker to write the batch to the Sink
2425
/// - Send Acknowledgement back to the Source
2526
pub(crate) mod forwarder;
26-
mod stream_splitter;
27+
28+
pub(crate) mod bypass_router;
2729

2830
pub(crate) async fn start_forwarder(
2931
cln_token: CancellationToken,
@@ -62,6 +64,26 @@ async fn run_monovertex_forwarder<C: crate::typ::NumaflowTypeConfig>(
6264
) -> error::Result<()> {
6365
let tracker = Tracker::new(None, cln_token.clone());
6466

67+
let sink_writer = create_components::create_sink_writer(
68+
config.batch_size,
69+
config.read_timeout,
70+
config.sink_config.clone(),
71+
config.fb_sink_config.clone(),
72+
config.on_success_sink_config.clone(),
73+
None,
74+
&cln_token,
75+
config.bypass_condition.clone(),
76+
)
77+
.await?;
78+
79+
let bypass_router = config.bypass_condition.as_ref().map(|bypass_condition| {
80+
BypassRouterConfig::new(
81+
bypass_condition.clone(),
82+
config.batch_size,
83+
config.read_timeout,
84+
)
85+
});
86+
6587
let transformer = create_components::create_transformer(
6688
config.batch_size,
6789
config.graceful_shutdown_time,
@@ -98,17 +120,6 @@ async fn run_monovertex_forwarder<C: crate::typ::NumaflowTypeConfig>(
98120
None
99121
};
100122

101-
let sink_writer = create_components::create_sink_writer(
102-
config.batch_size,
103-
config.read_timeout,
104-
config.sink_config.clone(),
105-
config.fb_sink_config.clone(),
106-
config.on_success_sink_config.clone(),
107-
None,
108-
&cln_token,
109-
)
110-
.await?;
111-
112123
// Start the metrics server in a separate background async spawn,
113124
// This should be running throughout the lifetime of the application, hence the handle is not
114125
// joined.
@@ -128,7 +139,15 @@ async fn run_monovertex_forwarder<C: crate::typ::NumaflowTypeConfig>(
128139
shared::metrics::start_metrics_server::<C>(config.metrics_config.clone(), metrics_state)
129140
.await;
130141

131-
start::<C>(config.clone(), source, mapper, sink_writer, cln_token).await?;
142+
start::<C>(
143+
config.clone(),
144+
source,
145+
mapper,
146+
sink_writer,
147+
bypass_router,
148+
cln_token,
149+
)
150+
.await?;
132151

133152
// abort the metrics server
134153
metrics_server_handle.abort();
@@ -140,6 +159,7 @@ async fn start<C: crate::typ::NumaflowTypeConfig>(
140159
source: Source<C>,
141160
mapper: Option<MapHandle>,
142161
sink: SinkWriter,
162+
bypass_router_config: Option<BypassRouterConfig>,
143163
cln_token: CancellationToken,
144164
) -> error::Result<()> {
145165
// Store the pending reader handle outside, so it doesn't get dropped immediately.
@@ -157,7 +177,7 @@ async fn start<C: crate::typ::NumaflowTypeConfig>(
157177
None
158178
};
159179

160-
let forwarder = forwarder::Forwarder::<C>::new(source, mapper, sink);
180+
let forwarder = forwarder::Forwarder::<C>::new(source, mapper, sink, bypass_router_config);
161181

162182
info!("Forwarder is starting...");
163183
// start the forwarder, it will return only on Signal

0 commit comments

Comments
 (0)