Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
177 views
in Technique[技术] by (71.8m points)

c# - How can I clear the buffer on a ReplaySubject?

How can I clear the buffer on a ReplaySubject?

Periodically I need to clear the buffer (as an end of day event in my case) to prevent the ReplaySubject continually growing and eventually eating all the memory.

Ideally I want to keep the same ReplaySubject as the client subscriptions are still good.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

ReplaySubject doesn't offer a means to clear the buffer, but there are several overloads to constrain its buffers in different ways:

  • A maximum TimeSpan that items are retained for
  • A maximum item count
  • A combination of the above, which drops items as soon as either condition is met.

A Clearable ReplaySubject

This was quite an interesting problem - I decided to see how easy it would be to implement a variation of ReplaySubject you can clear - using existing subjects and operators (as these are quite robust). Turns out it was reasonably straightforward.

I've run this through a memory profiler to check it does the right thing. Call Clear() to flush the buffer, otherwise it works just like a regular unbounded ReplaySubject:

public class RollingReplaySubject<T> : ISubject<T>
{
    private readonly ReplaySubject<IObservable<T>> _subjects;
    private readonly IObservable<T> _concatenatedSubjects;
    private ISubject<T> _currentSubject;

    public RollingReplaySubject()
    {
        _subjects = new ReplaySubject<IObservable<T>>(1);
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        _currentSubject.OnCompleted();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void OnNext(T value)
    {
        _currentSubject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _currentSubject.OnError(error);
    }

    public void OnCompleted()
    {
        _currentSubject.OnCompleted();
        _subjects.OnCompleted();     
        // a quick way to make the current ReplaySubject unreachable
        // except to in-flight observers, and not hold up collection
        _currentSubject = new Subject<T>();       
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _concatenatedSubjects.Subscribe(observer);
    }
}

Respect usual rules (as with any Subject) and don't call methods on this class concurrently - including Clear(). You could add synchronization locks trivially if needed.

It works by nesting a sequence of ReplaySubjects inside a master ReplaySubject. The outer ReplaySubject (_subjects) holds a buffer of exactly one inner ReplaySubject (_currentSubject), and it is populated on construction.

The OnXXX methods call through to the _currentSubject ReplaySubject.

Observers are subscribed to a concatenated projection of the nested ReplaySubjects (held in _concatenatedSubjects). Because the buffer size of _subjects is just 1, new subscribers acquire the events of only the most recent ReplaySubject onwards.

Whenever we need to "clear the buffer", the existing _currentSubject is OnCompleted and a new ReplaySubject is added to _subjects and becomes the new _currentSubject.

Enhancements

Following @Brandon's suggestion, I created a version of RollingReplaySubject that uses either a TimeSpan or an input stream to signal buffer clearing. I created a Gist for this here: https://gist.github.com/james-world/c46f09f32e2d4f338b07


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...