I found a way to create an observable that inherits from Task
, by using a genius technique described by @GlennSlayden in this answer.
public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
private readonly IObservable<TResult> _observable;
private readonly Task<TResult> _promise;
private AsyncOperation(Func<TResult> function) : base(() => function())
=> function = this.GetResult;
private TResult GetResult() => _promise.GetAwaiter().GetResult();
public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
: this((Func<TResult>)null)
{
_observable = Observable.StartAsync(action, Scheduler.Immediate);
_promise = _observable.ToTask();
_promise.ContinueWith(_ => base.RunSynchronously(), default,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
IDisposable IObservable<TResult>.Subscribe(IObserver<TResult> observer)
=> _observable.Subscribe(observer);
}
The above solution is not perfect because an instance of the derived class can never transition to the Canceled
state. This is a problem I don't know how to fix, and it may not be fixable, but it's probably not very important. A cancellation emerges as a TaskCanceledException
, and handling this exception is the normal way of dealing with canceled tasks anyway.
Interestingly the asynchronous operation can be canceled by creating a dummy subscription and disposing it:
var operation = new AsyncOperation<TResult>(async cancellationToken => { /* ... */ });
operation.Subscribe(_ => { }, _ => { }).Dispose(); // Cancels the cancellationToken
I experimented with this class a bit and I found that it's less practical than I initially thought it would be. The problem is that many APIs exist that support both tasks and observables, and are identical otherwise (for example Concat
, Merge
, Switch
, Wait
etc). This leads to the frequent appearance of compile-time errors (CS0121 ambiguous call). Resolving the ambiguities is possible by casting the type as either task or observable, but this is awkward, and negates the whole purpose of combining the two types in the first place.
Clarification: The line _promise.GetAwaiter().GetResult()
may indicate at first glance that this implementation blocks a ThreadPool
thread. This is not the case because the base Task
is initially cold, and it's only warmed when the _promise
has completed.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…