For a source emitting faster than your interval
zip your source with an interval of the required time span.
import { interval, zip } from 'rxjs';
import { map } from 'rxjs/operators';

zip(source, interval(500)).pipe(
  map(([value, _]) => value) // only emit the source value
)
zip emits the 1st item from source with the 1st item from interval, then the 2nd item from source with the 2nd item from interval, and so on. If the output observable should only emit when interval emits, the Nth value from source has to arrive before the Nth value from interval.
Potential Problem:
If your source emits slower than interval at some point (i.e. the Nth value from source arrives after the Nth value from interval), then zip will emit immediately instead of waiting for the next interval emission.
// the 5th/6th value from source arrive after the 5th/6th value from interval
                                            v    v
source:     -1--------2-3---4---------------5----6-----
interval:   -----1-----2-----3-----4-----5-----6-----7-
zip output: -----1-----2-----3-----4--------5----6-----
                 ✓     ✓     ✓     ✓        ✗    ✗
// emits 5 and 6 don't happen when interval emits
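The timing rule behind this diagram can be checked without RxJS. The sketch below is a plain model, not library code: zipEmissionTimes is a hypothetical helper, the millisecond values are made up for illustration, and it assumes the source arrival times are sorted in ascending order. It relies only on the fact that zip emits the Nth pair as soon as both Nth values are available.

```typescript
// Hypothetical helper, not part of RxJS: models when zip(source, interval(period)) emits.
// arrivals[n] is the time in ms at which the source emits its (n+1)-th value.
function zipEmissionTimes(arrivals: number[], period: number): number[] {
  return arrivals.map((arrival, n) => {
    const tick = (n + 1) * period; // interval(period) emits its (n+1)-th value at this time
    return Math.max(arrival, tick); // zip emits once the later of the two values is there
  });
}

// Fast source: every value arrives before its matching tick, so the output follows the interval.
const fast = zipEmissionTimes([100, 200, 300], 500);
console.log(fast); // [500, 1000, 1500]

// Slow third value: it arrives after the 3rd tick (1500 ms), so zip emits it at 1800 ms, off the grid.
const slow = zipEmissionTimes([100, 200, 1800], 500);
console.log(slow); // [500, 1000, 1800]
```

In this model, any value that arrives late is emitted at its own arrival time, which is exactly the case where the output stops lining up with interval.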
For a source emitting at any rate
import { defer, interval, MonoTypeOperatorFunction, Observable } from 'rxjs';
import { filter, map, scan, takeWhile, tap, withLatestFrom } from 'rxjs/operators';

function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    defer(() => {
      let sourceCompleted = false;
      const queue = source.pipe(
        tap({ complete: () => (sourceCompleted = true) }),
        scan((acc: T[], curr: T) => (acc.push(curr), acc), [] as T[]) // collect all values in a buffer
      );
      return interval(period).pipe(
        withLatestFrom(queue), // combine with the latest buffer
        takeWhile(([_, buffer]) => !sourceCompleted || buffer.length > 0), // complete when the source completed and the buffer is empty
        filter(([_, buffer]) => buffer.length > 0), // only emit if there is at least one value in the buffer
        map(([_, buffer]) => buffer.shift() as T) // take the first value from the buffer
      );
    });
}
source.pipe(
  emitOnInterval(500)
)
// the 5th/6th value from source arrive after the 5th/6th value from interval
                                          v    v
source:   -1--------2-3---4---------------5----6-----
interval: -----1-----2-----3-----4-----5-----6-----7-
output:   -----1-----2-----3-----4-----------5-----6-
               ✓     ✓     ✓     ✓           ✓     ✓
// all output emits happen when interval emits
https://stackblitz.com/edit/rxjs-qdlktm?file=index.ts
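The queueing behaviour of emitOnInterval can also be modelled without RxJS, which makes the timing easy to reason about. This is only a sketch: emitOnIntervalTimes is a hypothetical helper, the millisecond values are illustrative, arrivals are assumed to be sorted ascending, and a value arriving exactly on a tick is assumed to be available at that tick (the real operator's behaviour in that tie case depends on scheduling).

```typescript
// Hypothetical model, not part of RxJS: each value is released on the first interval tick
// that is both at or after its arrival and later than the tick used by the previous value.
function emitOnIntervalTimes(arrivals: number[], period: number): number[] {
  const out: number[] = [];
  let lastTick = 0; // tick that released the previous value; 0 means none yet
  for (const arrival of arrivals) {
    // first tick at or after the arrival (ticks start at `period`, never at 0)
    const firstTickAfterArrival = Math.max(1, Math.ceil(arrival / period)) * period;
    // only one value leaves the buffer per tick
    const nextFreeTick = lastTick + period;
    lastTick = Math.max(firstTickAfterArrival, nextFreeTick);
    out.push(lastTick);
  }
  return out;
}

// Values 3 and 4 arrive close together and are spread over two ticks;
// values 5 and 6 arrive late and wait for the next tick instead of emitting immediately.
const onGrid = emitOnIntervalTimes([100, 900, 1100, 1400, 2700, 3100], 500);
console.log(onGrid); // [500, 1000, 1500, 2000, 3000, 3500]
```

Every output time is a multiple of the period, which is the property the zip approach loses once the source falls behind.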