Based on akarnokd's answer and an answer in a similar question, an alternative implementation:
A single sentinel value (as per the OP)
If you're looking for a single value to indicate the lapse of time between emissions:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
.concatWith(Observable.never())
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));
Continuous sentinel values
If you're looking to receive values continuously after the source observable not emitting for some duration:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
.map(x -> -1)
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));
The difference being the timeout
observable and whether it's recurring or not.
You can replace -1
with null
as needed.
All of the above is tested with RxJava 1.0.17 using Java 1.8.0_72
.