ReplaySubject doesn't offer a means to clear the buffer, but there are several constructor overloads that constrain its buffer in different ways:
- A maximum TimeSpan that items are retained for
- A maximum item count
- A combination of the above, which drops items as soon as either condition is met.
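As a minimal sketch of the three constructor overloads listed above, assuming the System.Reactive package is referenced: the class and method names (ReplayLimitsDemo, Run, LateSubscriberSees) are made up for illustration, and the 30-second window is an arbitrary value.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class ReplayLimitsDemo
{
    // Pushes 1, 2, 3 into the subject, then records what a late subscriber is replayed.
    private static List<int> LateSubscriberSees(ReplaySubject<int> subject)
    {
        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnNext(3);

        var seen = new List<int>();
        using (subject.Subscribe(seen.Add)) { }
        return seen;
    }

    public static (List<int> ByAge, List<int> ByCount, List<int> ByBoth) Run()
    {
        // Items are retained for at most 30 seconds.
        var byAge = LateSubscriberSees(new ReplaySubject<int>(TimeSpan.FromSeconds(30)));

        // At most the 2 most recent items are retained.
        var byCount = LateSubscriberSees(new ReplaySubject<int>(2));

        // Both limits apply; an item is dropped as soon as either one is exceeded.
        var byBoth = LateSubscriberSees(new ReplaySubject<int>(2, TimeSpan.FromSeconds(30)));

        return (byAge, byCount, byBoth);
    }
}
```

With the values above, the count-limited subjects replay only 2 and 3 to the late subscriber, while the time-limited one replays all three items because none of them is older than the window.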
A Clearable ReplaySubject
This was quite an interesting problem. I decided to see how easy it would be to implement a variation of ReplaySubject that you can clear, using existing subjects and operators (as these are quite robust). It turned out to be reasonably straightforward.
I've run this through a memory profiler to check it does the right thing. Call Clear() to flush the buffer; otherwise it works just like a regular unbounded ReplaySubject:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class RollingReplaySubject<T> : ISubject<T>
{
    // Outer subject: remembers only the most recent inner subject.
    private readonly ReplaySubject<IObservable<T>> _subjects;
    // What observers actually subscribe to: the inner subjects, one after another.
    private readonly IObservable<T> _concatenatedSubjects;
    // Inner subject that currently receives (and buffers) events.
    private ISubject<T> _currentSubject;

    public RollingReplaySubject()
    {
        _subjects = new ReplaySubject<IObservable<T>>(1);
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        // Finish the old buffer and start a fresh one.
        _currentSubject.OnCompleted();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void OnNext(T value)
    {
        _currentSubject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _currentSubject.OnError(error);
    }

    public void OnCompleted()
    {
        _currentSubject.OnCompleted();
        _subjects.OnCompleted();
        // a quick way to make the current ReplaySubject unreachable
        // except to in-flight observers, and not hold up collection
        _currentSubject = new Subject<T>();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _concatenatedSubjects.Subscribe(observer);
    }
}
Respect the usual rules (as with any Subject) and don't call methods on this class concurrently, including Clear(). You could add synchronization locks trivially if needed.
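One possible way to add that locking is sketched below. It assumes a single private gate object is enough, and the names SynchronizedRollingReplaySubject and SynchronizedDemo are hypothetical. It is a sketch, not a tested thread-safe implementation: observers are called while the lock is held, so an observer that blocks on another thread which in turn calls back into the subject could deadlock.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical variant: same nesting technique, with one lock around every state change.
public class SynchronizedRollingReplaySubject<T> : ISubject<T>
{
    private readonly object _gate = new object();
    private readonly ReplaySubject<IObservable<T>> _subjects = new ReplaySubject<IObservable<T>>(1);
    private readonly IObservable<T> _concatenatedSubjects;
    private ISubject<T> _currentSubject;

    public SynchronizedRollingReplaySubject()
    {
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        lock (_gate)
        {
            _currentSubject.OnCompleted();
            _currentSubject = new ReplaySubject<T>();
            _subjects.OnNext(_currentSubject);
        }
    }

    public void OnNext(T value)
    {
        lock (_gate) { _currentSubject.OnNext(value); }
    }

    public void OnError(Exception error)
    {
        lock (_gate) { _currentSubject.OnError(error); }
    }

    public void OnCompleted()
    {
        lock (_gate)
        {
            _currentSubject.OnCompleted();
            _subjects.OnCompleted();
            _currentSubject = new Subject<T>();
        }
    }

    // Subscribing does not touch _currentSubject, so it is not locked here.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _concatenatedSubjects.Subscribe(observer);
    }
}

public static class SynchronizedDemo
{
    // Single-threaded usage: a subscriber arriving after Clear() only sees later items.
    public static List<int> Run()
    {
        var subject = new SynchronizedRollingReplaySubject<int>();
        subject.OnNext(1);
        subject.OnNext(2);
        subject.Clear();
        subject.OnNext(3);

        var seen = new List<int>();
        using (subject.Subscribe(seen.Add)) { }
        return seen;
    }
}
```

In the demo, the late subscriber is replayed only the value 3, because 1 and 2 were pushed before Clear().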
It works by nesting a sequence of ReplaySubjects inside a master ReplaySubject. The outer ReplaySubject (_subjects) holds a buffer of exactly one inner ReplaySubject (_currentSubject), and it is populated on construction.
The OnXXX methods call through to the _currentSubject ReplaySubject.
Observers are subscribed to a concatenated projection of the nested ReplaySubjects (held in _concatenatedSubjects). Because the buffer size of _subjects is just 1, new subscribers acquire the events of only the most recent ReplaySubject onwards.
Whenever we need to "clear the buffer", the existing _currentSubject is completed (OnCompleted), and a new ReplaySubject is added to _subjects and becomes the new _currentSubject.
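The mechanism described above can be seen in isolation with plain subjects, without the wrapper class. This is only an illustrative sketch; NestingDemo and its local variable names are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class NestingDemo
{
    public static (List<int> Early, List<int> Late) Run()
    {
        // Outer subject with a buffer of one: it only remembers the latest inner subject.
        var outer = new ReplaySubject<IObservable<int>>(1);
        IObservable<int> flattened = outer.Concat();

        var first = new ReplaySubject<int>();
        outer.OnNext(first);

        // Subscribes before the "clear", so it follows the first inner subject.
        var early = new List<int>();
        flattened.Subscribe(early.Add);

        first.OnNext(1);

        // The "clear" step: complete the old inner subject and push a new one.
        first.OnCompleted();
        var second = new ReplaySubject<int>();
        outer.OnNext(second);
        second.OnNext(2);

        // Subscribes after the "clear": the outer subject replays only `second`.
        var late = new List<int>();
        flattened.Subscribe(late.Add);

        return (early, late);
    }
}
```

The early subscriber receives 1 and then 2, because Concat moves it on to the second inner subject once the first completes. The late subscriber receives only 2, since the first inner subject has already fallen out of the outer buffer.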
Enhancements
Following @Brandon's suggestion, I created a version of RollingReplaySubject that uses either a TimeSpan or an input stream to signal buffer clearing. I created a Gist for this here: https://gist.github.com/james-world/c46f09f32e2d4f338b07
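The following is not the code from the Gist. It is only a rough sketch of how a clearing signal could be wired to any parameterless clear action, such as the Clear() method above. ClearTriggers, ClearOn and ClearEvery are hypothetical names, and the choice of scheduler is left to the caller.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ClearTriggers
{
    // Invokes `clear` every time `signal` produces a value.
    // Dispose the returned subscription to stop reacting to the signal.
    public static IDisposable ClearOn<TSignal>(IObservable<TSignal> signal, Action clear)
    {
        return signal.Subscribe(_ => clear());
    }

    // Invokes `clear` once per `period`, with the timer driven by `scheduler`.
    public static IDisposable ClearEvery(TimeSpan period, Action clear, IScheduler scheduler)
    {
        return ClearOn(Observable.Interval(period, scheduler), clear);
    }
}
```

For a RollingReplaySubject instance called subject, this could be used as ClearTriggers.ClearOn(clearSignal, subject.Clear). Timer callbacks arrive on whatever thread the scheduler uses, so the earlier warning about concurrent calls applies: Clear() and the OnXXX methods would need to be synchronized in that case.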