Skip to content

Commit 0668d04

Browse files
authored
3.x: Fix Flowable.concatMap backpressure w/ scalars (#7089)
1 parent 88697a4 commit 0668d04

File tree

4 files changed

+116
-14
lines changed

4 files changed

+116
-14
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMap.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
package io.reactivex.rxjava3.internal.operators.flowable;
1414

1515
import java.util.Objects;
16-
import java.util.concurrent.atomic.AtomicInteger;
16+
import java.util.concurrent.atomic.*;
1717

1818
import org.reactivestreams.*;
1919

@@ -308,7 +308,7 @@ void drain() {
308308
continue;
309309
} else {
310310
active = true;
311-
inner.setSubscription(new WeakScalarSubscription<>(vr, inner));
311+
inner.setSubscription(new SimpleScalarSubscription<>(vr, inner));
312312
}
313313

314314
} else {
@@ -325,20 +325,22 @@ void drain() {
325325
}
326326
}
327327

328-
static final class WeakScalarSubscription<T> implements Subscription {
328+
static final class SimpleScalarSubscription<T>
329+
extends AtomicBoolean
330+
implements Subscription {
331+
private static final long serialVersionUID = -7606889335172043256L;
332+
329333
final Subscriber<? super T> downstream;
330334
final T value;
331-
boolean once;
332335

333-
WeakScalarSubscription(T value, Subscriber<? super T> downstream) {
336+
SimpleScalarSubscription(T value, Subscriber<? super T> downstream) {
334337
this.value = value;
335338
this.downstream = downstream;
336339
}
337340

338341
@Override
339342
public void request(long n) {
340-
if (n > 0 && !once) {
341-
once = true;
343+
if (n > 0L && compareAndSet(false, true)) {
342344
Subscriber<? super T> a = downstream;
343345
a.onNext(value);
344346
a.onComplete();
@@ -507,7 +509,7 @@ void drain() {
507509
continue;
508510
} else {
509511
active = true;
510-
inner.setSubscription(new WeakScalarSubscription<>(vr, inner));
512+
inner.setSubscription(new SimpleScalarSubscription<>(vr, inner));
511513
}
512514
} else {
513515
active = true;

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ public void run() {
345345
continue;
346346
} else {
347347
active = true;
348-
inner.setSubscription(new WeakScalarSubscription<>(vr, inner));
348+
inner.setSubscription(new SimpleScalarSubscription<>(vr, inner));
349349
}
350350

351351
} else {
@@ -528,7 +528,7 @@ public void run() {
528528
continue;
529529
} else {
530530
active = true;
531-
inner.setSubscription(new WeakScalarSubscription<>(vr, inner));
531+
inner.setSubscription(new SimpleScalarSubscription<>(vr, inner));
532532
}
533533
} else {
534534
active = true;

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,56 @@ public Publisher<? extends Object> apply(String v)
6666
.assertResult("RxSingleScheduler");
6767
}
6868

69+
@Test
70+
public void innerScalarRequestRace() {
71+
Flowable<Integer> just = Flowable.just(1);
72+
int n = 1000;
73+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
74+
PublishProcessor<Flowable<Integer>> source = PublishProcessor.create();
75+
76+
TestSubscriber<Integer> ts = source
77+
.concatMap(v -> v, n + 1, ImmediateThinScheduler.INSTANCE)
78+
.test(1L);
79+
80+
TestHelper.race(() -> {
81+
for (int j = 0; j < n; j++) {
82+
source.onNext(just);
83+
}
84+
}, () -> {
85+
for (int j = 0; j < n; j++) {
86+
ts.request(1);
87+
}
88+
});
89+
90+
ts.assertValueCount(n);
91+
}
92+
}
93+
94+
@Test
95+
public void innerScalarRequestRaceDelayError() {
96+
Flowable<Integer> just = Flowable.just(1);
97+
int n = 1000;
98+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
99+
PublishProcessor<Flowable<Integer>> source = PublishProcessor.create();
100+
101+
TestSubscriber<Integer> ts = source
102+
.concatMapDelayError(v -> v, true, n + 1, ImmediateThinScheduler.INSTANCE)
103+
.test(1L);
104+
105+
TestHelper.race(() -> {
106+
for (int j = 0; j < n; j++) {
107+
source.onNext(just);
108+
}
109+
}, () -> {
110+
for (int j = 0; j < n; j++) {
111+
ts.request(1);
112+
}
113+
});
114+
115+
ts.assertValueCount(n);
116+
}
117+
}
118+
69119
@Test
70120
public void boundaryFusionDelayError() {
71121
Flowable.range(1, 10000)

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapTest.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@
2424
import io.reactivex.rxjava3.core.*;
2525
import io.reactivex.rxjava3.exceptions.*;
2626
import io.reactivex.rxjava3.functions.*;
27-
import io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap.WeakScalarSubscription;
28-
import io.reactivex.rxjava3.processors.UnicastProcessor;
27+
import io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap.SimpleScalarSubscription;
28+
import io.reactivex.rxjava3.processors.*;
2929
import io.reactivex.rxjava3.schedulers.Schedulers;
3030
import io.reactivex.rxjava3.subscribers.TestSubscriber;
3131
import io.reactivex.rxjava3.testsupport.TestHelper;
3232

3333
public class FlowableConcatMapTest extends RxJavaTest {
3434

3535
@Test
36-
public void weakSubscriptionRequest() {
36+
public void simpleSubscriptionRequest() {
3737
TestSubscriber<Integer> ts = new TestSubscriber<>(0);
38-
WeakScalarSubscription<Integer> ws = new WeakScalarSubscription<>(1, ts);
38+
SimpleScalarSubscription<Integer> ws = new SimpleScalarSubscription<>(1, ts);
3939
ts.onSubscribe(ws);
4040

4141
ws.request(0);
@@ -79,6 +79,56 @@ public Publisher<? extends Object> apply(String v)
7979
.assertResult("RxSingleScheduler");
8080
}
8181

82+
@Test
83+
public void innerScalarRequestRace() {
84+
Flowable<Integer> just = Flowable.just(1);
85+
int n = 1000;
86+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
87+
PublishProcessor<Flowable<Integer>> source = PublishProcessor.create();
88+
89+
TestSubscriber<Integer> ts = source
90+
.concatMap(v -> v, n + 1)
91+
.test(1L);
92+
93+
TestHelper.race(() -> {
94+
for (int j = 0; j < n; j++) {
95+
source.onNext(just);
96+
}
97+
}, () -> {
98+
for (int j = 0; j < n; j++) {
99+
ts.request(1);
100+
}
101+
});
102+
103+
ts.assertValueCount(n);
104+
}
105+
}
106+
107+
@Test
108+
public void innerScalarRequestRaceDelayError() {
109+
Flowable<Integer> just = Flowable.just(1);
110+
int n = 1000;
111+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
112+
PublishProcessor<Flowable<Integer>> source = PublishProcessor.create();
113+
114+
TestSubscriber<Integer> ts = source
115+
.concatMapDelayError(v -> v, true, n + 1)
116+
.test(1L);
117+
118+
TestHelper.race(() -> {
119+
for (int j = 0; j < n; j++) {
120+
source.onNext(just);
121+
}
122+
}, () -> {
123+
for (int j = 0; j < n; j++) {
124+
ts.request(1);
125+
}
126+
});
127+
128+
ts.assertValueCount(n);
129+
}
130+
}
131+
82132
@Test
83133
public void boundaryFusionDelayError() {
84134
Flowable.range(1, 10000)

0 commit comments

Comments
 (0)