You have published the wrong observable.
With the current code you are merging and then publishing like this Observable.Merge(a, b).Publish();
. Now since a
& b
are defined against expensive
you still get two subscriptions to expensive
.
The subscriptions create these pipelines:
You can see this if you take out the .Publish();
from your code. The output becomes:
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
This creates these pipelines:
So, by shifting the .Publish()
back up to expensive
you eliminate the problem. That's where you really needed it because it is the expensive operation after all.
This is the code you needed:
var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
// Converting to strings is an expensive operation
Console.WriteLine("Doing an expensive operation");
return string.Format("#{0}", i);
});
var connectable = expensive.Publish();
var a = connectable.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = connectable.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });
var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();
That nicely produces the following:
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
Doing an expensive operation
Subscriber A got: { Source = A, Value = #2 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #3 }
And this gives you these pipelines:
You can see from this image that there is still duplication. That's fine because these parts aren't expensive.
The duplication is actually important. Shared parts of the pipelines make their endpoints vulnerable to errors and thus to early termination. The less sharing the better for the robustness of the code. It's only when you have an expensive operation that you should worry about publishing. Otherwise you should just let the pipelines be themselves.
Here's an example to show it. If you don't have a published source then, if one source produces an error then it doesn't pull down all of the pipelines.
But once you introduce a shared observable then a single error will bring down all of the pipelines.