Using Reactive Extensions, I want to ignore messages coming from my event stream that occur while my Subscribe
method is running. I.e. it sometimes takes me longer to process a message than the time between message, so I want to drop the messages I don't have time to process.
However, when my Subscribe
method completes, if any messages did come through I want to process the last one. So I always process the most recent message.
So, if I have some code which does:
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
and if we assume the '100' takes a long time to process. Then I want the '2' to be processed when the '100' completes. The '1' should be ignored because it was superseded by the '2' while the '100' was still being processed.
Here's an example of the result I want using a background task and Latest()
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
foreach(var n in messages.Latest())
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
}
});
However, Latest() is a blocking call and I'd prefer not to have a thread sitting waiting for the next value like this (there will sometimes be very long gaps between messages).
I can also get the result I want by using a BroadcastBlock
from TPL Dataflow, like this:
var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
.Subscribe(n =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
});
but this feels like it should be possible directly in Rx. What's the best way to go about doing it?
See Question&Answers more detail:
os 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…