So I think I have a cleaner, fully Rx solution. This was actually a pretty fun problem to solve. Provided it works for your needs, I think it ended up being really elegant, although it took quite awhile to arrive at it.
Sadly I don't know Scala, so you're going to have to deal with my Java8 lambdas. :D
The entire implementation:
public static Observable<String> getTurnstile(final Observable<String> queue, final Observable<Boolean> tokens) {
return queue.publish(sharedQueue ->
tokens.switchMap(token -> token ? sharedQueue.limit(1) : Observable.empty()));
}
So, what's happening here is we use publish
to make a shared observable of the people queue that we can subscribe to multiple times. Inside that, we use a switchMap
on our token stream, which means any time a new Observable is emitted from the switchMap, it drops the last one and subscribes to the new one. Any time a token is true, it makes a new subscription to the people queue (and multiple trues in a row is fine, because it's canceling the old subscriptions). When it's false, it just dumps out an empty Observable to not waste time.
And some (passing) testcases:
@RunWith(JUnit4.class)
public class TurnstileTest {
private final TestScheduler scheduler = new TestScheduler();
private final TestSubscriber<String> output = new TestSubscriber<>();
private final TestSubject<Boolean> tokens = TestSubject.create(scheduler);
private final TestSubject<String> queue = TestSubject.create(scheduler);
@Before
public void setup() {
Turnstile.getTurnstile(queue, tokens).subscribe(output);
}
@Test
public void allowsOneWithTokenBefore() {
tokens.onNext(true, 0);
queue.onNext("Bill", 1);
queue.onNext("Bob", 2);
assertPassedThrough("Bill");
}
@Test
public void tokenBeforeIsCancelable() {
tokens.onNext(true, 0);
tokens.onNext(false, 1);
queue.onNext("Bill", 2);
assertNonePassed();
}
@Test
public void tokensBeforeAreCancelable() {
tokens.onNext(true, 0);
tokens.onNext(true, 1);
tokens.onNext(true, 2);
tokens.onNext(false, 3);
queue.onNext("Bill", 4);
assertNonePassed();
}
@Test
public void eventualPassThroughAfterFalseTokens() {
tokens.onNext(false, 0);
queue.onNext("Bill", 1);
tokens.onNext(false, 2);
tokens.onNext(false, 3);
queue.onNext("Jane", 4);
queue.onNext("Bob", 5);
tokens.onNext(true, 6);
tokens.onNext(true, 7);
tokens.onNext(false, 8);
tokens.onNext(false, 9);
queue.onNext("Phil", 10);
tokens.onNext(false, 11);
tokens.onNext(false, 12);
tokens.onNext(true, 13);
assertPassedThrough("Bill", "Jane", "Bob");
}
@Test
public void allowsOneWithTokenAfter() {
queue.onNext("Bill", 0);
tokens.onNext(true, 1);
queue.onNext("Bob", 2);
assertPassedThrough("Bill");
}
@Test
public void multipleTokenEntriesBeforeOnlyAllowsOneAtATime() {
tokens.onNext(true, 0);
tokens.onNext(true, 1);
tokens.onNext(true, 2);
queue.onNext("Bill", 3);
tokens.onNext(true, 4);
tokens.onNext(true, 5);
queue.onNext("Jane", 6);
queue.onNext("John", 7);
assertPassedThrough("Bill", "Jane");
}
@Test
public void noneShallPassWithoutToken() {
queue.onNext("Jane", 0);
queue.onNext("John", 1);
assertNonePassed();
}
private void closeSubjects() {
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
scheduler.triggerActions();
tokens.onCompleted();
queue.onCompleted();
scheduler.triggerActions();
}
private void assertNonePassed() {
closeSubjects();
output.assertReceivedOnNext(Lists.newArrayList());
}
private void assertPassedThrough(final String... names) {
closeSubjects();
output.assertReceivedOnNext(Lists.newArrayList(names));
}
}
Let me know if you find any edge cases that don't work with this, particularly if it has trouble in real time since the tests are obviously in a controlled environment.