Hello,
I frequently encounter the need to write code like the following (whether using Fluvio or Kafka):
```rust
async fn process_chunk_with_shutdown<'a>(
    &self,
    ...
) -> Result<(), Error> {
    match chunk {
        Ok(chunk) => {
            tokio::select! {
                _ = subsys.on_shutdown_requested() => {
                    // Perform shutdown-related actions here.
                },
                _ = self.process_chunk(ctx, *target, chunk.as_ref()) => {
                    ...
                },
            }
            Ok(())
        }
        Err(e) => ...
    }
}

async fn consume_chunks<'a>(
    &self,
    ...
) -> Result<(), Error> {
    let mut stream = ...;
    stream
        .try_ready_chunks(consumer_batch_size)
        .map(|chunk| async move {
            self.process_chunk_with_shutdown(chunk, ctx, target, subsys)
                .await
        })
        .buffered(consumer_batch_size)
        .collect::<Vec<_>>()
        .await;
    Ok(())
}
```

In essence, I'm trying to read from a stream and process its records in chunks. However, with Fluvio I'm currently hitting a limitation: `ConsumerStream` isn't `Clone`, so I can't wrap it in an `Arc` and/or pass it by reference to a stream adapter that consumes it (such as `try_ready_chunks`).
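For what it's worth, the only workaround I can see is to move the non-clonable stream into a dedicated task and re-expose its items through a channel, which yields an owned stream that batching adapters can consume. A minimal sketch using `tokio` and `tokio-stream` (the `Record`/`StreamError` types and the `batch_via_channel` name are placeholders, not Fluvio's API):

```rust
use futures_util::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

// Placeholder types standing in for the real record/error types.
type Record = Vec<u8>;
type StreamError = std::io::Error;

async fn batch_via_channel(
    mut stream: impl Stream<Item = Result<Record, StreamError>> + Unpin + Send + 'static,
    batch_size: usize,
) {
    let (tx, rx) = mpsc::channel(batch_size);

    // Move the non-clonable stream into its own task and forward its items.
    tokio::spawn(async move {
        while let Some(item) = stream.next().await {
            if tx.send(item).await.is_err() {
                break; // receiver dropped; stop pulling from the stream
            }
        }
    });

    // ReceiverStream is an owned stream, so batching adapters apply cleanly.
    let mut batches = ReceiverStream::new(rx).ready_chunks(batch_size);
    while let Some(batch) = batches.next().await {
        // `batch` is a Vec<Result<Record, StreamError>>; process it here.
        let _ = batch;
    }
}
```

This works, but it adds an extra task and a channel hop just to get around the ownership restriction, which is what motivates the questions below.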
As far as I can tell, the only way to consume records via the Rust client while manually committing offsets is to process them one by one with `next`.
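For concreteness, that one-by-one pattern looks roughly like this, following the manual-offset example from the docs as I understand it (the builder setters and the `offset_commit`/`offset_flush` calls reflect my reading of the current client and may differ between versions):

```rust
use fluvio::consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy};
use fluvio::{Fluvio, Offset};
use futures_util::StreamExt;

async fn consume_one_by_one() -> anyhow::Result<()> {
    let fluvio = Fluvio::connect().await?;
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic")
                .offset_consumer("my-consumer")
                .offset_start(Offset::beginning())
                .offset_strategy(OffsetManagementStrategy::Manual)
                .build()?,
        )
        .await?;

    // Each record has to be pulled and committed individually.
    while let Some(Ok(record)) = stream.next().await {
        // ... process the single record ...
        let _ = record.as_ref();
        stream.offset_commit().await.expect("offset commit failed");
        stream.offset_flush().await.expect("offset flush failed");
    }
    Ok(())
}
```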
Would it be reasonable to implement `Clone` for `ConsumerStream`, or perhaps to provide an operator that allows batch processing? If I'm missing something here, I'd greatly appreciate clarification. I'd also be happy to contribute if there's something I can help with!
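To make the second option more concrete, the kind of shape I have in mind is below; every name in it is invented purely for illustration and doesn't exist in the client today:

```rust
// Purely illustrative sketch: none of these names exist in fluvio today.
// `ConsumerRecord` here is a placeholder for fluvio's record type.
pub struct ConsumerRecord;

/// A possible batch-consumption extension for ConsumerStream: yield
/// whatever is already buffered (up to `max` records), awaiting only
/// when no record is available yet, so offsets can be committed once
/// per batch instead of once per record.
pub trait ConsumerBatchStream {
    async fn next_batch(&mut self, max: usize) -> Option<Vec<ConsumerRecord>>;
}
```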
Thanks in advance for your time and support!