The Rx library includes operators that accept lambda parameters, and some of these lambdas are provided with a CancellationToken that is controlled by the library itself. Some examples of these operators are FromAsync, StartAsync and Create:
// Converts an asynchronous action into an observable sequence. Each subscription
// to the resulting sequence causes the action to be started. The CancellationToken
// passed to the asynchronous action is tied to the observable sequence's subscription
// that triggered the action's invocation and can be used for best-effort cancellation.
public static IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync);
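To make the behavior described in that comment concrete, here is a minimal sketch, assuming System.Reactive 5.0.0 is referenced. The class and method names (FromAsyncTokenDemo, TokenIsCanceledOnUnsubscribe) are hypothetical and exist only for illustration. It subscribes to a FromAsync sequence, unsubscribes, and reports whether the token that Rx handed to the lambda was signaled.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Rx.
public static class FromAsyncTokenDemo
{
    // Subscribes to a FromAsync sequence, then unsubscribes, and returns
    // whether the token passed to the lambda has been signaled.
    public static bool TokenIsCanceledOnUnsubscribe()
    {
        CancellationToken observedToken = default;
        using var started = new ManualResetEventSlim();

        IDisposable subscription = Observable.FromAsync(async token =>
        {
            observedToken = token; // token created and controlled by Rx
            started.Set();
            await Task.Delay(Timeout.Infinite, token);
        })
        .Subscribe(_ => { }, _ => { });

        // Wait until the lambda has actually started before unsubscribing.
        started.Wait(TimeSpan.FromSeconds(5));
        subscription.Dispose();

        return observedToken.IsCancellationRequested;
    }
}
```

Calling FromAsyncTokenDemo.TokenIsCanceledOnUnsubscribe() should return true if the token is tied to the subscription as the documentation comment states. Note that this only observes cancellation; it says nothing about whether the underlying CancellationTokenSource is disposed.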
I was under the impression that the Rx library does a good job at managing the lifecycle of the CancellationTokenSources that it obviously has to create behind the scenes, but I am not so sure any more. Let's first state that the documentation insists strongly that CancellationTokenSources should be disposed of:
This type implements the IDisposable interface. When you have finished using an instance of the type, you should dispose of it either directly or indirectly. To dispose of the type directly, call its Dispose method in a try/catch block. To dispose of it indirectly, use a language construct such as using (in C#) or Using (in Visual Basic).
Also from here:
Always call Dispose before you release your last reference to the CancellationTokenSource. Otherwise, the resources it is using will not be freed until the garbage collector calls the CancellationTokenSource object's Finalize method.
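For reference, the indirect disposal the documentation recommends can be sketched as follows. The class and method names (CtsDisposalDemo, IsUnusableAfterDispose) are hypothetical. The sketch relies on the documented behavior that the Token property throws ObjectDisposedException once the source has been disposed, which gives a way to detect disposal without reflection.

```csharp
using System;
using System.Threading;

// Hypothetical helper, for illustration only.
public static class CtsDisposalDemo
{
    // Creates a CancellationTokenSource, disposes of it through a using
    // statement, and returns whether reading Token afterwards throws.
    public static bool IsUnusableAfterDispose()
    {
        CancellationTokenSource cts;
        using (cts = new CancellationTokenSource())
        {
            cts.Cancel();
        } // Dispose is called here, even if the body throws

        try
        {
            _ = cts.Token; // documented to throw after disposal
            return false;
        }
        catch (ObjectDisposedException)
        {
            return true;
        }
    }
}
```

This is the pattern one would expect an owner of a CancellationTokenSource to follow; whether Rx does the equivalent internally is exactly what is in question here.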
I ran the experiment below to test my assumptions. It uses reflection to read the private fields _source and _disposed of the types CancellationToken and CancellationTokenSource respectively (.NET 5).
using System;
using System.Reactive.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

CancellationToken capturedToken = default;
var subscription = Observable.FromAsync(async token =>
    {
        capturedToken = token; // keep the Rx-controlled token for later inspection
        token.Register(() => Console.WriteLine("Token canceled"));
        await Task.Delay(Timeout.Infinite, token);
    })
    .TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(500)))
    .Finally(() => Console.WriteLine("The observable was terminated"))
    .Subscribe();
Thread.Sleep(1000);
// Read the private CancellationToken._source field (.NET 5 field name)
var cts = (CancellationTokenSource)(typeof(CancellationToken)
    .GetField("_source", BindingFlags.NonPublic | BindingFlags.Instance)
    .GetValue(capturedToken));
// Read the private CancellationTokenSource._disposed field (.NET 5 field name)
bool disposed = (bool)(typeof(CancellationTokenSource)
    .GetField("_disposed", BindingFlags.NonPublic | BindingFlags.Instance)
    .GetValue(cts));
Console.WriteLine($"IsCancellationRequested: {cts.IsCancellationRequested}");
Console.WriteLine($"IsDisposed: {disposed}");
Output:
Token canceled
The observable was terminated
IsCancellationRequested: True
IsDisposed: False
Try it on Fiddle (.NET Framework version, having differently named private fields)
The captured CancellationToken
is inspected half a second after the asynchronous operation has been canceled and the observable has terminated. The _disposed
field has the value false
, indicating that the Dispose
method of the associated CancellationTokenSource
has not been invoked. Am I doing something wrong, or the Rx library indeed omits disposing of the CancellationTokenSource
s it creates?
.NET 5.0.1, System.Reactive 5.0.0, C# 9