Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
235 views
in Technique[技术] by (71.8m points)

javascript - How to build an rx poller that waits some interval AFTER the previous ajax promise resolves?

Been working on a few approaches to this. Basically, I don't want a poller that kicks off an ajax every 30 seconds from the start of polling -- I want a poller that kicks off requests 30 seconds AFTER the previous request returns. Plus, I want to work in some strategy around exponential back-off for failures.

Here's what I have so far (Rx4):

rx.Observable.create(function(observer) {
    var nextPoll = function(obs) {
      // the action function invoked below is what i'm passing in
      // to my poller service as any function which returns a promise
      // and must be invoked each loop due to fromPromise caching
      rx.Observable.fromPromise(action())
        .map(function (x){ return x.data; })
        .subscribe(function(d) {       
            // pass promise up to parent observable  
            observer.onNext(d);

            // reset interval in case previous call was an error
            interval = initInterval;   
            setTimeout(function(){ nextPoll(obs); }, interval);
        }, function(e) {
          // push interval higher (exponential backoff)
          interval = interval < maxInterval ? interval * 2 : maxInterval;
          setTimeout(function(){ nextPoll(obs); }, interval);

        });
    };
    nextPoll(observer);
});

For the most part, this does what I want. I don't like the use of setTimeout, but I can't seem to find a better Observable approach to this (other than a one-off interval/timer with another subscribe).

The other thing that I haven't been able to work into this is the ability to control whether the poller, when initially started, can start with a delay or fire immediately. For some uses, I will have just fetched the data prior to starting to poll, so I can let it wait the interval before firing for the first time. So far, I've only had luck with timer/delay that happens before the first ajax or between the ajax and providing it to subscribers, which doesn't work for me.

Would appreciate any thoughts on cleaning this in, both generally and in terms of getting rid of the setTimeout. And, if anyone has a way to kick off this poller with an optional delay, that would be tremendous! Thanks all!!

UPDATE: Finally got this working the way I envisioned. Here's what that looks like:

function computeInterval(error) {
  if (error) {
    // double until maximum interval on errors
    interval = interval < maxInterval ? interval * 2 : maxInterval;
  } else {
    // anytime the poller succeeds, make sure we've reset to
    // default interval.. this also allows the initInterval to 
    // change while the poller is running
    interval = initInterval;
  }
  return interval;
}

poller$ = rx.Observable.fromPromise(function(){ return _this.action(); })
  .retryWhen(function(errors){
    return errors.scan(function(acc, x) { return acc + x; }, 0)
      .flatMap(function(x){ 
        return rx.Observable.timer(computeInterval(true));
      });
  })
  .repeatWhen(function(notification){
    return notification
      .scan(function(acc, x) { return acc + x; }, 0)
      .flatMap(function(x){ 
        return rx.Observable.timer(computeInterval());
      });
  });
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Just gave it some quick thinking, so it will have to be tested, but hopefully it gets you on a valuable track:

var action; // your action function
Rx.Observable.create(function (observer) {
    function executeAction(action) {
        return Rx.Observable.fromPromise(action()).materialize();
    }

    function computeDelay(){
        // put your exponential delaying logic here
    }

    executeAction()
        .expand(function (x) {
            return Rx.Observable.return({})
                .delay(computeDelay())
                .flatMap(function(){return executeAction(action);})
        })
        .subscribe(function(notification){
             if (notification.kind === "N") {
               observer.onNext(notification.value.data);
             } else if (notification.kind === "E") {
               console.log("error:", notification.error.message);
             }
        });
});

In short, the idea is to use the expand operator for the looping, and the delay operator for the delaying. Have a look at the documentation. Erros are managed using the materialize operator and the notification mechanism (this avoid abruptly terminating your polling stream in case of error returned by the promise).


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...