I think your answer works. It's also pretty clear. You do, however, have to be sure your $source
is multicasted.
There's one downside I see to your approach:
You do a lot of extra computation. If you're debouncing 1000s of values per second, it might noticeably slow down depending on where it's being run.
Each streamed value can be in any number of races. Inputs from different priorities still race each other and when the next value starts its race, the previous race isn't stopped, so you can have an explosion of timers/races if a lot of values arrive at once.
It's a lot of extra timers be set and dropped. In your situation, you should need a max of three timers, each of which gets reset as a new value of the same priority arrives.
If your code isn't on the critical path that might not be a problem. Otherwise, there are other ways. The one I thought up, though, is a bit bulkier in terms of code.
Partition your streams
Here's how my brain solved this problem. I created an operator that does what RxJS partition
operator does but lets you partition into more than two streams.
My approach handles multicasting internally, so the source can be whatever (hot, cold, multicasted, or not). It (internally) sets up one subject per stream and then you can use RxJS's debounceTime as usual.
There's a downside though. In your approach, you can add a new priority string willy-nilly and it should continue to work. Objects of {priority: "DucksSayQuack"} will debounce each other and not effect other priorities. This can even be done on the fly.
The partitionOn
operator below needs to know the partitions ahead of time. For your described case it should have the same output and be a bit more efficient to boot.
Is this better? I dunno, it's a fun and different approach to solve the same problem. Also, I suppose there are more uses for the partitionOn
operator than a partitioned debounce.
The Operator
/***
* Create a partitioned stream for each value where a passed
* predicate returns true
***/
function partitionOn<T>(
input$: Observable<T>,
predicates: ((v:T) => boolean)[]
): Observable<T>[] {
const partitions = predicates.map(predicate => ({
predicate,
stream: new Subject<T>()
}));
input$.subscribe({
next: (v:T) => partitions.forEach(prt => {
if(prt.predicate(v)){
prt.stream.next(v);
}
}),
complete: () => partitions.forEach(prt => prt.stream.complete()),
error: err => partitions.forEach(prt => prt.stream.error(err))
});
return partitions.map(prt => prt.stream.asObservable());
}
Using partitionOn for priority debounce
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000;
const priorityEquals = a => b => a === b?.priority;
merge(
...partitionOn(
$source,
[priorityEquals('low'),
priorityEquals('medium'),
priorityEquals('high')]
).map(s => s.pipe(
debounceTime(1000)
))
);
Timestamp your stream
This approach is very similar to yours and lets you use your priority strings willy-nilly again. This has a similar issue where every value is thrown into a timer and timers aren't canceled as new values arrive.
With this approach, however, the path to canceling unnecessary timers is much more clear. You can store subscription objects alongside timestamps in the priorityTimeStamp
map, and be sure to unsubscribe as new values arrive.
I really have no clue what the performance hit for this might be, I think JavaScript's event loop is pretty robust/efficient. The nice thing with this approach is that you don't pay the cost of multicasting. This is all just effectively one stream using a lookup-map to decide what gets filtered and what doesn't.
The priorityDebounceTime Operator
function priorityDebounceTime<T>(
dbTime: number,
priorityStr = "priority"
): MonoTypeOperatorFunction<T> {
return s => defer(() => {
const priorityTimeStamp = new Map<string, number>();
return s.pipe(
mergeMap(v => {
priorityTimeStamp.set(v[priorityStr], Date.now());
return timer(dbTime).pipe(
timestamp(),
filter(({timestamp}) =>
timestamp - priorityTimeStamp.get(v[priorityStr]) >= dbTime
),
mapTo(v)
)
})
)
});
}
Using priorityDebounceTime for priority debounce
This is obviously a bit simpler:
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 5000;
$source.pipe(
priorityDebounceTime(delay)
).subscribe(console.log);