I think you want something like this. EDIT: From your comments, I see you have a synchronous repository API - I'll leave the asynchronous version in, and add a synchronous version afterwards. Notes inline:
Asynchronous Repository Version
An asynchronous repository interface could be something like this:
public interface IPartyRepository
{
    Task<IEnumerable<Party>> GetAllAsync(out long partyCount);
    Task<IEnumerable<Party>> SearchByNameAndNotesAsync(string searchTerm);
}
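To try the query out without a real data store, an in-memory stand-in for this interface might look like the sketch below. The `Party` shape (`Name`, `Notes`) and the `InMemoryPartyRepository` class are assumptions for illustration; neither appears in the question. One thing it makes visible: an `out` parameter on a `Task`-returning method has to be assigned before the method returns, so the count is available synchronously even though the results are not.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical entity; the real Party type is not shown in the question.
public class Party
{
    public string Name { get; set; } = "";
    public string Notes { get; set; } = "";
}

public interface IPartyRepository
{
    Task<IEnumerable<Party>> GetAllAsync(out long partyCount);
    Task<IEnumerable<Party>> SearchByNameAndNotesAsync(string searchTerm);
}

// Hypothetical in-memory implementation, intended for demos and tests only.
public class InMemoryPartyRepository : IPartyRepository
{
    private readonly List<Party> parties;

    public InMemoryPartyRepository(IEnumerable<Party> seed)
    {
        parties = seed.ToList();
    }

    public Task<IEnumerable<Party>> GetAllAsync(out long partyCount)
    {
        // An out parameter must be set before returning, so this method
        // cannot be marked async; the count is known synchronously.
        partyCount = parties.Count;
        return Task.FromResult<IEnumerable<Party>>(parties.ToList());
    }

    public Task<IEnumerable<Party>> SearchByNameAndNotesAsync(string searchTerm)
    {
        // Case-insensitive substring match on either field.
        var matches = parties
            .Where(p =>
                p.Name.IndexOf(searchTerm, StringComparison.OrdinalIgnoreCase) >= 0 ||
                p.Notes.IndexOf(searchTerm, StringComparison.OrdinalIgnoreCase) >= 0)
            .ToList();
        return Task.FromResult<IEnumerable<Party>>(matches);
    }
}
```

A real implementation would do genuinely asynchronous IO rather than return completed tasks; this only gives the query something to run against.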
Then I refactor the query as:
var searchStream = Observable.FromEventPattern(
        s => txtSearch.TextChanged += s,
        s => txtSearch.TextChanged -= s)
    .Select(evt => txtSearch.Text) // better to select on the UI thread
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    // placement of this is important to avoid races updating the UI
    .ObserveOn(SynchronizationContext.Current)
    .Do(_ =>
    {
        // I like to use Do to make in-stream side-effects explicit
        this.parties.Clear();
        this.partyBindingSource.ResetBindings(false);
    })
    // This is "the money" part of the answer:
    // Don't subscribe, just project the search term
    // into the query...
    .Select(searchTerm =>
    {
        long partyCount;
        var foundParties = string.IsNullOrEmpty(searchTerm)
            ? partyRepository.GetAllAsync(out partyCount)
            : partyRepository.SearchByNameAndNotesAsync(searchTerm);
        // I assume the intention of the Buffer was to load
        // the data into the UI in batches. If so, you can use Buffer from nuget
        // package Ix-Main like this to get IEnumerable<T> batched up
        // without splitting it up into unit sized pieces first
        return foundParties
            // this ToObservable gets us into the monad
            // and returns IObservable<IEnumerable<Party>>
            .ToObservable()
            // the ToObservable here gets us into the monad from
            // the IEnumerable<IList<Party>> returned by Buffer
            // and the SelectMany flattens so the output
            // is IObservable<IList<Party>>
            .SelectMany(x => x.Buffer(500).ToObservable())
            // placement of this is again important to avoid races updating the UI
            // erroneously putting it after the Switch is a very common bug
            .ObserveOn(SynchronizationContext.Current);
    })
    // At this point we have IObservable<IObservable<IList<Party>>>
    // Switch flattens and returns the most recent inner IObservable,
    // cancelling any previous pending set of batched results
    // superseded due to a textbox change
    // i.e. the previous inner IObservable<...> if it was incomplete
    // - it's the equivalent of your TakeUntil, but a bit neater
    .Switch()
    .Subscribe(searchResults =>
    {
        this.parties.AddRange(searchResults);
        this.partyBindingSource.ResetBindings(false);
    },
    ex => { },
    () => { });
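To see the Switch behaviour in isolation, here is a small sketch that uses subjects as stand-ins for two overlapping repository queries. The `SwitchDemo` class and the string payloads are made up for illustration, and it assumes the System.Reactive package is referenced. Once the second inner stream arrives, anything the first one produces afterwards is dropped.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchDemo
{
    public static List<string> Run()
    {
        // Outer stream of "queries"; each inner subject plays the role
        // of one search's batched results.
        var searches = new Subject<IObservable<string>>();
        var firstQuery = new Subject<string>();
        var secondQuery = new Subject<string>();
        var seen = new List<string>();

        using (searches.Switch().Subscribe(x => seen.Add(x)))
        {
            searches.OnNext(firstQuery);
            firstQuery.OnNext("first: batch 1");   // delivered

            // A new search arrives; Switch unsubscribes from firstQuery.
            searches.OnNext(secondQuery);
            firstQuery.OnNext("first: batch 2");   // dropped, superseded
            secondQuery.OnNext("second: batch 1"); // delivered
        }

        return seen; // "first: batch 1", "second: batch 1"
    }
}
```

This is why no explicit TakeUntil is needed in the query: the stale results never reach the Subscribe handler.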
Synchronous Repository Version
A synchronous repository interface could be something like this:
public interface IPartyRepository
{
    IEnumerable<Party> GetAll(out long partyCount);
    IEnumerable<Party> SearchByNameAndNotes(string searchTerm);
}
Personally, I don't recommend a repository interface be synchronous like this. Why? It is typically going to do IO, so you will wastefully block a thread.
You might say the client could call from a background thread, or you could wrap their call in a task - but I don't think this is the right way to go:
- The client doesn't "know" you are going to block; it's not expressed in the contract
- It should be the repository that handles the asynchronous aspect of the implementation - after all, the repository implementer is the one who knows how this is best achieved.
Anyway, accepting the above, one way to implement it is like this (of course it's mostly similar to the async version so I've only annotated the differences):
var searchStream = Observable.FromEventPattern(
        s => txtSearch.TextChanged += s,
        s => txtSearch.TextChanged -= s)
    .Select(evt => txtSearch.Text)
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    .ObserveOn(SynchronizationContext.Current)
    .Do(_ =>
    {
        this.parties.Clear();
        this.partyBindingSource.ResetBindings(false);
    })
    .Select(searchTerm =>
        // Here we wrap the synchronous repository into an
        // async call. Note it's simply not enough to call
        // ToObservable(Scheduler.Default) on the enumerable
        // because this can actually still block up to the point that the
        // first result is yielded. Doing as we have here,
        // we guarantee the UI stays responsive
        Observable.Start(() =>
        {
            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm)
                ? partyRepository.GetAll(out partyCount)
                : partyRepository.SearchByNameAndNotes(searchTerm);
            return foundParties;
        }) // Note you can supply a scheduler, default is Scheduler.Default
        .SelectMany(x => x.Buffer(500).ToObservable())
        .ObserveOn(SynchronizationContext.Current))
    .Switch()
    .Subscribe(searchResults =>
    {
        this.parties.AddRange(searchResults);
        this.partyBindingSource.ResetBindings(false);
    },
    ex => { },
    () => { });
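Both versions batch results with Buffer(500) from Ix. If you'd rather not take on that package and you're on .NET 6 or later, the built-in `Enumerable.Chunk` does comparable batching for this purpose (it yields arrays rather than `IList<T>`). A small sketch with a batch size of 2 so the shape of the output is easy to see; the `BatchDemo` class is just for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchDemo
{
    public static List<int[]> Run()
    {
        // Splits 1..5 into consecutive batches of at most 2 items:
        // [1, 2], [3, 4], [5]
        return Enumerable.Range(1, 5).Chunk(2).ToList();
    }
}
```

In the queries above, that would presumably mean writing `.SelectMany(x => x.Chunk(500).ToObservable())`, assuming `parties.AddRange` accepts any `IEnumerable<Party>`.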