Been working on a few approaches to this. Basically, I don't want a poller that kicks off an ajax request every 30 seconds from the start of polling; I want a poller that kicks off each request 30 seconds AFTER the previous request returns. I also want to work in an exponential back-off strategy for failures.
Here's what I have so far (Rx4):
    rx.Observable.create(function(observer) {
        // interval, initInterval and maxInterval are defined in the enclosing scope
        var nextPoll = function(obs) {
            // action is the function passed in to my poller service: any
            // function that returns a promise. It must be invoked on every
            // loop because fromPromise caches the promise's result.
            rx.Observable.fromPromise(action())
                .map(function(x) { return x.data; })
                .subscribe(function(d) {
                    // pass the data up to the parent observable
                    observer.onNext(d);
                    // reset interval in case the previous call was an error
                    interval = initInterval;
                    setTimeout(function() { nextPoll(obs); }, interval);
                }, function(e) {
                    // push interval higher (exponential back-off), capped at maxInterval
                    interval = Math.min(interval * 2, maxInterval);
                    setTimeout(function() { nextPoll(obs); }, interval);
                });
        };
        nextPoll(observer);
    });
For the most part, this does what I want. I don't like the use of setTimeout, but I can't seem to find a better Observable approach to this (other than a one-off interval/timer with another subscribe).
The other thing that I haven't been able to work into this is the ability to control whether the poller, when initially started, can start with a delay or fire immediately. For some uses, I will have just fetched the data prior to starting to poll, so I can let it wait the interval before firing for the first time. So far, I've only had luck with timer/delay that happens before the first ajax or between the ajax and providing it to subscribers, which doesn't work for me.
Would appreciate any thoughts on cleaning this up, both generally and in terms of getting rid of the setTimeout. And, if anyone has a way to kick off this poller with an optional delay, that would be tremendous! Thanks all!!
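To pin down the timing being asked for (wait only after the previous request settles, back off on failure, optionally delay the first request), here is a rough sketch with Rx left out entirely. It is a statement of the desired behavior, not the Observable solution: `createPoller`, `delayFirst`, `schedule`, `onData` and `onError` are made-up names, and it still falls back to setTimeout by default.

```javascript
// Hypothetical sketch: createPoller and its option names are made up for
// illustration; they are not part of Rx or of the poller service above.
function createPoller(action, options) {
  var initInterval = options.initInterval;
  var maxInterval = options.maxInterval;
  // schedule(fn, ms) defaults to setTimeout; it is injectable so the
  // timing can be inspected without real waiting
  var schedule = options.schedule || setTimeout;
  var interval = initInterval;
  var stopped = false;

  function scheduleNext() {
    // the wait starts only once the previous request has settled
    if (!stopped) { schedule(poll, interval); }
  }

  function poll() {
    if (stopped) { return; }
    action().then(function (result) {
      interval = initInterval; // success: reset the back-off
      scheduleNext();
      if (options.onData) { options.onData(result); }
    }, function (err) {
      interval = Math.min(interval * 2, maxInterval); // failure: back off
      scheduleNext();
      if (options.onError) { options.onError(err); }
    });
  }

  // delayFirst: wait one interval before the first request (useful when
  // the data was just fetched); otherwise fire immediately
  if (options.delayFirst) { schedule(poll, initInterval); } else { poll(); }

  return { stop: function () { stopped = true; } };
}
```

With `delayFirst: true` and `initInterval: 30000`, the first request goes out after 30 seconds; without `delayFirst` it goes out immediately, and every later request is scheduled only after the previous one has resolved or rejected.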
UPDATE: Finally got this working the way I envisioned. Here's what that looks like:
    function computeInterval(error) {
        if (error) {
            // double on errors, capped at the maximum interval
            interval = Math.min(interval * 2, maxInterval);
        } else {
            // anytime the poller succeeds, make sure we've reset to the
            // default interval. This also allows initInterval to
            // change while the poller is running.
            interval = initInterval;
        }
        return interval;
    }
    poller$ = rx.Observable.fromPromise(function() { return _this.action(); })
        .retryWhen(function(errors) {
            // on failure: wait the backed-off interval, then resubscribe
            return errors.scan(function(acc, x) { return acc + x; }, 0)
                .flatMap(function(x) {
                    return rx.Observable.timer(computeInterval(true));
                });
        })
        .repeatWhen(function(notification) {
            // on success: wait the default interval, then resubscribe
            return notification
                .scan(function(acc, x) { return acc + x; }, 0)
                .flatMap(function(x) {
                    return rx.Observable.timer(computeInterval());
                });
        });
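To make the back-off arithmetic in computeInterval concrete, here is a small sketch that keeps the state in a closure rather than in outer variables. `createBackoff` is a hypothetical name, and the 30000/100000 values are only example numbers. The cap is applied with Math.min, so a doubled value never exceeds maxInterval.

```javascript
// Hypothetical helper: the same double-on-error / reset-on-success rule,
// with interval held in a closure instead of the enclosing scope.
function createBackoff(initInterval, maxInterval) {
  var interval = initInterval;
  return function computeInterval(error) {
    if (error) {
      // double on each failure, never exceeding maxInterval
      interval = Math.min(interval * 2, maxInterval);
    } else {
      // any success resets to the default interval
      interval = initInterval;
    }
    return interval;
  };
}

var nextInterval = createBackoff(30000, 100000);
nextInterval(true); // 60000
nextInterval(true); // 100000 (120000 capped at the maximum)
nextInterval();     // 30000 (reset after a success)
```

Unlike the version above, this one fixes initInterval when the helper is created, so it gives up the ability to change initInterval while the poller is running.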