Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

java - Do you have a test to show differences between the reactor map() and flatMap()?

I am still trying to understand the difference between Reactor's map() and flatMap() methods. First I looked at the API documentation, but it wasn't really helpful and confused me even more. Then I googled a lot, but nobody seems to have an example that makes the differences understandable, if there are any differences at all.

Therefore I tried to write two tests to see how each method behaves. Unfortunately, it isn't working as I hoped it would...

The first test method exercises Reactor's flatMap() method:

@Test
void fluxFlatMapTest() {
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .window(2)
            .flatMap(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
            .doOnNext(System.out::println)
            .subscribe();
}

The output is as expected and explainable. It looks like this:

9 - parallel-2
1 - parallel-1
4 - parallel-1
25 - parallel-3
36 - parallel-3
49 - parallel-4
64 - parallel-4
81 - parallel-5
100 - parallel-5
16 - parallel-2

The second method should produce the output of the map() method, to compare with the flatMap() results above.

@Test
void fluxMapTest() {
    final int start = 1;
    final int stop = 100;
    Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .window(2)
            .map(fluxOfInts -> fluxOfInts.map(this::processNumber).subscribeOn(Schedulers.parallel()))
            .doOnNext(System.out::println)
            .subscribe();
}

This test method produces output I didn't expect at all. It looks like this:

FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn
FluxSubscribeOn

There is a little helper method which looks like this:

private String processNumber(Integer x) {
    String squaredValueAsString = String.valueOf(x * x);
    return squaredValueAsString.concat(" - ").concat(Thread.currentThread().getName());
}

Nothing special here.

I am using Spring Boot 2.3.4 with Java 11 and the reactor implementation for Spring.

Do you have a good explanatory example, or do you know how to change the above tests so that they make sense? If so, please help me out. Thanks a lot in advance!



1 Reply

Reactor, the library underlying WebFlux, typically runs on top of something called the event loop (in WebFlux, the one provided by Netty), which in turn, I believe, is based on an architecture called the LMAX architecture.

This means that the event loop is a single-threaded event processor. Everything up to the event loop can be multithreaded, but the events themselves are processed by a single thread: the event loop.

Regular Spring Boot applications usually run on a server such as Tomcat or Undertow, while WebFlux by default runs on the event-driven server Netty, which in turn uses this event loop to process events for us.

So now that we understand what is underneath everything, we can start talking about map and flatMap.

Map

If we look in the API, we can see the following image:

[Image: marble diagram for map]

and the API text says:

Transform the items emitted by this Flux by applying a synchronous function to each item.

Which is pretty self-explanatory. We have a Flux of items, and each time map asks for an item to process, it won't ask for another one until it has finished processing the first. Hence synchronous.

The image shows that the green circle needs to be converted to a green square before we can ask for the yellow circle to be converted to a yellow square, and so on.

Here is a code example:

Flux.just("a", "b", "c")
    .map(value -> value.toUpperCase())
    .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
A - main
B - main
C - main

Each item is processed on the main thread, one after the other, synchronously.

flatMap

If we look in the API, we can see the following image:

[Image: marble diagram for flatMap]

and the text says:

Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

The API describes this in terms of basically three aspects:

  • Generation of inners and subscription: this operator is eagerly subscribing to its inners.
  • Ordering of the flattened values: this operator does not necessarily preserve original ordering, as inner element are flattened as they arrive.
  • Interleaving: this operator lets values from different inners interleave (similar to merging the inner sequences).

So what does this mean? Well, it basically means that flatMap will:

  1. Take each item in the Flux and transform it into an individual publisher (for example a Mono with one item in it).

  2. Emit the items as they get processed. flatMap does NOT preserve order, as the inner publishers can take different amounts of time to produce their items.

  3. Merge all the processed items back into a single Flux for further processing down the line.
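
The "transform into inner publishers, then flatten" idea is also why the map() test in the question prints FluxSubscribeOn: map() hands the inner Flux objects downstream as they are, while flatMap() subscribes to them and emits their values. A rough analogy using only java.util.stream (not Reactor; streams are synchronous and ordered, so only the nesting versus flattening aspect carries over) might look like this. The class and method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

class NestingSketch {
    // map with a function that returns a container keeps the nesting:
    // the result is a list of lists, one inner list per window.
    static List<List<Integer>> mapped(List<List<Integer>> windows) {
        return windows.stream()
                .map(w -> w.stream().map(x -> x * x).collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    // flatMap unwraps each inner stream and merges the values
    // into one flat sequence.
    static List<Integer> flatMapped(List<List<Integer>> windows) {
        return windows.stream()
                .flatMap(w -> w.stream().map(x -> x * x))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> windows = List.of(List.of(1, 2), List.of(3, 4));
        System.out.println(mapped(windows));     // [[1, 4], [9, 16]]
        System.out.println(flatMapped(windows)); // [1, 4, 9, 16]
    }
}
```

In the Reactor version the inner containers are Flux objects that nobody has subscribed to, so printing them shows only the class name instead of the squared numbers.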

Here is a code example:

Flux.just("a", "b", "c")
    .flatMap(value -> Mono.just(value.toUpperCase()))
    .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
A - main
B - main
C - main

Wait, flatMap prints the same thing as map!

Well, it all comes back to the threading model we talked about earlier. There is actually only one thread, called the event loop, that handles all events.

Reactor is concurrency-agnostic, meaning that any worker can schedule jobs to be handled by the event loop.

So what is a worker? Well, a worker is something a scheduler can spawn. One important thing is that a worker doesn't have to be a thread. It can be, but it doesn't have to be.

In the code examples above, the main thread subscribes to our Flux, which means that the main thread will process this for us and schedule work for the event loop to handle.

In a server environment this doesn't necessarily have to be the case. The important thing to understand here is that Reactor can switch workers (that is, possibly threads) whenever it needs to.

In my code examples above there is only a main thread, so there is no need to run things on multiple threads or to execute in parallel.

If I wish to force it, I can use one of the different schedulers, which all have their uses. In Netty, the server starts up with as many event loop threads as there are cores on your machine, so under heavy load it can switch workers and cores freely to maximize the usage of all event loops.

flatMap being asynchronous does NOT mean parallel. It means that it subscribes to all the inner publishers eagerly and merges whatever they emit, but unless an inner publisher switches threads itself, it's still only one thread executing the tasks.
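
The distinction between asynchronous scheduling and parallel execution can be sketched without Reactor at all. The following is a loose JDK-only analogy, not a description of Reactor's internals: every item is handed off up front as its own task, yet a single-threaded executor runs all of them. The class and method names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AsyncNotParallelSketch {
    // Schedules one task per item on a single-threaded executor and
    // returns "<ITEM> - <thread name>" for each item, in input order.
    static List<String> process(List<String> items) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            // All tasks are submitted before any result is awaited.
            List<CompletableFuture<String>> futures = items.stream()
                    .map(item -> CompletableFuture.supplyAsync(
                            () -> item.toUpperCase() + " - " + Thread.currentThread().getName(),
                            single))
                    .collect(Collectors.toList());
            // Wait for every task and collect the results.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            single.shutdown();
        }
    }

    public static void main(String[] args) {
        // Every printed line ends with the same thread name.
        process(List.of("a", "b", "c")).forEach(System.out::println);
    }
}
```

Swapping the single-threaded executor for a pool with several threads is what would make the same tasks actually run in parallel, which is roughly the role a different Scheduler plays in Reactor.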

Parallel execution

If you really want to execute something in parallel, you can for instance place it on a parallel Scheduler. This gives you a pool of workers, by default one per CPU core. But remember that there is a setup cost for this when your program runs, and it is usually only beneficial if you have heavy computational work that is CPU-bound.

Code example:

Flux.just("a", "b", "c")
    .flatMap(value -> Mono.just(value.toUpperCase()))
    .subscribeOn(Schedulers.parallel())
    .subscribe(s -> System.out.println(s + " - " + Thread.currentThread().getName()));

// Output
A - parallel-1
B - parallel-1
C - parallel-1

Here we are still running on just one thread, because subscribeOn means that when something subscribes, the Scheduler picks one worker from its pool and then sticks with it throughout execution.

If we absolutely feel the need to force execution on multiple threads, we can for instance use a parallel flux.

Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));

// Output
parallel-3 -> 2
parallel-2 -> 1
parallel-3 -> 4
parallel-2 -> 3
parallel-3 -> 6
parallel-2 -> 5
parallel-3 -> 8
parallel-2 -> 7
parallel-3 -> 10
parallel-2 -> 9

But remember that this is not necessary in most cases. There is a setup cost, and this type of execution is usually only beneficial if you have a lot of CPU-heavy tasks. Otherwise, using the default single-threaded event loop will in most cases "probably" be faster.


Most of the information here comes from the Flux and Mono API documentation.

The Reactor documentation is an amazing and interesting source of information.

Simon Baslé's blog series Flight of the Flux is also a wonderful and interesting read. It also exists in YouTube format.

There are probably some faults here and there, and I have made some assumptions too, especially when it comes to the inner workings of Reactor. But hopefully this will at least clear up some thoughts.

If someone feels that something is outright wrong, feel free to edit.

