Skip to content

Commit b3ad075

Browse files
authored
3.x: Add missing coverage, fix unused/inconsistent ops (#6901)
* 3.x: Add missing coverage, fix unused/inconsistent ops * More coverage improvements and cleanup * Some more coverage * Observable coverage and cleanup * Improve Flowable internals and coverage * More Flowable operator coverage and fixes * Last set of coverage & cleanup for Flowable operators * Fix wrong use of j.u.Observable
1 parent 38eda0c commit b3ad075

File tree

225 files changed

+7358
-1093
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

225 files changed

+7358
-1093
lines changed

src/main/java/io/reactivex/rxjava3/core/Completable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -981,7 +981,7 @@ private static Completable merge0(@NonNull Publisher<@NonNull ? extends Completa
981981
@SafeVarargs
982982
public static Completable mergeArrayDelayError(@NonNull CompletableSource... sources) {
983983
Objects.requireNonNull(sources, "sources is null");
984-
return RxJavaPlugins.onAssembly(new CompletableMergeDelayErrorArray(sources));
984+
return RxJavaPlugins.onAssembly(new CompletableMergeArrayDelayError(sources));
985985
}
986986

987987
/**

src/main/java/io/reactivex/rxjava3/internal/functions/Functions.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -281,13 +281,8 @@ public static <T> Predicate<T> equalsWith(T value) {
281281
return new EqualsPredicate<>(value);
282282
}
283283

284-
enum HashSetCallable implements Supplier<Set<Object>>, Callable<Set<Object>> {
284+
enum HashSetSupplier implements Supplier<Set<Object>> {
285285
INSTANCE;
286-
@Override
287-
public Set<Object> call() {
288-
return new HashSet<>();
289-
}
290-
291286
@Override
292287
public Set<Object> get() {
293288
return new HashSet<>();
@@ -296,7 +291,7 @@ public Set<Object> get() {
296291

297292
@SuppressWarnings({ "rawtypes", "unchecked" })
298293
public static <T> Supplier<Set<T>> createHashSet() {
299-
return (Supplier)HashSetCallable.INSTANCE;
294+
return (Supplier)HashSetSupplier.INSTANCE;
300295
}
301296

302297
static final class NotificationOnNext<T> implements Consumer<T> {
@@ -742,12 +737,7 @@ public boolean test(Object o) {
742737
}
743738
}
744739

745-
static final class NullProvider implements Callable<Object>, Supplier<Object> {
746-
@Override
747-
public Object call() {
748-
return null;
749-
}
750-
740+
static final class NullProvider implements Supplier<Object> {
751741
@Override
752742
public Object get() {
753743
return null;

src/main/java/io/reactivex/rxjava3/internal/observers/FutureObserver.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -128,22 +128,15 @@ public void onNext(T t) {
128128
@Override
129129
public void onError(Throwable t) {
130130
if (error == null) {
131-
error = t;
132-
133-
for (;;) {
134-
Disposable a = upstream.get();
135-
if (a == this || a == DisposableHelper.DISPOSED) {
136-
RxJavaPlugins.onError(t);
137-
return;
138-
}
139-
if (upstream.compareAndSet(a, this)) {
140-
countDown();
141-
return;
142-
}
131+
Disposable a = upstream.get();
132+
if (a != this && a != DisposableHelper.DISPOSED
133+
&& upstream.compareAndSet(a, this)) {
134+
error = t;
135+
countDown();
136+
return;
143137
}
144-
} else {
145-
RxJavaPlugins.onError(t);
146138
}
139+
RxJavaPlugins.onError(t);
147140
}
148141

149142
@Override
@@ -152,15 +145,12 @@ public void onComplete() {
152145
onError(new NoSuchElementException("The source is empty"));
153146
return;
154147
}
155-
for (;;) {
156-
Disposable a = upstream.get();
157-
if (a == this || a == DisposableHelper.DISPOSED) {
158-
return;
159-
}
160-
if (upstream.compareAndSet(a, this)) {
161-
countDown();
162-
return;
163-
}
148+
Disposable a = upstream.get();
149+
if (a == this || a == DisposableHelper.DISPOSED) {
150+
return;
151+
}
152+
if (upstream.compareAndSet(a, this)) {
153+
countDown();
164154
}
165155
}
166156

src/main/java/io/reactivex/rxjava3/internal/observers/InnerQueuedObserver.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,4 @@ public void setDone() {
114114
public SimpleQueue<T> queue() {
115115
return queue;
116116
}
117-
118-
public int fusionMode() {
119-
return fusionMode;
120-
}
121117
}

src/main/java/io/reactivex/rxjava3/internal/observers/QueueDrainObserver.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,6 @@ public final boolean enter() {
5757
return wip.getAndIncrement() == 0;
5858
}
5959

60-
public final boolean fastEnter() {
61-
return wip.get() == 0 && wip.compareAndSet(0, 1);
62-
}
63-
6460
protected final void fastPathEmit(U value, boolean delayError, Disposable dispose) {
6561
final Observer<? super V> observer = downstream;
6662
final SimplePlainQueue<U> q = queue;

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcat.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,8 @@ void drain() {
182182
boolean empty = cs == null;
183183

184184
if (d && empty) {
185-
if (once.compareAndSet(false, true)) {
186-
downstream.onComplete();
187-
}
185+
// errors never set done or call drain.
186+
downstream.onComplete();
188187
return;
189188
}
190189

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMergeArray.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,8 @@ public void onError(Throwable e) {
8686
@Override
8787
public void onComplete() {
8888
if (decrementAndGet() == 0) {
89-
if (once.compareAndSet(false, true)) {
90-
downstream.onComplete();
91-
}
89+
// errors don't decrement this
90+
downstream.onComplete();
9291
}
9392
}
9493

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
import io.reactivex.rxjava3.disposables.*;
2020
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
2121

22-
public final class CompletableMergeDelayErrorArray extends Completable {
22+
public final class CompletableMergeArrayDelayError extends Completable {
2323

2424
final CompletableSource[] sources;
2525

26-
public CompletableMergeDelayErrorArray(CompletableSource[] sources) {
26+
public CompletableMergeArrayDelayError(CompletableSource[] sources) {
2727
this.sources = sources;
2828
}
2929

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMergeDelayErrorIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.rxjava3.core.*;
2121
import io.reactivex.rxjava3.disposables.CompositeDisposable;
2222
import io.reactivex.rxjava3.exceptions.Exceptions;
23-
import io.reactivex.rxjava3.internal.operators.completable.CompletableMergeDelayErrorArray.*;
23+
import io.reactivex.rxjava3.internal.operators.completable.CompletableMergeArrayDelayError.*;
2424
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
2525

2626
public final class CompletableMergeDelayErrorIterable extends Completable {

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMergeIterable.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,8 @@ public void onError(Throwable e) {
128128
@Override
129129
public void onComplete() {
130130
if (wip.decrementAndGet() == 0) {
131-
if (compareAndSet(false, true)) {
132-
downstream.onComplete();
133-
}
131+
// errors don't decrement wip
132+
downstream.onComplete();
134133
}
135134
}
136135

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableNext.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,8 @@ private boolean moveToNext() {
9999
if (nextNotification.isOnComplete()) {
100100
return false;
101101
}
102-
if (nextNotification.isOnError()) {
103-
error = nextNotification.getError();
104-
throw ExceptionHelper.wrapOrThrow(error);
105-
}
106-
throw new IllegalStateException("Should not reach here");
102+
error = nextNotification.getError();
103+
throw ExceptionHelper.wrapOrThrow(error);
107104
} catch (InterruptedException e) {
108105
subscriber.dispose();
109106
error = e;

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBuffer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public void onError(Throwable t) {
133133
RxJavaPlugins.onError(t);
134134
return;
135135
}
136+
buffer = null;
136137
done = true;
137138
downstream.onError(t);
138139
}
@@ -145,8 +146,9 @@ public void onComplete() {
145146
done = true;
146147

147148
C b = buffer;
149+
buffer = null;
148150

149-
if (b != null && !b.isEmpty()) {
151+
if (b != null) {
150152
downstream.onNext(b);
151153
}
152154
downstream.onComplete();
@@ -390,7 +392,7 @@ public void onNext(T t) {
390392

391393
C b = bs.peek();
392394

393-
if (b != null && b.size() + 1 == size) {
395+
if (b.size() + 1 == size) {
394396
bs.poll();
395397

396398
b.add(t);

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatest.java

Lines changed: 27 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
package io.reactivex.rxjava3.internal.operators.flowable;
1515

16-
import java.util.Iterator;
1716
import java.util.Objects;
1817
import java.util.concurrent.atomic.*;
1918

@@ -73,75 +72,46 @@ public FlowableCombineLatest(@NonNull Iterable<? extends Publisher<? extends T>>
7372
@SuppressWarnings("unchecked")
7473
@Override
7574
public void subscribeActual(Subscriber<? super R> s) {
76-
Publisher<? extends T>[] a = array;
77-
int n;
78-
if (a == null) {
79-
n = 0;
80-
a = new Publisher[8];
81-
82-
Iterator<? extends Publisher<? extends T>> it;
75+
Publisher<? extends T>[] sources = array;
76+
int count;
77+
if (sources == null) {
78+
count = 0;
79+
sources = new Publisher[8];
8380

8481
try {
85-
it = Objects.requireNonNull(iterable.iterator(), "The iterator returned is null");
86-
} catch (Throwable e) {
87-
Exceptions.throwIfFatal(e);
88-
EmptySubscription.error(e, s);
89-
return;
90-
}
91-
92-
for (;;) {
93-
94-
boolean b;
95-
96-
try {
97-
b = it.hasNext();
98-
} catch (Throwable e) {
99-
Exceptions.throwIfFatal(e);
100-
EmptySubscription.error(e, s);
101-
return;
102-
}
103-
104-
if (!b) {
105-
break;
106-
}
107-
108-
Publisher<? extends T> p;
109-
110-
try {
111-
p = Objects.requireNonNull(it.next(), "The publisher returned by the iterator is null");
112-
} catch (Throwable e) {
113-
Exceptions.throwIfFatal(e);
114-
EmptySubscription.error(e, s);
115-
return;
116-
}
117-
118-
if (n == a.length) {
119-
Publisher<? extends T>[] c = new Publisher[n + (n >> 2)];
120-
System.arraycopy(a, 0, c, 0, n);
121-
a = c;
82+
for (Publisher<? extends T> p : iterable) {
83+
if (count == sources.length) {
84+
Publisher<? extends T>[] b = new Publisher[count + (count >> 2)];
85+
System.arraycopy(sources, 0, b, 0, count);
86+
sources = b;
87+
}
88+
sources[count++] = Objects.requireNonNull(p, "The Iterator returned a null Publisher");
12289
}
123-
a[n++] = p;
90+
} catch (Throwable ex) {
91+
Exceptions.throwIfFatal(ex);
92+
EmptySubscription.error(ex, s);
93+
return;
12494
}
12595

12696
} else {
127-
n = a.length;
97+
count = sources.length;
12898
}
12999

130-
if (n == 0) {
100+
if (count == 0) {
131101
EmptySubscription.complete(s);
132102
return;
133103
}
134-
if (n == 1) {
135-
a[0].subscribe(new MapSubscriber<>(s, new SingletonArrayFunc()));
104+
if (count == 1) {
105+
sources[0].subscribe(new MapSubscriber<>(s, new SingletonArrayFunc()));
136106
return;
137107
}
138108

139109
CombineLatestCoordinator<T, R> coordinator =
140-
new CombineLatestCoordinator<>(s, combiner, n, bufferSize, delayErrors);
110+
new CombineLatestCoordinator<>(s, combiner, count, bufferSize, delayErrors);
141111

142112
s.onSubscribe(coordinator);
143113

144-
coordinator.subscribe(a, n);
114+
coordinator.subscribe(sources, count);
145115
}
146116

147117
static final class CombineLatestCoordinator<T, R>
@@ -173,7 +143,7 @@ static final class CombineLatestCoordinator<T, R>
173143

174144
volatile boolean done;
175145

176-
final AtomicReference<Throwable> error;
146+
final AtomicThrowable error;
177147

178148
CombineLatestCoordinator(Subscriber<? super R> actual,
179149
Function<? super Object[], ? extends R> combiner, int n,
@@ -189,7 +159,7 @@ static final class CombineLatestCoordinator<T, R>
189159
this.latest = new Object[n];
190160
this.queue = new SpscLinkedArrayQueue<>(bufferSize);
191161
this.requested = new AtomicLong();
192-
this.error = new AtomicReference<>();
162+
this.error = new AtomicThrowable();
193163
this.delayErrors = delayErrors;
194164
}
195165

@@ -205,6 +175,7 @@ public void request(long n) {
205175
public void cancel() {
206176
cancelled = true;
207177
cancelAll();
178+
drain();
208179
}
209180

210181
void subscribe(Publisher<? extends T>[] sources, int n) {
@@ -411,20 +382,15 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a, SpscLinkedArr
411382
if (cancelled) {
412383
cancelAll();
413384
q.clear();
385+
error.tryTerminateAndReport();
414386
return true;
415387
}
416388

417389
if (d) {
418390
if (delayErrors) {
419391
if (empty) {
420392
cancelAll();
421-
Throwable e = ExceptionHelper.terminate(error);
422-
423-
if (e != null && e != ExceptionHelper.TERMINATED) {
424-
a.onError(e);
425-
} else {
426-
a.onComplete();
427-
}
393+
error.tryTerminateConsumer(a);
428394
return true;
429395
}
430396
} else {

0 commit comments

Comments
 (0)