I'm capturing events from an application using Rx.Observable.fromEvent
in a NodeJS.
These are sent to another server using request (https://www.npmjs.com/package/request).
To avoid a high network load I need to buffer those events at a given timeout between sent requests.
Problem
Using bufferWithTime(200)
will keep the node process running and I can't know when the application has finished to close the stream.
Is there any way to use Rx buffers to say:
- When Element 1 is pushed set a timer
- When Element 2 and 3 arrive before the timer expires push them to an array [1, 2, 3] (the buffer)
- When the timer expires, send the [1, 2, 3] array down the pipe.
- If Element 4 came after the timer expires then set a new timer and start all over again.
If no element is pushed then no timer is started which would make the process exit.
My initial approach was:
Rx.Observable
.fromEvent(eventEmitter, 'log')
.bufferWithTime(200) // this is the issue
.map(addEventsToRequestOption)
.map(request)
.flatMap(Promise.resolve)
.subscribe(log('Response received'))
See Question&Answers more detail:
os 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…