There are two parts to an observable query: the Query itself and the Subscription. (This is also the difference between the ObserveOn and SubscribeOn operators.)
Your Query is
Enumerable
.Range(1, 4)
.ToObservable(NewThreadScheduler.Default);
This creates an observable that produces values on the default NewThreadScheduler for that system.
Your Subscription is
obsQuery.Subscribe(DoWork, Done);
This runs DoWork for each value produced by the Query and Done when the Query finishes with an OnCompleted call. I don't think there are any guarantees about which thread the functions passed to Subscribe will be called on. In practice, if all values of the query are produced on the same thread, that is the thread the subscription will run on. It appears they are also making it so that all of the subscription calls are made on the same thread, most likely to get rid of a lot of common multi-threading errors.
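If you need the handlers to run on a known thread rather than relying on that observed behaviour, one option is to say so explicitly with ObserveOn. The following is only a minimal sketch: it assumes the System.Reactive package is referenced, and the EventLoopScheduler named handlerLoop and the inline handlers are illustrative stand-ins for your own DoWork and Done.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        // handlerLoop is a hypothetical name: one dedicated thread for all handler calls.
        using (var handlerLoop = new EventLoopScheduler())
        using (var done = new ManualResetEventSlim())
        {
            Enumerable.Range(1, 4)
                .ToObservable(NewThreadScheduler.Default) // the Query produces values here
                .ObserveOn(handlerLoop)                   // handlers are marshalled onto handlerLoop
                .Subscribe(
                    x => Console.WriteLine("Handled {0} on thread {1}",
                                           x, Thread.CurrentThread.ManagedThreadId),
                    () => done.Set());

            // Keep Main alive until the Query has completed.
            done.Wait();
        }
    }
}
```

With this in place, every "Handled" line should report the same thread id, regardless of where the values were produced.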
So you have two issues. One is with your logging: if you change your Query to
Enumerable
.Range(1, 4)
// Do on IEnumerable<T> is provided by System.Interactive (Ix).
.Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId))
.ToObservable(NewThreadScheduler.Default);
You'll see each value produced on a new thread.
The other issue is one of the intention and design of Rx. It's intended that the Query is the long-running process and the Subscription is a short method that deals with the results. If you want to run a long-running function as an Rx Observable, your best option is to use Observable.ToAsync.
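As a rough illustration of that last point, here is a sketch that moves the slow work into the Query with Observable.ToAsync and keeps the subscription handlers short. It assumes the System.Reactive package is referenced. SlowSquare is a made-up placeholder for your long-running function, not something from your code.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    // Hypothetical stand-in for a long-running computation.
    static int SlowSquare(int x)
    {
        Thread.Sleep(500);
        return x * x;
    }

    static void Main()
    {
        // ToAsync wraps the function: each call starts the work asynchronously
        // and returns an IObservable<int> that yields the single result.
        Func<int, IObservable<int>> slowSquareAsync =
            Observable.ToAsync<int, int>(SlowSquare);

        // The long-running work is now part of the Query.
        var results = Observable.Range(1, 4)
            .SelectMany(x => slowSquareAsync(x));

        using (var done = new ManualResetEventSlim())
        {
            // The Subscription only deals with finished results.
            results.Subscribe(
                r => Console.WriteLine("Result {0} on thread {1}",
                                       r, Thread.CurrentThread.ManagedThreadId),
                ex => { Console.WriteLine("Failed: {0}", ex.Message); done.Set(); },
                () => done.Set());

            // Keep Main alive until all results have arrived.
            done.Wait();
        }
    }
}
```

Because the four calls run concurrently, the results may arrive in any order.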