Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

java - ForkJoinPool size increasing dynamically?

Related: CompletableFuture on ParallelStream gets batched and runs slower than sequential stream?

I'm researching different ways of parallelizing network calls with parallelStream and CompletableFuture. In doing so, I have come across a situation where ForkJoinPool.commonPool(), which Java's parallelStream uses, grows dynamically in size, from roughly the number of cores to a maximum of 64.

Java details: $ java -version

openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.10+9, mixed mode)

Code that shows such behavior is below (Full executable code here)


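    // imports needed by the snippets below (the logger `log` comes from the original project and is not shown)
    import java.time.Duration;
    import java.time.Instant;
    import java.util.LongSummaryStatistics;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.LongStream;

    public static int loops = 100;
    private static long sleepTimeMs = 1000;
    private static ExecutorService customPool = Executors.newFixedThreadPool(loops);
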
    // this method shows dynamic increase in pool size
    public static void m1() {
        Instant start = Instant.now();
        LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
                .parallel()
                .map(number -> CompletableFuture.supplyAsync(
                        () -> DummyProcess.slowNetworkCall(number), customPool))
                .map(CompletableFuture::join)
                .mapToLong(Long::longValue)
                .summaryStatistics();

    }

    // this method shows static pool size
    public static void m2() {
        Instant start = Instant.now();
        LongSummaryStatistics stats = LongStream.range(0, loops)
                .parallel()
                .map(DummyProcess::slowNetworkCall) // in this call, parallelism/poolsize stays constant 11
                .summaryStatistics();
    }


    public static Long slowNetworkCall(Long i) {
        Instant start = Instant.now();
        // starts at 11 (#cores in my laptop = 12), goes up to 64
        log.info(" {} going to sleep. poolsize: {}", i, ForkJoinPool.commonPool().getPoolSize());
        try {
            TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info(" {} woke up..", i);
        return Duration.between(start, Instant.now()).toMillis();
    }

Sample output:

16:07:17.443 [pool-2-thread-7] INFO  generalworks.parallelism.DummyProcess -  44 going to sleep. poolsize: 11
16:07:17.443 [pool-2-thread-9] INFO  generalworks.parallelism.DummyProcess -  7 going to sleep. poolsize: 12
16:07:17.443 [pool-2-thread-4] INFO  generalworks.parallelism.DummyProcess -  6 going to sleep. poolsize: 12
16:07:17.444 [pool-2-thread-13] INFO  generalworks.parallelism.DummyProcess -  82 going to sleep. poolsize: 13
16:07:17.444 [pool-2-thread-14] INFO  generalworks.parallelism.DummyProcess -  26 going to sleep. poolsize: 14
16:07:17.444 [pool-2-thread-15] INFO  generalworks.parallelism.DummyProcess -  96 going to sleep. poolsize: 15
16:07:17.445 [pool-2-thread-16] INFO  generalworks.parallelism.DummyProcess -  78 going to sleep. poolsize: 16
.
.
16:07:18.460 [pool-2-thread-79] INFO  generalworks.parallelism.DummyProcess -  2 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-71] INFO  generalworks.parallelism.DummyProcess -  36 going to sleep. poolsize: 64
16:07:18.460 [pool-2-thread-74] INFO  generalworks.parallelism.DummyProcess -  77 going to sleep. poolsize: 64
16:07:18.461 [pool-2-thread-83] INFO  generalworks.parallelism.DummyProcess -  86 going to sleep. poolsize: 64

I understand that the number of threads in the common pool, i.e. its parallelism, is based on the number of available cores, so since my laptop has 12 cores, I get a parallelism of 11 to start with. But I do not understand why the pool size keeps climbing in one method while it stays constant in the other.


1 Reply


I believe your answer is here (ForkJoinPool implementation):

                        if ((wt = q.owner) != null &&
                            ((ts = wt.getState()) == Thread.State.BLOCKED ||
                             ts == Thread.State.WAITING))
                            ++bc;            // worker is blocking

In one version of your code (m2), the ForkJoinPool worker blocks in Thread.sleep, which puts the thread into the TIMED_WAITING state, while in the other (m1) the worker blocks in CompletableFuture.join(), which puts it into the WAITING state. The check above counts only workers in the BLOCKED or WAITING state as blocking, so a sleeping worker is not counted, and that is why you observe the two different behaviors.
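
To see the two states side by side, here is a minimal sketch (not from the original post; the names ThreadStateDemo, observe, stateDuringSleep and stateDuringJoin are made up for illustration). It blocks one plain thread in a sleep and another in join() on a future that is never completed, then reads each thread's state after a short delay. It uses ordinary threads rather than ForkJoinPool workers, so it only illustrates the states, not the pool's compensation logic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ThreadStateDemo {

    // Starts the task on a daemon thread, waits briefly so it can reach
    // its blocking call, and returns the state the thread is in by then.
    static Thread.State observe(Runnable blockingTask) {
        Thread t = new Thread(blockingTask);
        t.setDaemon(true); // do not keep the JVM alive for the blocked thread
        t.start();
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return t.getState();
    }

    // A thread blocked in a timed sleep, as in slowNetworkCall.
    static Thread.State stateDuringSleep() {
        return observe(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // A thread blocked in join() on a future that nobody completes.
    static Thread.State stateDuringJoin() {
        CompletableFuture<Long> never = new CompletableFuture<>();
        return observe(never::join);
    }

    public static void main(String[] args) {
        System.out.println("sleep -> " + stateDuringSleep());
        System.out.println("join  -> " + stateDuringJoin());
    }
}
```

I would expect this to report TIMED_WAITING for the sleeping thread and WAITING for the joining one, which lines up with the condition in the ForkJoinPool snippet.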

There is also special-cased code inside CompletableFuture that makes it cooperate with the ForkJoinPool in order to prevent starvation while waiting for the result:

            if (Thread.currentThread() instanceof ForkJoinWorkerThread)
                ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);

A conclusion relevant to the reason why you're testing this in the first place: Thread.sleep() does not properly simulate a long network call. If you did an actual one, or some other blocking operation, it would compensate by extending the pool.
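
If you want the pool to know about a blocking call explicitly instead of relying on the thread state, the public ForkJoinPool.managedBlock / ForkJoinPool.ManagedBlocker API is meant for that. Below is a rough sketch under my own assumptions: ManagedBlockingDemo, BlockingCall, callBlocking and slowCall are hypothetical names, and slowCall just sleeps 200 ms as a stand-in for a real network call. Whether and how far the pool actually grows depends on the JDK version and pool settings, so treat the printed pool size as something to observe rather than a guaranteed number.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

public class ManagedBlockingDemo {

    // Hypothetical adapter: runs a blocking call through the ManagedBlocker protocol.
    static final class BlockingCall implements ForkJoinPool.ManagedBlocker {
        private final LongSupplier call;
        private long result;
        private boolean done;

        BlockingCall(LongSupplier call) {
            this.call = call;
        }

        @Override
        public boolean block() {
            if (!done) {
                result = call.getAsLong(); // the actual blocking work
                done = true;
            }
            return true; // true = no further blocking is needed
        }

        @Override
        public boolean isReleasable() {
            return done;
        }
    }

    // Wraps a blocking call so a ForkJoinPool worker can announce that it is about to block.
    static long callBlocking(LongSupplier call) {
        BlockingCall blocker = new BlockingCall(call);
        try {
            ForkJoinPool.managedBlock(blocker);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while blocking", e);
        }
        return blocker.result;
    }

    // Stand-in for a slow network call (assumption: 200 ms sleep).
    static long slowCall(long i) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i;
    }

    public static void main(String[] args) {
        long sum = LongStream.range(0, 20)
                .parallel()
                .map(i -> callBlocking(() -> slowCall(i)))
                .sum();
        System.out.println("sum = " + sum
                + ", common pool size = " + ForkJoinPool.commonPool().getPoolSize());
    }
}
```

When managedBlock is called from a thread that is not a ForkJoinPool worker (for example the caller thread that also takes part in a parallel stream), it simply runs the blocker without any compensation.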

