Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
149 views
in Technique[技术] by (71.8m points)

system.reactive - Merging historical and live stock price data with Rx

I'm trying out Rx because it seems like a good fit for our domain but the learning curve has taken me by surprise.

I need to knit together historical price data with live price data.

I'm trying to adapt the usual approach to doing this into the language of Rx:

  1. Subscribe to the live prices immediately and start buffering the values I get back
  2. Initiate a request for historical price data (this needs to happen after the subscription to live prices so we don't have any gaps in our data)
  3. Publish historical prices as they come back
  4. Once we've received all historical data, publish the buffered live data, removing any values that overlap with our historical data at the beginning
  5. Continue replaying data from the live price feed

I have this disgusting and incorrect straw man code which seems to work for the naive test cases I've written:

IConnectableObservable<Tick> live = liveService
    .For(symbol)
    .Replay(/* Some appropriate buffer size */);
live.Connect();

IObservable<Tick> historical = historyService.For(since, symbol);

return new[] {historical, live}
    .Concat()
    .Where(TicksAreInChronologicalOrder());

private static Func1<Tick,bool> TicksAreInChronologicalOrder()
{
    // Some stateful predicate comparing the timestamp of this tick 
    // to the timestamp of the last tick we saw
}

This has a few drawbacks

  1. The appropriate replay buffer size is not known. Setting an unlimited buffer isn't possible- this is a long-running sequence. Really we want some kind of one-time buffer that flushes on the first call to Subscribe. If this exists in Rx, I can't find it.
  2. The replay buffer will continue to exist even once we've switched to publishing live prices. We don't need the buffer at this point.
  3. Similarly, the predicate to filter out overlapping ticks isn't necessary once we've skipped the initial overlap between historical and live prices. I really want to do something like: live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */). Is Wait(this IObservable<TSource>) useful here?

There must be a better way to do this, but I'm still waiting for my brain to grok Rx like it does FP.

Another option I've considered to solve 1. is writing my own Rx extension which would be an ISubject that queues messages until it gets its first subscriber (and refuses subscribers after that?). Maybe that's the way to go?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

If your historical and live data are both time-or-scheduler-based, that is, the event stream looks like this over time:

|---------------------------------------------------->  time
    h   h   h   h  h  h                                 historical
                l  l  l  l  l  l                        live

You can use a simple TakeUntil construct:

var historicalStream = <fetch historical data>;
var liveStream = <fetch live data>;

var mergedWithoutOverlap = 
     // pull from historical
     historicalStream
       // until we start overlapping with live
       .TakeUntil(liveStream)
       // then continue with live data
       .Concat(liveStream);

If you get all your historical data all at once, like a IEnumerable<T>, you can use a combination of StartWith and your other logic:

var historicalData = <get IEnumerable of tick data>;
var liveData = <get IObservable of tick data>;

var mergedWithOverlap = 
    // the observable is the "long running" feed
    liveData
    // But we'll inject the historical data in front of it
    .StartWith(historicalData)
    // Perform filtering based on your needs
    .Where( .... );

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...