Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
461 views
in Technique[技术] by (71.8m points)

javascript - How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval?

I'm capturing events from an application using Rx.Observable.fromEvent in a NodeJS. These are sent to another server using request (https://www.npmjs.com/package/request). To avoid a high network load I need to buffer those events at a given timeout between sent requests.

Problem

Using bufferWithTime(200) will keep the node process running and I can't know when the application has finished to close the stream.

Is there any way to use Rx buffers to say:

  1. When Element 1 is pushed set a timer
  2. When Element 2 and 3 arrive before the timer expires push them to an array [1, 2, 3] (the buffer)
  3. When the timer expires, send the [1, 2, 3] array down the pipe.
  4. If Element 4 came after the timer expires then set a new timer and start all over again.

If no element is pushed then no timer is started which would make the process exit.

My initial approach was:

Rx.Observable
     .fromEvent(eventEmitter, 'log')
     .bufferWithTime(200) // this is the issue
     .map(addEventsToRequestOption)
     .map(request)
     .flatMap(Promise.resolve)
     .subscribe(log('Response received'))
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

A proposed implementation, using the delay operator :

function emits(who){
  return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");};
}

var source = Rx.Observable.fromEvent(document.body, 'click');
console.log("running");

var delayedSource$ = source.delay(1200);

var buffered$ = source
     .buffer(function () { return  delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;})

buffered$.subscribe(emits("buffer"));

jsbin here : http://jsbin.com/wilurivehu/edit?html,js,console,output


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...