Open
Description
Bug
In the async version of Observable.Using, in observable factory delegate cancellationToken gets canceled not at the moment of destroying subscription. The synchronous version looks ok as there are no cancellation tokens.
Reproduced in System.Reactive.Linq 6.0.0, x64, Win11, dotnet version 8.0.100
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using static System.Reactive.Linq.Observable;
public class Program
{
public static async Task Main()
{
var enumeration = Observable.Using<int, IDisposable>(
async (cancellation) => {
cancellation.Register(() => Console.WriteLine("Resource factory cancellation called"));
await Task.Yield();
Console.WriteLine("Created resource");
return Disposable.Create(() => Console.WriteLine("Disposed resource"));
},
async (resource, cancellation) => {
cancellation.Register(() => Console.WriteLine("Resource observable cancellation called")); // why is this called not when subscription disposing?
Console.WriteLine("Init resource observable");
await Task.Yield();
// return -1 immediately and then tick with 1 second interval
var underlyingObservable = Observable.Return(-1)
.Concat(
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select((t,i) => i).Take(10)
);
return new AnonymousObservable<int>((observer) => {
Console.WriteLine("Subscribed");
return new CompositeDisposable(
underlyingObservable.Subscribe(observer),
Disposable.Create(() => Console.WriteLine("Unsubscribed"))
);
});
}
);
enumeration = enumeration.Publish().RefCount();
var subscription = enumeration.Subscribe(
num => Console.WriteLine("Next:{0}", num),
ex => Console.WriteLine("Error:{0}", ex),
() => Console.WriteLine("Completed")
);
await Task.Delay(5000);
subscription.Dispose();
}
}
Actual output:
Created resource
Init resource observable
Resource factory cancellation called
Subscribed
Next:-1
Resource observable cancellation called << here is the problem
Next:0
Next:1
Next:2
Next:3
Unsubscribed
Disposed resource
Expected output:
Created resource
Init resource observable
Resource factory cancellation called
Subscribed
Next:-1
Next:0
Next:1
Next:2
Next:3
Resource observable cancellation called
Unsubscribed
Disposed resource