Skip to content

Fix that exceptions will be swallowed by schedulers (#6954) #6955

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

jxdabc
Copy link

@jxdabc jxdabc commented Apr 11, 2020

Fix that RxJava will miss exceptions in Future thrown by tasks executed by
executors

Signed-off-by: jiaoxiaodong [email protected]

In general, exceptions should never be swallowed and should be handled somewhere, eg., catch clause or uncaught exception handlers of threads.

As to RxJava, exceptions are expected to be handled at:

  • observer's onError(), or
  • RxJava exception handler (RxJavaPlugins.onError()), or
  • thread's uncaught exception handler (eg., exceptions thrown by Exceptions.throwIfFatal()),

and not to be magically swallowed somewhere.

This is true in most situations, but is broken when schedulers are involved. That is, the following tests will fail:

(Test 1) Exceptions from observables are swallowed:

@Test
public void exceptionFromObservableShouldNotBeSwallowed() throws Exception {
    // Exceptions, fatal or not, should be handled by
    // #1 observer's onError(), or
    // #2 RxJava exception handler, or
    // #3 thread's uncaught exception handler,
    // and should not be swallowed.
    try {
        CountDownLatch latch = new CountDownLatch(1);

        // #3 thread's uncaught exception handler
        Scheduler computationScheduler = new ComputationScheduler(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler((thread, throwable) -> {
                    latch.countDown();
                });
                return t;
            }
        });

        // #2 RxJava exception handler
        RxJavaPlugins.setErrorHandler(h -> {
            latch.countDown();
        });

        // #1 observer's onError()
        Observable.create(s -> {

            s.onNext(1);

            if (true) {
                throw new OutOfMemoryError();
            }

            s.onComplete();

        }).subscribeOn(computationScheduler)
        .subscribe(v -> {
        }, e -> {
            latch.countDown();
        });

        assertTrue(latch.await(2, TimeUnit.SECONDS));
    } finally {
        RxJavaPlugins.reset();
    }
}

(Test 2) Exceptions from observers are swallowed:

@Test
public void exceptionFromObserverShouldNotBeSwallowed() throws Exception {
    // Exceptions, fatal or not, should be handled by
    // #1 observer's onError(), or
    // #2 RxJava exception handler, or
    // #3 thread's uncaught exception handler,
    // and should not be swallowed.
    try {
        CountDownLatch latch = new CountDownLatch(1);

        // #3 thread's uncaught exception handler
        Scheduler computationScheduler = new ComputationScheduler(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler((thread, throwable) -> {
                    latch.countDown();
                });
                return t;
            }
        });

        // #2 RxJava exception handler
        RxJavaPlugins.setErrorHandler(h -> {
            latch.countDown();
        });

        // #1 observer's onError()
        Flowable.interval(500, TimeUnit.MILLISECONDS, computationScheduler)
                .subscribe(v -> {
                    throw new OutOfMemoryError();
                }, e -> {
                    latch.countDown();
                });

        assertTrue(latch.await(2, TimeUnit.SECONDS));
    } finally {
        RxJavaPlugins.reset();
    }
}

Sometimes these broken cases matter in certain situation or on certain platform, eg.,

  • As Test 1, when out-of-memory occurs before emitter.onComplete(), the application may block forever waiting for onComplete(), because no one in the whole application could possibly known about this exception.
  • On Android, on which I code a lot, the exception handing contract implies that any runtime exception or error in any thread should throw to the uncaught exception handler, which will terminate the application. Almost all async tools, including the built in AsyncTask, conform to this contract. Setting an error handler that always throws runtime exceptions and errors via RxJavaPlugins.setErrorHandler() on Android almost makes it. But it doesn't work when schedulers are involved.

The cause is clear. Internal to Scheduler implementation, RxJava only uses Future of a task submitted to Executor as a cancel-handle and never check exceptions inside the Future while any exception thrown by the submitted task will go into the Future.

But the fix is not as easy, there is no chance to check the Future for exception since RxJava pushes task result instead of pulls it.

Pulling results is the design intent of Future. When we won't, I think we should customize the FutureTask which runs the task in Executor and provides the Future function to give us a chance to handle the result(including the exception).

Actually, JDK has given us all we need to do this via ThreadPoolExecutor#newTaskFor, ScheduledThreadPoolExecutor#decorateTask, RunnableFuture, RunnableScheduledFuture, etc. And Android did something similar in its AsyncTask implementation.

I'll propose a PR that:

  • Implements a CompleteScheduledExecutorService which is a ScheduledExecutorService that supports setting a complete handler that will receive the Future(and handle the task result) of the submitted task once it completes in the executing thread.
  • Modifies executor based schedulers to use CompleteScheduledExecutorService and check the Future for exceptions(and route it to RxJavaPlugins.onError() or throw it using Exceptions.throwIfFatal()) of submitted tasks in their complete handlers.
  • With test cases cover CompleteScheduledExecutorService related code and the above Test 1 & Test 2.

Fix that RxJava will miss exceptions in Future thrown by tasks executed by
executors

Signed-off-by: jiaoxiaodong <[email protected]>
@akarnokd
Copy link
Member

There is a simpler solution. See #6956.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants