Skip to content

3.x: Exceptions will be swallowed when schedulers are involved #6954

@jxdabc

Description

@jxdabc

In general, exceptions should never be swallowed and should be handled somewhere, eg., catch clause or uncaught exception handlers of threads.

As to RxJava, exceptions are expected to be handled at:

  • observer's onError(), or
  • RxJava exception handler (RxJavaPlugins.onError()), or
  • thread's uncaught exception handler (eg., exceptions thrown by Exceptions.throwIfFatal()),

and not to be magically swallowed somewhere.

This is true in most situations, but is broken when schedulers are involved. That is, the following tests will fail:

(Test 1) Exceptions from observables are swallowed:

@Test
public void exceptionFromObservableShouldNotBeSwallowed() throws Exception {
    // Exceptions, fatal or not, should be handled by
    // #1 observer's onError(), or
    // #2 RxJava exception handler, or
    // #3 thread's uncaught exception handler,
    // and should not be swallowed.
    try {
        CountDownLatch latch = new CountDownLatch(1);

        // #3 thread's uncaught exception handler
        Scheduler computationScheduler = new ComputationScheduler(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler((thread, throwable) -> {
                    latch.countDown();
                });
                return t;
            }
        });

        // #2 RxJava exception handler
        RxJavaPlugins.setErrorHandler(h -> {
            latch.countDown();
        });

        // #1 observer's onError()
        Observable.create(s -> {

            s.onNext(1);

            if (true) {
                throw new OutOfMemoryError();
            }

            s.onComplete();

        }).subscribeOn(computationScheduler)
        .subscribe(v -> {
        }, e -> {
            latch.countDown();
        });

        assertTrue(latch.await(2, TimeUnit.SECONDS));
    } finally {
        RxJavaPlugins.reset();
    }
}

(Test 2) Exceptions from observers are swallowed:

@Test
public void exceptionFromObserverShouldNotBeSwallowed() throws Exception {
    // Exceptions, fatal or not, should be handled by
    // #1 observer's onError(), or
    // #2 RxJava exception handler, or
    // #3 thread's uncaught exception handler,
    // and should not be swallowed.
    try {
        CountDownLatch latch = new CountDownLatch(1);

        // #3 thread's uncaught exception handler
        Scheduler computationScheduler = new ComputationScheduler(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler((thread, throwable) -> {
                    latch.countDown();
                });
                return t;
            }
        });

        // #2 RxJava exception handler
        RxJavaPlugins.setErrorHandler(h -> {
            latch.countDown();
        });

        // #1 observer's onError()
        Flowable.interval(500, TimeUnit.MILLISECONDS, computationScheduler)
                .subscribe(v -> {
                    throw new OutOfMemoryError();
                }, e -> {
                    latch.countDown();
                });

        assertTrue(latch.await(2, TimeUnit.SECONDS));
    } finally {
        RxJavaPlugins.reset();
    }
}

Sometimes these broken cases matter in certain situation or on certain platform, eg.,

  • As Test 1, when out-of-memory occurs before emitter.onComplete(), the application may block forever waiting for onComplete(), because no one in the whole application could possibly known about this exception.
  • On Android, on which I code a lot, the exception handing contract implies that any runtime exception or error in any thread should throw to the uncaught exception handler, which will terminate the application. Almost all async tools, including the built in AsyncTask, conform to this contract. Setting an error handler that always throws runtime exceptions and errors via RxJavaPlugins.setErrorHandler() on Android almost makes it. But it doesn't work when schedulers are involved.

The cause is clear. Internal to Scheduler implementation, RxJava only uses Future of a task submitted to Executor as a cancel-handle and never check exceptions inside the Future while any exception thrown by the submitted task will go into the Future.

But the fix is not as easy, there is no chance to check the Future for exception since RxJava pushes task result instead of pulls it.

Pulling results is the design intent of Future. When we won't, I think we should customize the FutureTask which runs the task in Executor and provides the Future function to give us a chance to handle the result(including the exception).

Actually, JDK has given us all we need to do this via ThreadPoolExecutor#newTaskFor, ScheduledThreadPoolExecutor#decorateTask, RunnableFuture, RunnableScheduledFuture, etc. And Android did something similar in its AsyncTask implementation.

I'll propose a PR that:

  • Implements a CompleteScheduledExecutorService which is a ScheduledExecutorService that supports setting a complete handler that will receive the Future(and handle the task result) of the submitted task once it completes in the executing thread.
  • Modifies executor based schedulers to use CompleteScheduledExecutorService and check the Future for exceptions(and route it to RxJavaPlugins.onError() or throw it using Exceptions.throwIfFatal()) of submitted tasks in their complete handlers.
  • With test cases cover CompleteScheduledExecutorService related code and the above Test 1 & Test 2.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions