Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

c# - Why is IEnumerable.ToObservable so slow?

I am trying to enumerate a large IEnumerable once, and observe the enumeration with various operators attached (Count, Sum, Average etc). The obvious way is to transform it to an IObservable with the method ToObservable, and then subscribe an observer to it. I noticed that this is much slower than other methods, like doing a simple loop and notifying the observer on each iteration, or using the Observable.Create method instead of ToObservable. The difference is substantial: it's 20-30 times slower. Is this just how it is, or am I doing something wrong?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Output:

ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec

.NET Core 3.0, C# 8, System.Reactive 4.3.2, Windows 10, Console App, Release build


Update: Here is an example of the actual functionality I want to achieve:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");

Output:

Count: 10,000,000, Sum: 49,999,995,000,000, Average: 4,999,999.5

The important difference between this approach and using the standard LINQ operators is that the source enumerable is enumerated only once.


One more observation: using ToObservable(Scheduler.Immediate) is slightly faster (about 20%) than ToObservable().

1 Reply


This is the difference between a well behaved observable and a "roll-your-own-because-you-think-faster-is-better-but-it-is-not" observable.

When you dive down far enough in the source you discover this lovely little line:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));

This is effectively calling hasNext = enumerator.MoveNext(); once per scheduled recursive iteration.
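To make the cost of that pattern concrete, here is a minimal sketch that uses only the base class library. It is a toy model, not the System.Reactive implementation: ToyTrampoline, SumDirect and SumScheduled are hypothetical names invented for this illustration. The scheduled variant pulls one item per queued action and then re-queues itself, which is the general shape of "one MoveNext per scheduled iteration".

```csharp
using System;
using System.Collections.Generic;

// Toy model only: this is NOT how System.Reactive is implemented internally.
public static class SchedulingSketch
{
    // Hypothetical stand-in for a trampoline-style scheduler:
    // actions are queued and executed one at a time on the calling thread.
    private sealed class ToyTrampoline
    {
        private readonly Queue<Action> _queue = new Queue<Action>();
        private bool _running;

        public void Schedule(Action action)
        {
            _queue.Enqueue(action);
            if (_running) return;   // a nested call only enqueues the work
            _running = true;
            try
            {
                while (_queue.Count > 0) _queue.Dequeue()();
            }
            finally
            {
                _running = false;
            }
        }
    }

    // Direct loop: the only per-item work is MoveNext and the addition.
    public static long SumDirect(IEnumerable<int> source)
    {
        long sum = 0;
        foreach (var item in source) sum += item;
        return sum;
    }

    // Scheduled loop: every item costs a delegate allocation, an enqueue
    // and a dequeue on top of MoveNext and the addition.
    public static long SumScheduled(IEnumerable<int> source)
    {
        long sum = 0;
        var trampoline = new ToyTrampoline();
        using (var enumerator = source.GetEnumerator())
        {
            void Step()
            {
                if (!enumerator.MoveNext()) return;   // end of sequence
                sum += enumerator.Current;
                trampoline.Schedule(Step);            // reschedule the next step
            }

            trampoline.Schedule(Step);                // runs until the queue drains
        }
        return sum;
    }
}
```

Both methods return the same sum for the same input. Timing SchedulingSketch.SumDirect against SchedulingSketch.SumScheduled over a large Enumerable.Range should show the scheduled variant doing noticeably more work per item, although the exact ratio will depend on the runtime and will not match the figures in the question.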

This allows you to choose the scheduler for your .ToObservable(schedulerOfYourChoice) call.
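As a rough illustration of that choice, the sketch below passes different schedulers to ToObservable and collects the results. It assumes the System.Reactive package is referenced; SchedulerChoiceSketch and Collect are hypothetical names used only here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SchedulerChoiceSketch
{
    // Pushes the enumerable through ToObservable on the given scheduler
    // and returns every item that was observed.
    public static IList<int> Collect(IEnumerable<int> source, IScheduler scheduler)
    {
        // ToList() buffers the whole sequence; Wait() blocks until OnCompleted.
        return source.ToObservable(scheduler).ToList().Wait();
    }

    public static void Demo()
    {
        var source = Enumerable.Range(0, 5);

        // Same items and order in each case; only where and how the
        // enumeration loop runs is different.
        Console.WriteLine(string.Join(",", Collect(source, Scheduler.Immediate)));
        Console.WriteLine(string.Join(",", Collect(source, Scheduler.CurrentThread)));
        Console.WriteLine(string.Join(",", Collect(source, TaskPoolScheduler.Default)));
    }
}
```

The question already reports that Scheduler.Immediate was about 20% faster than the default in that benchmark. The scheduler changes the cost and the thread of each step, not the values produced.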

With the other options you've chosen, you've created a bare-bones series of calls to .OnNext that do virtually nothing. Method2 doesn't even have a .Subscribe call.

Both Method2 and Method3 run on the current thread, and both run to completion before the subscription is finished. They are blocking calls. They can cause race conditions.

Method1 is the only one that behaves nicely as an observable. It is asynchronous and it can run independently of the subscriber.
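One way to observe the difference in behaviour is to unsubscribe early and count how many items were still pulled from the source. The sketch below is an illustration under assumptions, not a benchmark: it assumes the System.Reactive package is referenced, and UnsubscribeSketch, CountingRange and the two Pulled... methods are hypothetical names introduced here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class UnsubscribeSketch
{
    // Hypothetical helper: yields 0..count-1 and reports every item pulled.
    private static IEnumerable<int> CountingRange(int count, Action onPulled)
    {
        for (var i = 0; i < count; i++)
        {
            onPulled();
            yield return i;
        }
    }

    // ToObservable: the subscription is checked between scheduled steps,
    // so the enumeration is expected to stop soon after Take(3) completes.
    public static int PulledViaToObservable(int count)
    {
        var pulled = 0;
        CountingRange(count, () => pulled++)
            .ToObservable()
            .Take(3)
            .Subscribe(_ => { });
        return pulled;
    }

    // Observable.Create with a plain foreach: the loop cannot see that the
    // subscriber has gone, so it enumerates the whole source regardless.
    public static int PulledViaCreate(int count)
    {
        var pulled = 0;
        var source = CountingRange(count, () => pulled++);
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        })
        .Take(3)
        .Subscribe(_ => { });
        return pulled;
    }
}
```

With a source of 1,000 items, PulledViaCreate should report all 1,000 items pulled, while PulledViaToObservable should report only a handful. If early termination matters with Observable.Create, the loop would have to check for cancellation itself.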

Do keep in mind that observables are collections that run over time. They typically have an async source or a timer, or they respond to external stimuli. They don't often run off a plain enumerable. If you're working with an enumerable, then working synchronously should be expected to run faster.

Speed is not the goal of Rx. Performing complex queries on time-based, pushed values is the goal.

