Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
148 views
in Technique[技术] by (71.8m points)

c# - Why does repeated Enumerable to Observable conversion block

This is a rather educational, out of curiosity question. Consider the following snippet:

var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();

Where SubscribeDebug subscribes a simple observer:

public class DebugObserver<T> : IObserver<T>
{
    public void OnCompleted()
    {
        Debug.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Debug.WriteLine("Error");
    }

    public void OnNext(T value)
    {
        Debug.WriteLine("Value: {0}", value);
    }
}

The output of this is:

Value: 0

Value: 1

Value: 2

Value: 3

Value: 4

And then blocks. Can someone help me understand the underlying reason why it happens and why the observable does not complete? I have noticed that it does complete without the Concat call, but blocks with it.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I've looked at the source of ToObservable and distilled a minimal implementation. It does reproduce the behavior we're seeing.

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
        ToObservableEx(enumerable, CurrentThreadScheduler.Instance);

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
        Observable.Create<T>
        (
            observer =>
            {
                IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
                {
                    if (enumerator.MoveNext()) 
                    {
                        observer.OnNext(enumerator.Current);
                        inner.Schedule(enumerator, loopRec); //<-- culprit
                    }
                    else
                    {
                        observer.OnCompleted();
                    }

                    // ToObservable.cs Line 117
                    // We never allow the scheduled work to be cancelled. 
                    return Disposable.Empty;
                }

                return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
            }
        );

With that out of the way - the crux of the problem lies in the behavior of CurrentThreadScheduler, which is the default scheduler used.

The behavior of CurrentThreadScheduler is that if a schedule is already running while Schedule is being called - it ends up being queued.

        CurrentThreadScheduler.Instance.Schedule(() =>
        {
            CurrentThreadScheduler.Instance.Schedule(() =>
                Console.WriteLine(1)
            );

            Console.WriteLine(2);
        });

This prints 2 1. This queuing behavior is our undoing.

When observer.OnCompleted() is called, it causes Concat to start the next enumeration - however, things are not the same as when we started out - we are still inside the observer => { } block when we try to schedule the next one. So instead of executing immediately, the next schedule gets queued.

Now enumerator.MoveNext() is caught in a dead-lock. It can't move to the next item - MoveNext is blocking until the next item arrives - which can only arrive when scheduled by the ToObservable loop.

But the Scheduler can only work to notify ToEnumerable and subsequently MoveNext() which is being held up - once it exits loopRec - which it can't because it's being blocked by MoveNext in the first place.

Addendum

This is approximately what ToEnumerable (from GetEnumerator.cs) does (not a valid implementation):

    public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
    {
        var gate = new SemaphoreSlim(0);
        var queue = new ConcurrentQueue<T>();

        using(observable.Subscribe(
            value => { queue.Enqueue(value); gate.Release(); }, 
            () => gate.Release()))
        while (true)
        {
            gate.Wait(); //this is where it blocks                

            if (queue.TryDequeue(out var current))
                yield return current;
            else
                break;
        }
    }

Enumerables are expected to be blocking until the next item is yielded - and that's why there's a gating implementation. It's not Enumerable.Range which blocks, but ToEnumerable.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...