Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others



RxJava - Dynamic delay value with repeatWhen()

Right now I'm implementing some polling logic with RxJava. I'm supposed to poll an endpoint a number of times until it tells me to stop. Additionally, each response comes back with a time that I'm supposed to delay by before polling again. My logic looks something like this right now:

service.pollEndpoint()
    .repeatWhen(observable -> observable.delay(5000, TimeUnit.MILLISECONDS))
    .takeUntil(Blah::shouldStopPolling);

The delay is currently hardcoded to 5000, but I'd like it to depend on a value in the poll response. I tried using a flatMap that returned Observable.just(pollResponse).repeatWhen(observable -> observable.delay(pollResponse.getDelay(), TimeUnit.MILLISECONDS)), but that didn't seem right: it repeats the inner Observable.just(...) rather than the source Observable, so the endpoint is not polled again. I feel like I'm overlooking something simple. Thanks!


1 Reply


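As @JohnWowUs mentioned, you need out-of-band communication: the delay has to be stored somewhere that both the polling step and repeatWhen can read. If the sequence may be subscribed to more than once, you can wrap it in defer so that each subscriber gets its own state: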

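Observable.defer(() -> {
    // Per-subscriber holder for the most recent delay reported by the endpoint.
    int[] pollDelay = { 0 };
    return service.pollEndpoint()
        .doOnNext(response -> pollDelay[0] = response.getDelay())
        // After each completion, wait for the latest delay before resubscribing.
        .repeatWhen(o -> o.flatMap(v -> Observable.timer(pollDelay[0], TimeUnit.MILLISECONDS)))
        .takeUntil(Blah::shouldStopPolling);
});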
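
To make the intended control flow explicit, here is a rough sketch of the same behavior written as a plain blocking loop, without RxJava. It is only an illustration: PollResponse, PollingSketch, pollUntilStopped and the sleeper parameter are hypothetical names, not part of the question's code or of any library. The loop stores nothing between subscriptions because each call has its own local state, which is the role defer plays above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for the poll response described in the question.
final class PollResponse {
    final long delayMillis; // how long the server asks us to wait before the next poll
    final boolean stop;     // whether the server tells us to stop polling

    PollResponse(long delayMillis, boolean stop) {
        this.delayMillis = delayMillis;
        this.stop = stop;
    }
}

final class PollingSketch {
    // Polls until shouldStop matches a response, waiting between polls for
    // the delay carried by the response that was just received.
    // The sleeper is injected so the waiting strategy can be swapped out.
    static List<PollResponse> pollUntilStopped(Supplier<PollResponse> endpoint,
                                               Predicate<PollResponse> shouldStop,
                                               LongConsumer sleeper) {
        List<PollResponse> seen = new ArrayList<>();
        while (true) {
            PollResponse response = endpoint.get();
            seen.add(response);
            // Like takeUntil(predicate): the matching response is still
            // delivered, then polling ends.
            if (shouldStop.test(response)) {
                return seen;
            }
            // Like repeatWhen + timer: wait for the latest delay, then poll again.
            sleeper.accept(response.delayMillis);
        }
    }
}
```

In real use the sleeper could be something like ms -> { try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, while a test can pass a lambda that just records the requested delays.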
