Issue with Flowable.zip of shared sources subscribed in the background thread #7835
Comments
Yes, the synchronous nature of the source causes the sequence to get realized, and thus the other subscribers will non-deterministically see a possibly completed sequence, so there is no full row of items to zip.
Does this mean that this behaviour is expected? Our use case is of course not as trivial as the example I have provided, but our data goes through multiple steps where multicasting similar to the above is happening (each step mapping data to some transformation/computation). We then zip the results back together to get the full list of computations. Since streaming the source data and doing the computations is potentially expensive, we want to keep the multicast. Do you have any suggestions on how to get variant D of the OP to work, so that we do not miss any elements in the zipping? I am thinking of only requesting from the zip sources once all subscribers have their subscriptions established.
If you know the number of consumers (n) of the shared source, you can use Otherwise, it may not work and you'd have to rethink your dataflow.
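The operator name in the comment above did not survive, so the following is only a sketch of one RxJava 3 operator that fits the description: `publish().autoConnect(n)`, which holds back the connection to the upstream until n subscribers have arrived. Whether this is the operator that was actually suggested is an assumption. The class name `AutoConnectZipSketch`, the method `zipShared`, and the two `map` steps are hypothetical stand-ins for the real computations.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;

// Hypothetical sketch, not the code from this issue.
class AutoConnectZipSketch {

    static List<String> zipShared(int count) {
        // Assumption: exactly two consumers (n = 2) will subscribe to the shared source.
        // autoConnect(2) connects to the upstream only after the second subscriber
        // has been registered, so neither zip input can miss the early items.
        Flowable<Integer> shared = Flowable.range(1, count)
                .publish()
                .autoConnect(2);

        Flowable<Integer> timesTen = shared.map(x -> x * 10); // stand-in computation A
        Flowable<Integer> plusOne = shared.map(x -> x + 1);   // stand-in computation B

        return Flowable.zip(timesTen, plusOne, (a, b) -> a + ":" + b)
                .subscribeOn(Schedulers.computation()) // subscribe on a background thread
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(zipShared(3));
    }
}
```

The count passed to `autoConnect` has to match the number of subscriptions that `zip` will make. If fewer arrive, the upstream is never connected and the sequence hangs; if more arrive, the late ones can miss items.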
Unfortunately, I cannot determine the number of consumers beforehand. I did, however, always use manual connect logic. I used a hacky transformer to automatically connect the sources before a
However, my comment on the connecting still does not hold. This still connects sources too early and is flaky. Any advice on how I could detect that everything is properly subscribed before calling connect?
Avoid using
Would the
Yes.
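For the manual connect approach discussed above, one arrangement that avoids guessing when "everything is subscribed" is to keep the subscription itself on the calling thread. `Flowable.zip` then attaches all of its inner subscribers before `subscribe` returns, and `connect()` can safely follow. This is a minimal sketch under that assumption (no `subscribeOn` anywhere between the shared source and the final subscriber); it is not the code from this issue, and `ManualConnectZipSketch`, `zipShared`, and the `map` steps are hypothetical.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.flowables.ConnectableFlowable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the code from this issue.
class ManualConnectZipSketch {

    static List<String> zipShared(int count) {
        // publish() does not touch the upstream until connect() is called.
        ConnectableFlowable<Integer> shared = Flowable.range(1, count).publish();

        List<String> rows = new ArrayList<>();

        // Subscribing synchronously: when subscribe() returns, both zip inputs
        // are already registered with the shared source.
        Flowable.zip(
                    shared.map(x -> x * 10), // stand-in computation A
                    shared.map(x -> x + 1),  // stand-in computation B
                    (a, b) -> a + ":" + b)
                .subscribe(rows::add, Throwable::printStackTrace);

        // Only now start the upstream; every consumer sees every item.
        shared.connect();

        // The source here is synchronous, so it has completed by this point.
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(zipShared(3));
    }
}
```

If background execution is needed, it can be applied downstream of the zip (for example with `observeOn`) rather than to the subscription of the shared sources.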
Running the following test, I would expect all assertions to pass
However, variant D fails either by throwing an exception in the `zipper` or without any elements. Looking into the implementation of `FlowableZip`, I would assume this is a race condition in the loop subscribing the inner `ZipSubscriber`s. As soon as the first one establishes a subscription, it requests data from the shared upstream, which in turn makes all other `ZipSubscriber`s miss out on some or all upstream elements.