Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


c# - How can I create a class that is both a Task<T> and an IObservable<T>?

Recently I encountered a situation where it would be advantageous to have an asynchronous operation represented both as a Task<T> and as an IObservable<T>. The task representation maintains the state of the operation (IsCompleted, IsFaulted etc.), while the observable representation enables composing multiple operations in interesting ways (Concat, Merge, Switch etc.) and automatically cancels any operation that has been unsubscribed along the way, which solves the problem of fire-and-forget asynchronous operations. So I've become interested in ways to combine these two representations.

The easy, and probably correct, way to combine them would be through composition: creating a type that internally stores a Task<T> and an IObservable<T>, and exposes them as two of its properties. But in this question I am interested in the challenging, and probably impractical, possibility of a type that is a Task<T> and an IObservable<T> at the same time: a type that can be passed directly to APIs that accept either tasks or observables, and does the right thing in either case. So it can't be just a task-like object. It must inherit from the real thing, the Task<T> class itself. Something like this:

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
    {
        //...
    }
}

Creating an AsyncOperation instance should immediately invoke the supplied action. In other words, an AsyncOperation should represent a hot task/observable combo.

Is it possible to create such a type?
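For comparison, the composition approach mentioned above could be sketched roughly as follows. This is only an illustration, assuming the System.Reactive package is referenced; the AsyncOperationPair name and its Completion and Notifications properties are made up for this example and do not come from any library.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical composition-based alternative: it holds both representations
// instead of inheriting from Task<TResult>.
public sealed class AsyncOperationPair<TResult>
{
    // Task view of the operation (IsCompleted, IsFaulted, await, ...).
    public Task<TResult> Completion { get; }

    // Observable view of the same operation (Concat, Merge, Switch, ...).
    public IObservable<TResult> Notifications { get; }

    public AsyncOperationPair(Func<CancellationToken, Task<TResult>> action)
    {
        // StartAsync invokes the action right away, so the pair is hot.
        Notifications = Observable.StartAsync(action);
        Completion = Notifications.ToTask();
    }
}
```

A caller would pass operation.Completion to task-based APIs and operation.Notifications to observable-based ones, which is exactly the extra step that the question is trying to avoid.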

By the way, here is a thread in the ReactiveX/RxJava repository showing that others have thought about this problem before: No "isCompleted" or "isErrored" methods on Observable


1 Reply


I found a way to create an observable that inherits from Task, using an ingenious technique described by @GlennSlayden in this answer.

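using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    private readonly IObservable<TResult> _observable;
    private readonly Task<TResult> _promise;

    // The lambda passed to the base constructor captures the 'function' parameter
    // itself, so reassigning it in the body makes the base task invoke GetResult.
    private AsyncOperation(Func<TResult> function) : base(() => function())
        => function = this.GetResult;

    // Returns the result of the promise, or rethrows its exception.
    private TResult GetResult() => _promise.GetAwaiter().GetResult();

    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
        : this((Func<TResult>)null)
    {
        // Start the action immediately (hot).
        _observable = Observable.StartAsync(action, Scheduler.Immediate);
        _promise = _observable.ToTask();
        // Run the cold base task only after the promise has completed,
        // so that GetResult never has to block.
        _promise.ContinueWith(_ => base.RunSynchronously(), default,
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    IDisposable IObservable<TResult>.Subscribe(IObserver<TResult> observer)
        => _observable.Subscribe(observer);
}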

The above solution is not perfect because an instance of the derived class can never transition to the Canceled state. This is a problem I don't know how to fix, and it may not be fixable, but it's probably not very important. A cancellation emerges as a TaskCanceledException, and handling this exception is the normal way of dealing with canceled tasks anyway.
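The difference between a canceled promise and the resulting state of the wrapper task can be reproduced without Rx. The sketch below is a simplified stand-in, not the AsyncOperation class itself: the CancellationStateDemo class and its CompleteFrom helper are hypothetical names, and a TaskCompletionSource plays the role of the _promise. Under these assumptions the wrapper should end up Faulted rather than Canceled, while awaiting it still surfaces a TaskCanceledException.

```csharp
using System;
using System.Threading.Tasks;

public static class CancellationStateDemo
{
    // Wraps a promise in a cold task and runs it synchronously, mimicking
    // how the base task of AsyncOperation is completed from the _promise.
    // Intended to be called only after the promise has completed.
    public static Task<int> CompleteFrom(Task<int> promise)
    {
        var cold = new Task<int>(() => promise.GetAwaiter().GetResult());
        cold.RunSynchronously(); // an exception is captured in the task, not thrown here
        return cold;
    }

    public static async Task Main()
    {
        var source = new TaskCompletionSource<int>();
        source.SetCanceled(); // the stand-in promise is now in the Canceled state

        Task<int> wrapper = CompleteFrom(source.Task);
        Console.WriteLine($"promise canceled: {source.Task.IsCanceled}");
        Console.WriteLine($"wrapper canceled: {wrapper.IsCanceled}, faulted: {wrapper.IsFaulted}");

        try
        {
            await wrapper;
        }
        catch (OperationCanceledException ex)
        {
            // TaskCanceledException derives from OperationCanceledException.
            Console.WriteLine($"caught {ex.GetType().Name}");
        }
    }
}
```

Catching OperationCanceledException covers both a genuinely canceled task and this wrapper, so code that handles cancellation through exceptions should not need to distinguish the two.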

Interestingly, the asynchronous operation can be canceled by creating a dummy subscription and disposing it:

var operation = new AsyncOperation<TResult>(async cancellationToken => { /* ... */ });

operation.Subscribe(_ => { }, _ => { }).Dispose(); // Cancels the cancellationToken

I experimented with this class a bit and I found that it's less practical than I initially thought it would be. The problem is that many APIs exist that support both tasks and observables, and are identical otherwise (for example Concat, Merge, Switch, Wait etc). This leads to the frequent appearance of compile-time errors (CS0121 ambiguous call). Resolving the ambiguities is possible by casting the type as either task or observable, but this is awkward, and negates the whole purpose of combining the two types in the first place.
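The ambiguity can be shown in isolation with a pair of made-up overloads. In the sketch below the Both class and the Describe methods are hypothetical and exist only for illustration; they stand in for any API that offers one overload for tasks and another for observables.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical type that is both a task and an observable.
// Its behavior is irrelevant here; only its static type matters.
public class Both : Task<int>, IObservable<int>
{
    public Both() : base(() => 42) { }

    public IDisposable Subscribe(IObserver<int> observer)
        => throw new NotSupportedException();
}

public static class AmbiguityDemo
{
    // Hypothetical overloads, one per representation.
    public static string Describe(Task<int> task) => "task overload";
    public static string Describe(IObservable<int> observable) => "observable overload";

    public static void Main()
    {
        var both = new Both();

        // Describe(both); // error CS0121: neither overload is a better match

        // A cast picks the overload explicitly.
        Console.WriteLine(Describe((Task<int>)both));
        Console.WriteLine(Describe((IObservable<int>)both));
    }
}
```

Neither conversion (to the base class or to the interface) is considered better than the other by overload resolution, which is why a cast is needed at every such call site.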


Clarification: At first glance, the line _promise.GetAwaiter().GetResult() may suggest that this implementation blocks a ThreadPool thread. This is not the case, because the base Task is initially cold and is started only after the _promise has completed.
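Both the captured-parameter trick and the cold-until-completed behavior can be seen in a smaller sketch that does not involve Rx. The Deferred class and its Complete method below are hypothetical names used only for this illustration; the value is supplied manually instead of coming from a _promise.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical minimal example of a Task<T> subclass whose body is an
// instance method, something a base constructor call cannot express directly.
public class Deferred<T> : Task<T>
{
    private T _value;

    // The lambda captures the 'function' parameter variable, not its value,
    // so the assignment in the body changes what the base task will run.
    private Deferred(Func<T> function) : base(() => function())
        => function = this.GetValue;

    public Deferred() : this((Func<T>)null) { }

    private T GetValue() => _value;

    // Stores the value and then runs the (until now cold) base task
    // on the calling thread, which completes it with that value.
    public void Complete(T value)
    {
        _value = value;
        RunSynchronously();
    }
}

public static class DeferredDemo
{
    public static void Main()
    {
        var deferred = new Deferred<int>();
        Console.WriteLine(deferred.Status); // still cold, nothing has run yet

        deferred.Complete(42);
        Console.WriteLine(deferred.Status);
        Console.WriteLine(deferred.Result);
    }
}
```

Because the task body runs only inside Complete, after the value is already available, no thread ever waits inside GetValue. The same reasoning applies to GetResult in the AsyncOperation class.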

