
Issue with Flowable.zip of shared sources subscribed in the background thread #7835

Closed
fplein opened this issue Feb 14, 2025 · 7 comments

@fplein
fplein commented Feb 14, 2025

Running the following test, I would expect all assertions to pass:

import com.google.common.base.Preconditions;
import com.google.common.truth.Truth;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;

public class ZipSharedTest {


  @Test
  public void testConcurrentZip() {
    int elementCount = 10;
    int sourceCopies = 10000;
    Flowable<Integer> source = Flowable.fromArray(IntStream.range(0, elementCount).boxed().toArray(Integer[]::new));

    io.reactivex.rxjava3.functions.Function<Object[], List<Object>> zipper = objects -> {
      Preconditions.checkArgument(Arrays.stream(objects).allMatch(o -> o.equals(objects[0])), "Elements should all be equal");
      return List.of(objects);
    };

    // A: independent sources, sequential
    {
      Iterable<List<Object>> results = Flowable.zip(Collections.nCopies(sourceCopies, source), zipper).blockingIterable();
      Truth.assertThat(results).hasSize(elementCount);
    }

    // B: multicast sources (c.f. share operator), sequential
    {
      Iterable<List<Object>> results = Flowable.zip(Collections.nCopies(sourceCopies, source.share()), zipper)
          .blockingIterable();
      Truth.assertThat(results).hasSize(elementCount);
    }

    // C: independent sources, parallel
    {
      Iterable<List<Object>> results = Flowable.zip(Collections.nCopies(sourceCopies, source.subscribeOn(Schedulers.io())), zipper)
          .blockingIterable();
      Truth.assertThat(results).hasSize(elementCount);
    }

    // D: multicast sources (c.f. share operator), parallel
    {
      Iterable<List<Object>> results = Flowable.zip(Collections.nCopies(sourceCopies, source.share().subscribeOn(Schedulers.io())), zipper)
          .blockingIterable();
      Truth.assertThat(results).hasSize(elementCount);
    }
  }
}

However, variant D fails, either by throwing an exception in the zipper or by producing no elements at all. Looking into the implementation of FlowableZip, I assume this is a race condition in the loop that subscribes the inner ZipSubscribers. As soon as the first one establishes its subscription, it requests data from the shared upstream, which in turn makes the other ZipSubscribers miss some or all of the upstream elements.
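The mechanism can be reproduced deterministically without any threads. The following is a minimal illustrative sketch, not part of the original report: it assumes a PublishProcessor as a stand-in for the upstream so that emission timing is controlled by hand, and the class and method names are hypothetical. A subscriber that joins a share()d source after items have already gone out only sees the remainder.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.PublishProcessor;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: shows what a late subscriber of a share()d source receives.
public class LateShareSubscriberSketch {

  /** Returns {early, late}: the items each of two subscribers of the same shared source saw. */
  static List<List<Integer>> run() {
    // Assumption: a PublishProcessor stands in for the real upstream so timing is explicit
    PublishProcessor<Integer> upstream = PublishProcessor.create();
    Flowable<Integer> shared = upstream.share(); // share() is publish().refCount()

    List<Integer> early = new ArrayList<>();
    List<Integer> late = new ArrayList<>();

    shared.subscribe(early::add); // first subscriber connects the shared source
    upstream.onNext(1);
    upstream.onNext(2);

    shared.subscribe(late::add);  // joins the running connection; 1 and 2 are already gone
    upstream.onNext(3);
    upstream.onComplete();

    List<List<Integer>> result = new ArrayList<>();
    result.add(early);
    result.add(late);
    return result;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [[1, 2, 3], [3]]
  }
}
```

If these two subscribers were inputs of a zip, the first row would pair 1 with 3, which is the kind of mismatch the Preconditions check in variant D's zipper reports.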

@akarnokd
Member

Yes, the synchronous nature of the source causes the sequence to be realized right away, so the other subscribers will non-deterministically see a possibly already completed sequence, and thus there is no full row of items to zip.

@fplein
Author

fplein commented Feb 14, 2025

Does this mean that this behaviour is expected?

Our use case is of course not as trivial as the example I have provided, but our data goes through multiple steps where, similar to the above, multicasting happens (each step maps the data to some transformation/computation). We then zip the results back together to get the full list of computations. Since streaming the source data and doing the computations is potentially expensive, we want to keep the multicast. Do you have any suggestions on how to get variant D of the original post to work, so that we do not miss any elements in the zipping? I was thinking of only requesting from the zip sources once all subscribers have their subscriptions established.

@akarnokd
Member

If you know the number of consumers (n) of the shared source, you can use publish().autoConnect(n). If you can line up any number of consumers on a publish()-ed source, call ConnectableFlowable::connect when you are ready to run everyone.
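A minimal sketch of the autoConnect(n) suggestion, assuming the number of zip inputs n is known up front. Flowable.range stands in for the real source, and the class and method names are illustrative. autoConnect(n) subscribes the n-th subscriber first and only then connects, so every zip input is in place before the first item is emitted, while the upstream still runs only once.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative class: zips n copies of one published source without losing items.
public class AutoConnectZipSketch {

  static List<List<Object>> zipShared(int n, int elementCount) {
    // Assumption: Flowable.range stands in for the expensive shared source
    Flowable<Integer> source = Flowable.range(0, elementCount);

    // Connect to the upstream only when the n-th subscriber has arrived
    Flowable<Integer> shared = source.publish().autoConnect(n);

    return Flowable.zip(Collections.nCopies(n, shared), objects -> Arrays.asList(objects))
        .toList()
        .blockingGet();
  }

  public static void main(String[] args) {
    // prints [[0, 0, 0], [1, 1, 1], [2, 2, 2], [3, 3, 3], [4, 4, 4]]
    System.out.println(zipShared(3, 5));
  }
}
```

The trade-off is that if fewer than n subscribers ever show up, the source never starts, which is why this only fits when the consumer count is fixed.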

Otherwise, it may not work and you'd have to rethink your dataflow.

@fplein
Author

fplein commented Feb 14, 2025

Unfortunately, I cannot determine the number of consumers beforehand. I did, however, always use manual connect logic: a somewhat hacky transformer that automatically connects the sources before a blockingSubscribe. Basically, I transform the upstream of the blocking subscribe using the following:

  private static class ConnectingFlowable<T, Connectable> extends Flowable<T> {

    @Nonnull
    private final Flowable<T> source;
    @Nonnull
    private final Connectable connectable;
    @Nonnull
    private final java.util.function.Consumer<Connectable> connector;

    // initializes the final fields
    ConnectingFlowable(Flowable<T> source, Connectable connectable, java.util.function.Consumer<Connectable> connector) {
      this.source = source;
      this.connectable = connectable;
      this.connector = connector;
    }

    @Override
    protected void subscribeActual(@NonNull Subscriber<? super T> subscriber) {
      source.subscribe(subscriber);
      // important to call AFTER the subscription chain, since this ensures that the full flowable assembly is subscribed before emitting data
      connector.accept(connectable);
    }
  }

However, my comment about the connecting does not actually hold: this still connects the sources too early and is flaky. Any advice on how I could detect that everything is properly subscribed before calling connect?

@akarnokd
Member

Avoid using subscribeOn. Move the processing away from the emitting thread via observeOn.
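A minimal sketch of that advice, combining it with the publish() plus manual connect() approach mentioned earlier. It is an illustration under assumptions, not the maintainer's code: Flowable.range stands in for the real source, expensiveStep is a hypothetical placeholder for per-branch work, and the class name is illustrative. Because no operator uses subscribeOn, the whole subscription chain reaches publish() synchronously on the calling thread, so connect() can safely be called right after subscribing; each branch then does its work on its own observeOn worker.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.flowables.ConnectableFlowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ObserveOnZipSketch {

  // Hypothetical stand-in for an expensive per-branch computation.
  static int expensiveStep(int x) {
    return x * x;
  }

  static List<List<Object>> run(int branches, int elementCount) {
    // publish() without autoConnect: nothing is emitted until connect() is called
    ConnectableFlowable<Integer> published = Flowable.range(0, elementCount).publish();

    List<Flowable<Integer>> inputs = new ArrayList<>();
    for (int i = 0; i < branches; i++) {
      inputs.add(published
          .observeOn(Schedulers.computation())       // each branch gets its own worker
          .map(ObserveOnZipSketch::expensiveStep));  // runs on that worker, not the emitter
    }

    // toFuture() subscribes immediately; with no subscribeOn this happens on this thread
    Future<List<List<Object>>> future =
        Flowable.zip(inputs, objects -> Arrays.asList(objects)).toList().toFuture();

    published.connect(); // every branch is already subscribed, so none can miss items
    try {
      return future.get(10, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // prints [[0, 0, 0, 0], [1, 1, 1, 1], [4, 4, 4, 4], [9, 9, 9, 9], [16, 16, 16, 16]]
    System.out.println(run(4, 5));
  }
}
```

Only the downstream work moves to other threads here, so the branches compute concurrently while publish() still paces the shared source to the slowest branch.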

@fplein
Author

fplein commented Feb 14, 2025

Would observeOn then still achieve some form of "concurrent" zip? I want zip to behave similarly to merge, where sources can emit in parallel, rather than buffering them sequentially.

@akarnokd
Member

Yes.

fplein closed this as completed Feb 14, 2025