Skip to content

Fix that exceptions will be swallowed by schedulers (#6954) #6955

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.schedulers;

import java.util.concurrent.*;

/**
* Factory of FutureTasks which will call CompleteHandler on complete.
*/
public class CompleteFutureTasks {
/**
* Builds a future task that calls completeHandle on complete.
**/
public static <V> RunnableFuture<V> newCompleteFutureTask(
RunnableFuture<V> base, CompleteScheduledExecutorService.CompleteHandler<V> completeHandler) {
return new CompleteFutureTask<>(base, completeHandler);
}

/**
* Builds a scheduled future task that calls completeHandle on complete.
*/
public static <V> RunnableScheduledFuture<V> newCompleteScheduledFutureTask(
RunnableScheduledFuture<V> base, CompleteScheduledExecutorService.CompleteHandler<V> completeHandler) {
return new CompleteScheduledFutureTask<>(base, completeHandler);
}

public static class CallableWithCompleteHandler<V> implements Callable<V> {
public CallableWithCompleteHandler(
Callable<V> base, CompleteScheduledExecutorService.CompleteHandler<V> completeHandler) {
this.base = base;
this.completeHandler = completeHandler;
}

public CallableWithCompleteHandler(
Runnable runnable, V result, CompleteScheduledExecutorService.CompleteHandler<V> completeHandler) {
this.base = () -> {
runnable.run();
return result;
};
this.completeHandler = completeHandler;
}

@Override
public V call() throws Exception {
return base.call();
}

public CompleteScheduledExecutorService.CompleteHandler<V> getCompleteHandler() {
return completeHandler;
}

private final Callable<V> base;
private final CompleteScheduledExecutorService.CompleteHandler<V> completeHandler;
}

public static class RunnableWithCompleteHandler implements Runnable {
public RunnableWithCompleteHandler(
Runnable base, CompleteScheduledExecutorService.CompleteHandler<Void> completeHandler) {
this.base = base;
this.completeHandler = completeHandler;
}

@Override
public void run() {
base.run();
}

public CompleteScheduledExecutorService.CompleteHandler<Void> getCompleteHandler() {
return completeHandler;
}

private final Runnable base;
private final CompleteScheduledExecutorService.CompleteHandler<Void> completeHandler;
}

private static class CompleteFutureTask<V> implements RunnableFuture<V> {
public CompleteFutureTask(
RunnableFuture<V> base, CompleteScheduledExecutorService.CompleteHandler<V> completeHandler) {
this.base = base;
this.completeHandler = completeHandler;

this.seq = 0;
}

@Override
public void run() {
long s = ++seq;

try {
base.run();
} finally {
if (!base.isDone()) {
return;
}

// interesting case:
// We may get the done state of next run if base is a periodic task and posted to another thread for
// its next run.
// A simple seq test will solve that. We even don't have to make seq volatile, because it is correctly
// synchronized by isDone() and re-schedule locks!
if (seq != s) {
return;
}

if (base.isCancelled()) {
return;
}

completeHandler.onComplete(this);
}
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return base.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return base.isCancelled();
}

@Override
public boolean isDone() {
return base.isDone();
}

@Override
public V get() throws InterruptedException, ExecutionException {
return base.get();
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return base.get(timeout, unit);
}

private long seq;

private final RunnableFuture<V> base;
private final CompleteScheduledExecutorService.CompleteHandler<V> completeHandler;
}

private static class CompleteScheduledFutureTask<V> extends CompleteFutureTask<V>
implements RunnableScheduledFuture<V> {
public CompleteScheduledFutureTask(
RunnableScheduledFuture<V> base, CompleteScheduledExecutorService.CompleteHandler<V> completeHandler) {
super(base, completeHandler);
this.base = base;
}

private final RunnableScheduledFuture<V> base;

@Override
public boolean isPeriodic() {
return base.isPeriodic();
}

@Override
public long getDelay(TimeUnit unit) {
return base.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
return base.compareTo(o);
}
}

/** Utility class. */
private CompleteFutureTasks() {
throw new IllegalStateException("No instances!");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.schedulers;

import java.util.concurrent.*;

/**
* A ScheduledExecutorService that supports setting a complete handler that
* will receive the Future of the submitted job once it completes.
*/
public interface CompleteScheduledExecutorService extends ScheduledExecutorService {

interface CompleteHandler<V> {
void onComplete(Future<V> task);
}

<V> Future<V> submit(Callable<V> task,
CompleteHandler<V> completeHandler);

<V> Future<V> submit(Runnable task,
V result,
CompleteHandler<V> completeHandler);

<V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit,
CompleteHandler<V> completeHandler);

<V> ScheduledFuture<V> schedule(Runnable command,
V result,
long delay,
TimeUnit unit,
CompleteHandler<V> completeHandler);

ScheduledFuture<Void> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit,
CompleteHandler<Void> completeHandler);

ScheduledFuture<Void> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit,
CompleteHandler<Void> completeHandler);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.schedulers;

import java.util.concurrent.*;

/**
* {@link CompleteScheduledExecutorService} factory.
*/
public class CompleteScheduledExecutors {
public static CompleteScheduledExecutorService newSingleThreadExecutor() {
return new CompleteScheduledThreadPoolExecutor(1);
}

public static CompleteScheduledExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new CompleteScheduledThreadPoolExecutor(1, threadFactory);
}

public static CompleteScheduledExecutorService newSingleThreadExecutor(RejectedExecutionHandler handler) {
return new CompleteScheduledThreadPoolExecutor(1, handler);
}

public static CompleteScheduledExecutorService newSingleThreadExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
return new CompleteScheduledThreadPoolExecutor(1, threadFactory, handler);
}

public static CompleteScheduledExecutorService newThreadPoolExecutor(int corePoolSize) {
return new CompleteScheduledThreadPoolExecutor(corePoolSize);
}

public static CompleteScheduledExecutorService newThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
return new CompleteScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

public static CompleteScheduledExecutorService newThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
return new CompleteScheduledThreadPoolExecutor(corePoolSize, handler);
}

public static CompleteScheduledExecutorService newThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
return new CompleteScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler);
}

/** Utility class. */
private CompleteScheduledExecutors() {
throw new IllegalStateException("No instances!");
}
}
Loading