Issue with CancellationToken lifetime in Observable.Using (async version) #2089

Open
@DenomikoN

Bug

In the async version of Observable.Using, the CancellationToken passed to the observable factory delegate is canceled early (in the output below, right after the first element is emitted) instead of when the subscription is disposed. The synchronous version looks fine, since it takes no cancellation tokens.

Reproduced in System.Reactive.Linq 6.0.0, x64, Win11, dotnet version 8.0.100

using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using static System.Reactive.Linq.Observable;

public class Program
{
    public static async Task Main()
    {
        var enumeration = Observable.Using<int, IDisposable>(
                async (cancellation) => {
                    cancellation.Register(() => Console.WriteLine("Resource factory cancellation called"));
                    await Task.Yield();
                    Console.WriteLine("Created resource");
                    return Disposable.Create(() => Console.WriteLine("Disposed resource"));
                },

                async (resource, cancellation) => {
                    cancellation.Register(() => Console.WriteLine("Resource observable cancellation called")); // why is this invoked before the subscription is disposed?

                    Console.WriteLine("Init resource observable");
                    await Task.Yield();
                    // return -1 immediately and then tick with 1 second interval
                    var underlyingObservable = Observable.Return(-1)
                                                        .Concat(
                                                            Observable
                                                            .Interval(TimeSpan.FromSeconds(1))
                                                            .Select((t,i) => i).Take(10)
                                                        );
                    return new AnonymousObservable<int>((observer) => {
                        Console.WriteLine("Subscribed");
                        return new CompositeDisposable(
                            underlyingObservable.Subscribe(observer),
                            Disposable.Create(() => Console.WriteLine("Unsubscribed"))
                        );
                    });
                }
            );


        enumeration = enumeration.Publish().RefCount();

        var subscription = enumeration.Subscribe(
            num => Console.WriteLine("Next:{0}", num),
            ex => Console.WriteLine("Error:{0}", ex),
            () => Console.WriteLine("Completed")
        );

        await Task.Delay(5000);

        subscription.Dispose();
    }
}

Actual output:

Created resource
Init resource observable
Resource factory cancellation called
Subscribed
Next:-1
Resource observable cancellation called  << here is the problem
Next:0
Next:1
Next:2
Next:3
Unsubscribed
Disposed resource

Expected output:

Created resource
Init resource observable
Resource factory cancellation called
Subscribed
Next:-1
Next:0
Next:1
Next:2
Next:3
Resource observable cancellation called
Unsubscribed
Disposed resource
