Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

C# - Do Reactive Extensions support rolling buffers?

I'm using reactive extensions to collate data into buffers of 100ms:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100))
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

This works fine. However, I want slightly different behavior than that provided by the Buffer operation. Essentially, I want to reset the timer if another data item is received. Only when no data has been received for the entire 100ms do I want to handle it. This opens up the possibility of never handling the data, so I should also be able to specify a maximum count. I would imagine something along the lines of:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)

I've had a look around and haven't been able to find anything like this in Rx. Can anyone confirm or deny that it exists?


1 Reply


This is possible by combining the built-in Window and Throttle methods of Observable. First, let's solve the simpler problem where we ignore the maximum count condition:

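// Needs: using System; using System.Collections.Generic; using System.Reactive.Linq;
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    // Throttle emits only after `delay` passes with no new item, so each emission marks a quiet period.
    // Note: stream is subscribed by both Throttle and Window, so this assumes a hot observable.
    var closes = stream.Throttle(delay);
    // Each window stays open until closes fires; its items are then emitted as a single list.
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}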
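
As a rough illustration of the quiet-period behavior, here is a minimal sketch that drives the operator from a `Subject<int>`. The subject, the class names, and the sleep durations are assumptions made for the demo (the subject stands in for the asker's `dataService`), and it assumes the System.Reactive package is referenced. The intent is that 1, 2, 3 arrive as one batch and 4, 5 as another, but since it relies on real timers, exact batching can vary on a heavily loaded machine.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class BufferDemoExtensions
{
    // Same shape as the operator above: a window closes once the stream
    // has been quiet for `delay`.
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> stream, TimeSpan delay)
    {
        var closes = stream.Throttle(delay);
        return stream.Window(() => closes).SelectMany(window => window.ToList());
    }
}

public static class BufferDemo
{
    public static void Main()
    {
        // Hot source; a hypothetical stand-in for the question's dataService.
        var source = new Subject<int>();

        using var subscription = source
            .BufferUntilInactive(TimeSpan.FromMilliseconds(100))
            .Where(batch => batch.Count != 0) // drop the empty final window
            .Subscribe(batch => Console.WriteLine(string.Join(",", batch)));

        // First burst: the gaps are shorter than 100 ms, so the quiet timer keeps resetting.
        source.OnNext(1);
        Thread.Sleep(20);
        source.OnNext(2);
        Thread.Sleep(20);
        source.OnNext(3);

        // A pause longer than the delay lets Throttle fire and close the window.
        Thread.Sleep(300);

        // Second burst lands in a fresh window.
        source.OnNext(4);
        source.OnNext(5);

        Thread.Sleep(300);
        source.OnCompleted();
    }
}
```

The `Where(batch => batch.Count != 0)` filter mirrors the one in the question: when the source completes, the last (possibly empty) window still produces a list.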

The powerful Window method did the heavy lifting. Now it's easy enough to see how to add a maximum count:

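public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, int? max = null)
{
    var closes = stream.Throttle(delay);
    if (max != null)
    {
        // Window resubscribes to closes for every new window, so (for a hot stream) index
        // restarts at 0 each time and overflows fires once the current window holds max items.
        var overflows = stream.Where((x, index) => index + 1 >= max);
        closes = closes.Merge(overflows);
    }
    // A window now closes on whichever comes first: a quiet period or max items.
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}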
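
The version above subscribes to `stream` several times (once for `Window`, and again for `Throttle` and `Where` each time a window opens), which is fine for a hot source but would replay a cold one. One possible variation, not part of the original answer, is to share a single subscription with `Publish`. The sketch below assumes that approach; the name `BufferUntilInactiveShared`, the class names, and the optional `scheduler` parameter are hypothetical additions for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SharedBufferExtensions
{
    // Hypothetical variant: Publish shares one subscription to `stream`
    // between Window, Throttle and the overflow counter.
    public static IObservable<IList<T>> BufferUntilInactiveShared<T>(
        this IObservable<T> stream, TimeSpan delay, int? max = null, IScheduler scheduler = null)
    {
        // Assumption: fall back to the default scheduler when none is supplied.
        scheduler ??= DefaultScheduler.Instance;

        return stream.Publish(shared =>
        {
            // Fires after `delay` with no new item.
            var closes = shared.Throttle(delay, scheduler);
            if (max != null)
            {
                // Resubscribed per window, so index counts items in the current window.
                var overflows = shared.Where((x, index) => index + 1 >= max);
                closes = closes.Merge(overflows);
            }
            return shared.Window(() => closes).SelectMany(window => window.ToList());
        });
    }
}

public static class SharedBufferDemo
{
    public static void Main()
    {
        // Cold source: without Publish, each inner subscription would replay the range.
        var batches = Observable.Range(1, 5)
            .BufferUntilInactiveShared(TimeSpan.FromSeconds(5), max: 2)
            .ToList()
            .Wait();

        foreach (var batch in batches)
        {
            Console.WriteLine(string.Join(",", batch));
        }
    }
}
```

In the question's pipeline this would replace the `Buffer` call, for example `.BufferUntilInactiveShared(TimeSpan.FromMilliseconds(100), 10000)`, with the existing `Where(x => x.Count != 0)` filter kept to drop empty batches.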

I'll write a post explaining this on my blog. https://gist.github.com/2244036

Documentation for the Window method:

