Merge provides an overload which takes a max concurrency.
Its signature looks like: IObservable<T> Merge<T>(this IObservable<IObservable<T>> source, int maxConcurrency);
Here is what it would look like with your example (I refactored some of the other code as well, which you can take or leave):
return Observable
    //Reactive while loop; it also takes care of OnCompleted for you
    .While(() => _provider.HasNext,
           Observable.FromAsync(_provider.GetNextAsync))
    //Wrap each item in an observable that only executes once it is subscribed to
    .Select(item => Observable.Defer(() => {
        return _processors.Aggregate(
            seed: Observable.Return(item),
            func: (current, processor) => current.SelectMany(processor.ProcessAsync));
    }))
    //Only allow 3 streams to execute in parallel.
    .Merge(3);
To break down what this does:
- While checks _provider.HasNext before each iteration. If it is true, it resubscribes to get the next value from _provider; otherwise it emits OnCompleted.
- Inside Select, a new observable stream is created for each item, but Defer keeps it from being evaluated yet.
- The resulting IObservable<IObservable<T>> is passed to Merge, which subscribes to at most 3 inner observables simultaneously.
- Each inner observable is finally evaluated when Merge subscribes to it.
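To see the Defer plus Merge(3) interaction from the breakdown in isolation, here is a minimal sketch. It assumes the System.Reactive package is referenced; WorkAsync, MergeConcurrencySketch and PeakConcurrencyAsync are hypothetical names standing in for your provider and processors, not part of your code or of Rx itself.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class MergeConcurrencySketch
{
    // Pushes itemCount simulated jobs through Merge(maxConcurrent) and
    // returns the highest number of jobs that were in flight at once.
    public static async Task<int> PeakConcurrencyAsync(int itemCount, int maxConcurrent)
    {
        var gate = new object();
        int running = 0, peak = 0;

        // Hypothetical stand-in for the processor chain: it only records
        // how many calls are running at the same time.
        async Task<int> WorkAsync(int item)
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            await Task.Delay(50);
            lock (gate) { running--; }
            return item;
        }

        await Observable.Range(0, itemCount)
            // Defer postpones the call to WorkAsync until Merge subscribes;
            // without it, every task would start as soon as Select runs.
            .Select(item => Observable.Defer(() => WorkAsync(item).ToObservable()))
            // Subscribe to at most maxConcurrent inner observables at a time.
            .Merge(maxConcurrent)
            // ToList always yields exactly one value, so the await is safe
            // even when itemCount is 0.
            .ToList();

        return peak;
    }
}
```

Calling await MergeConcurrencySketch.PeakConcurrencyAsync(10, 3) should return a value no greater than 3. If you drop the Defer and call WorkAsync(item).ToObservable() directly inside Select, all ten tasks start immediately and Merge only limits how many results are being listened to, not how much work is running.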
Alternative 1
If you also need to control the number of parallel requests, things get a little trickier, since you will need to signal when your Observable is ready for new values:
return Observable.Create<T>(observer =>
{
    var subject = new Subject<Unit>();
    var disposable = new CompositeDisposable(subject);

    disposable.Add(subject
        //This will complete when provider has run out of values
        .TakeWhile(_ => _provider.HasNext)
        .SelectMany(
            _ => _provider.GetNextAsync(),
            (_, item) =>
            {
                return _processors
                    .Aggregate(
                        seed: Observable.Return(item),
                        func: (current, processor) => current.SelectMany(processor.ProcessAsync))
                    //Could also use `Finally` here, this signals the chain
                    //to start on the next item.
                    .Do(dontCare => {}, () => subject.OnNext(Unit.Default));
            }
        )
        .Merge(3)
        .Subscribe(observer));

    //Queue up 3 requests for the initial kickoff
    disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext));

    return disposable;
});