Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
166 views
in Technique[技术] by (71.8m points)

c# - Advanceable historical stream and live stream in Rx

I have a hot observable that I normally implement using a normal Subject underneath, so that those interested could subscribe to a live a stream of notifications.

Now I would like to keep that live stream, but also expose a historical stream of all the events that have been AND have absolute times attached to those notifications to know when exactly they happened AS WELL AS allow the subscribers to advance the historical stream to any point in time before replaying the chronology.

  • I believe most of this could be achieved with a HistoricalScheduler and its AdvanceTo method, but I'm not sure exactly how?
  • And is use of Timestamped to save the times of the events needed?
  • And is a ReplaySubject needed to cache the live stream into historical records which could then be played back using the HistoricalScheduler?

How exactly can those two streams be implemented for the same source, or in other words, how can the below be appropriated to the current requirements?

how to save time in .net

[ see "Replaying the past" heading ]

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

What the HistoricalScheduler gives you is the ability to control the forward motion of the virtual time of the scheduler.

What you do not get is random access over time. As virtual time is advanced, scheduled actions are executed, so they must be scheduled in advance. Any action scheduled in the past - i.e. at an absolute time that is behind the HistoricalScheduler.Now value - is executed immediately.

To replay events, you need to record them somehow, then schedule them using an instance of a HistoricalScheduler - and then advance time.

When you advance time, scheduled actions are executed at their due times - and when observables send OnXXX() to their subscribers, the Now property of the scheduler will have the current virtual time.

Each subscriber will need access to it's own scheduler in order to control time independently of other subscribers. This effectively means creating an observable per subscriber.

Here is a quick example I knocked up (that would run in LINQPad if you referenced nuget package rx-main).

First I record a live stream (in a totally non-production way!) recording events into a list. As you suggest, use of TimeStamp() works well to capture timing:

/* record a live stream */
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var log = source.Take(5).Timestamp().ToList().Wait();


Console.WriteLine("Time now is " + DateTime.Now);

Now we can use the HistoricalScheduler combined with cunning use of Generate to schedule events. Note that this approach prevents a ton of scheduled events being queued up in advance - instead we are just scheduling one at a time:

var scheduler = new HistoricalScheduler();

/* set up the scheduling of the recording events */
var replay = Observable.Generate(
    log.GetEnumerator(),
    events => events.MoveNext(),
    events => events,
    events => events.Current.Value,
    events => events.Current.Timestamp,
    scheduler);

Now when we subscribe, you can see that the HistoricalScheduler's Now property has the virtual time of the event:

replay.Subscribe(
    i => Console.WriteLine("Event: {0} happened at {1}", i,
    scheduler.Now)); 

Finally we can start the schedule (using Start() just tries to play all events, as opposed to using AdvanceTo to move to a specific time - it's like doing AdvanceTo(DateTime.MaxValue);

scheduler.Start();

The output for me was:

Time now is 07/01/2014 15:17:27
Event: 0 happened at 07/01/2014 15:17:23 +00:00
Event: 1 happened at 07/01/2014 15:17:24 +00:00
Event: 2 happened at 07/01/2014 15:17:25 +00:00
Event: 3 happened at 07/01/2014 15:17:26 +00:00
Event: 4 happened at 07/01/2014 15:17:27 +00:00

The upshot is that you'll probably end up having to create your own API over this tool to get something to suit your particular purposes. It leaves you a fair bit of work - but is nonetheless pretty powerful stuff.

What's nice is that the live observable and the replayed observable really look no different from each other - provided you remember to always parameterise your scheduler (!) - and so can have the same queries easily run over them, with temporal queries all working with the virtual time of the scheduler.

I've used this to test out new queries over old data to great effect in commercial scenarios.

What it isn't trying to be is a transport control, such as to serve scrolling back and forth through time in a GUI. Typically you run the history in big chunks, storing the output of new queries, and then use this data for subsequent display in a GUI so users can move back and forth at leisure via some other mechanism you provide.

Finally, you don't need ReplaySubject to cache the live stream; but you do need some means of recording events for replay - this could just be an observer that writes to a log.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...