I'm trying out Rx because it seems like a good fit for our domain but the learning curve has taken me by surprise.
I need to knit together historical price data with live price data.
I'm trying to adapt the usual approach to doing this into the language of Rx:
- Subscribe to the live prices immediately and start buffering the values I get back
- Initiate a request for historical price data (this needs to happen after the subscription to live prices so we don't have any gaps in our data)
- Publish historical prices as they come back
- Once we've received all historical data, publish the buffered live data, removing any values that overlap with our historical data at the beginning
- Continue replaying data from the live price feed
I have this disgusting and incorrect straw man code which seems to work for the naive test cases I've written:
IConnectableObservable<Tick> live = liveService
.For(symbol)
.Replay(/* Some appropriate buffer size */);
live.Connect();
IObservable<Tick> historical = historyService.For(since, symbol);
return new[] {historical, live}
.Concat()
.Where(TicksAreInChronologicalOrder());
private static Func1<Tick,bool> TicksAreInChronologicalOrder()
{
// Some stateful predicate comparing the timestamp of this tick
// to the timestamp of the last tick we saw
}
This has a few drawbacks
- The appropriate replay buffer size is not known. Setting an unlimited buffer isn't possible- this is a long-running sequence. Really we want some kind of one-time buffer that flushes on the first call to Subscribe. If this exists in Rx, I can't find it.
- The replay buffer will continue to exist even once we've switched to publishing live prices. We don't need the buffer at this point.
- Similarly, the predicate to filter out overlapping ticks isn't necessary once we've skipped the initial overlap between historical and live prices. I really want to do something like:
live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */)
. Is Wait(this IObservable<TSource>)
useful here?
There must be a better way to do this, but I'm still waiting for my brain to grok Rx like it does FP.
Another option I've considered to solve 1. is writing my own Rx extension which would be an ISubject
that queues messages until it gets its first subscriber (and refuses subscribers after that?). Maybe that's the way to go?
See Question&Answers more detail:
os 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…