Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
530 views
in Technique[技术] by (71.8m points)

rx java - Functional-reactive operator in RxJava2 that produces new state from previous state and additional inputs

I am working on a little showcase that is supposed to demonstrate how you can write interactive programs in a purely functional manner using functional-reactive programming (specifically RxJava2).

My goal is to create a simple interactive game. The main function of the game (let's call it Next Game State, or NGS for short) takes the current state of the game, a user input, as well as a random number and computes the next state of the game from those three inputs. Fairly straightforward so far. The user inputs and the random numbers are Flowables that were created from Iterables or via generators. I envisioned that the game state itself would be a Flowable as well (but I might be wrong about that).

I am struggling to find the right functional-reactive operator that applies the function to the three inputs and produces the next state of the game. Initially, I thought that Flowable.zip(source1, source2, source3, zipper) would be right operation: it could take the three flows and combine it via the NGS function into a new Flowable. That, unfortunately, does not account for the fact that the resulting Flowable itself needs to be one of the inputs of the zip operation, which seems an impossible setup. My next idea was to use Flowable.generate, but I need the two additional inputs from other Flowables to calculate the next state, and there is no way to feed those into the generate operator.

In a nutshell, I'm trying to realize something similar to this (pseudo-)marble diagram:

  Game --------------(0)-------------(1)-----------------(2)-----------------(3)---
  State                   _______   /        _______   /        _______   /
                        \_|       |_/   \_____|       |_/   \_____|       |_/
                          | Next  |           | Next  |           | Next  |
                     _____| Game  |      _____| Game  |      _____| Game  |
                    /   __| State |     /   __| State |     /   __| State |
                   /   /  '_______'    /   /  '_______'    /   /  '_______'
                  /   /               /   /               /   /
  User           /   /               /   /               /   /
  Input --------o---/---------------o---/---------------o---/--------------------
                   /                   /                   /
  Random          /                   /                   /
  Number --------o-------------------o-------------------o---------------------------

The top line, I admit, is somewhat non-standard, but it's my best attempt to visualize that, beginning with an initial game state (0), each successive game state (1), (2), (3), etc., is created from the previous game state plus the additional inputs.

I realize that there is probably a relatively straightforward way to do this with an Emitter or maybe inside a downstream subscriber, but one of the goals of this exercise is to show that this can be solved entirely without side effects or void methods.

So, long story short, is there an existing operator that supports this behavior? If not, what would be involved in creating one? Or is there maybe some alternative solution that uses a slightly different overall setup?


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Assumption

The user-input and randome number are two separate streams, which could emit at any time by their own.

Solution

long story short, is there an existing operator that supports this behavior?

Yes, there is. You could use #scan(seed<T>, { current<T>, upstream<Q> -> newValue<T> }.

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.processors.PublishProcessor
import org.junit.jupiter.api.Test

class So65589721 {
    @Test
    fun `65589721`() {
        val userInput = PublishProcessor.create<String>()
        val randomNumber = PublishProcessor.create<Int>()

        val scan = Flowable.combineLatest(userInput, randomNumber, BiFunction<String, Int, Pair<String, Int>> { u, r ->
            u to r
        }).scan<GameState>(GameState.State1, { currentState, currentInputTuple ->
            // calculate new state from `currentState` and combined pair
            GameState.State3(currentInputTuple.first, currentInputTuple.second)
        })

        val test = scan.test()

        randomNumber.offer(42)

        userInput.offer("input1")
        userInput.offer("input2")

        test
            .assertValues(GameState.State1, GameState.State3("input1", 42), GameState.State3("input2", 42))
    }

    interface GameState {
        object State1 : GameState
        data class State3(val value: String, val random: Int) : GameState
    }
}

This examples shows how to use scan to calculate a new state from a given input pair and a current state. With scan you can "hold" state in a safe manner, without expsosing it to side-effects.

Note

  • the seed-value passed to scan will be emitted as first value on subscription.

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...