Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

multithreading - How to (globally) replace the common thread pool backend of Java parallel streams?

I would like to globally replace the common thread pool that Java parallel streams use by default, for example for

IntStream.range(0,100).parallel().forEach(i -> {
    doWork();
});

I know that it is possible to use a dedicated ForkJoinPool by submitting such a statement to that pool (see Custom thread pool in Java 8 parallel stream). The questions here are:

  • Is it possible to replace the common ForkJoinPool with some other implementation (say, an Executors.newFixedThreadPool(10))?
  • Is it possible to do so by some global setting, e.g., some JVM property?
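For reference, the dedicated-pool workaround mentioned above is commonly written as in the following sketch (the class and method names are made up for illustration). Note that it relies on an implementation detail rather than a documented guarantee: the stream's subtasks are forked into the pool of the worker thread that starts the terminal operation. The common pool itself is left untouched.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class DedicatedPoolDemo {

    // Starts the parallel stream from inside a task of a dedicated ForkJoinPool.
    // That the stream then runs in this pool (instead of the common pool) is an
    // implementation detail of the JDK, not a documented guarantee.
    public static long sumInDedicatedPool(int parallelism)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> IntStream.range(0, 100).parallel().asLongStream().sum())
                       .get();
        } finally {
            pool.shutdown(); // a dedicated pool has to be shut down explicitly
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumInDedicatedPool(10)); // 0 + 1 + ... + 99 = 4950
    }
}
```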

Remark: The reason I would like to replace the F/J pool is that it appears to have a bug that makes it unusable for nested parallel loops.

Nested parallel loops perform poorly and may lead to deadlocks; see http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

For example, the following code leads to a deadlock:

// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {

    // (omitted:) do some heavy work here (consuming majority of time)

    // Need to synchronize for a small "subtask" (e.g. updating a result)
    synchronized(this) {
        // Inner loop (does something completely free of side effects, i.e., expected to work)
        IntStream.range(0,100).parallel().forEach(j -> {
            // do work here
        });
    }
});

(This happens even without any additional code at "do work here", provided the parallelism is set to less than 12.)

My question is how to replace the FJP. If you would like to discuss nested parallel loops, you might check the question "Nested Java 8 parallel forEach loop perform poor. Is this behavior expected?".

1 Reply

I don't think this is how the stream API is intended to be used. It seems you're (mis)using it simply for parallel task execution (focusing on the tasks, not the data) instead of parallel stream processing (focusing on the data in the stream). Your code somewhat violates one of the main principles of streams (I say "somewhat" because it is not forbidden, only discouraged): avoid state and side effects.
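As a minimal illustration of the data-oriented style, assume the outer loop's "heavy work" yields one value per index (heavyWork below is a hypothetical stand-in). Each element then produces a result and the stream combines the results in a reduction, with no shared state and no synchronization:

```java
import java.util.stream.IntStream;

public class SideEffectFreeDemo {

    // Hypothetical stand-in for the "heavy work" of the outer loop.
    static long heavyWork(int i) {
        return (long) i * i;
    }

    // Instead of updating shared state inside synchronized(this), every element
    // produces a value and the stream sums the values (a reduction).
    public static long combinedResult() {
        return IntStream.range(0, 24)
                .parallel()
                .mapToLong(SideEffectFreeDemo::heavyWork)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(combinedResult()); // 0^2 + 1^2 + ... + 23^2 = 4324
    }
}
```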

Apart from that (or maybe because of the side effects), you're using heavy synchronization within your outer loop, which is anything but harmless!
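If some shared state really has to be updated, one common restructuring (a sketch, not the only option) is to compute the inner result outside the lock and hold the monitor only for the small update. That way no thread waits for inner parallel subtasks while it holds the lock. The inner summation is a placeholder for the real work:

```java
import java.util.stream.IntStream;

public class NarrowLockDemo {

    private long result = 0; // shared state, guarded by "this"

    public void run() {
        IntStream.range(0, 24).parallel().forEach(i -> {
            // The inner parallel loop runs outside the lock (placeholder work).
            long partial = IntStream.range(0, 100).parallel().asLongStream().sum();
            // Only the small update is synchronized.
            synchronized (this) {
                result += partial;
            }
        });
    }

    public synchronized long getResult() {
        return result;
    }

    public static void main(String[] args) {
        NarrowLockDemo demo = new NarrowLockDemo();
        demo.run();
        System.out.println(demo.getResult()); // 24 * 4950 = 118800
    }
}
```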

Although this is not mentioned in the documentation, parallel streams use the common ForkJoinPool internally. Whether or not this is a gap in the documentation, we simply have to accept that fact. The JavaDoc of ForkJoinTask states:

It is possible to define and use ForkJoinTasks that may block, but doing do requires three further considerations: (1) Completion of few if any other tasks should be dependent on a task that blocks on external synchronization or I/O. Event-style async tasks that are never joined (for example, those subclassing CountedCompleter) often fall into this category. (2) To minimize resource impact, tasks should be small; ideally performing only the (possibly) blocking action. (3) Unless the ForkJoinPool.ManagedBlocker API is used, or the number of possibly blocked tasks is known to be less than the pool's ForkJoinPool.getParallelism level, the pool cannot guarantee that enough threads will be available to ensure progress or good performance.
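Point (3) refers to ForkJoinPool.ManagedBlocker. The following sketch is adapted from the QueueTaker example in the ForkJoinPool.ManagedBlocker JavaDoc (the demo class and takeManaged method are illustrative names). It wraps a blocking BlockingQueue.take() so that, when called from a pool worker, ForkJoinPool.managedBlock can activate a spare thread while this one is blocked. Outside a ForkJoinPool, managedBlock simply runs the blocker until it is releasable.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;

public class ManagedBlockerDemo {

    // Wraps a blocking take() so the pool knows this thread may block.
    static final class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<E> queue;
        private volatile E item = null;

        QueueTaker(BlockingQueue<E> queue) {
            this.queue = queue;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (item == null) {
                item = queue.take(); // the actual blocking call
            }
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            // Non-blocking check: succeeds at once if an element is available.
            return item != null || (item = queue.poll()) != null;
        }

        E getItem() { // call after managedBlock has returned
            return item;
        }
    }

    public static <E> E takeManaged(BlockingQueue<E> queue) throws InterruptedException {
        QueueTaker<E> taker = new QueueTaker<>(queue);
        ForkJoinPool.managedBlock(taker);
        return taker.getItem();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("result");
        System.out.println(takeManaged(queue)); // prints "result"
    }
}
```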

Again, it seems that you're using streams as a replacement for a simple for-loop and an executor service.
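For the plain "execute n tasks in parallel" case, a fixed thread pool such as the Executors.newFixedThreadPool(10) from the question is the natural tool. A minimal sketch, where doWork is a hypothetical placeholder for the real task:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceDemo {

    // Hypothetical placeholder for the real work of task number id.
    static int doWork(int id) {
        return id;
    }

    // Runs n independent tasks on a fixed pool of threads and sums their results.
    public static int runTasks(int n, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int id = i;
                tasks.add(() -> doWork(id));
            }
            int total = 0;
            // invokeAll blocks until all tasks have completed
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runTasks(100, 10)); // 0 + 1 + ... + 99 = 4950
    }
}
```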

  • If you just want to execute n tasks in parallel, use an ExecutorService.
  • If you have a more complex case where tasks create subtasks, consider using a ForkJoinPool (with ForkJoinTasks) instead. (It keeps the number of threads constant without the danger of a deadlock caused by too many tasks waiting for others to complete, because waiting tasks do not block their executing threads.)
  • If you want to process data (in parallel), consider using the stream API.
  • You cannot 'install' a custom common pool. It's created internally in private static code.
  • But you can influence the parallelism, the thread factory and the exception handler of the common pool through certain system properties (see the JavaDoc of ForkJoinPool).
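Regarding those system properties: the ForkJoinPool JavaDoc documents java.util.concurrent.ForkJoinPool.common.parallelism, java.util.concurrent.ForkJoinPool.common.threadFactory and java.util.concurrent.ForkJoinPool.common.exceptionHandler. They are read once, when the common pool is created, so the most reliable way is to pass them on the command line, e.g. -Djava.util.concurrent.ForkJoinPool.common.parallelism=10. Setting one programmatically, as in this sketch (the class and method names are illustrative), only has an effect if nothing has used ForkJoinPool before:

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolConfigDemo {

    static final String PARALLELISM_PROPERTY =
            "java.util.concurrent.ForkJoinPool.common.parallelism";

    // Has an effect only if called before the ForkJoinPool class is initialized,
    // because the common pool reads this property once at creation time.
    public static void configureCommonPoolParallelism(int parallelism) {
        System.setProperty(PARALLELISM_PROPERTY, Integer.toString(parallelism));
    }

    public static void main(String[] args) {
        configureCommonPoolParallelism(10);
        // Reports 10 only if the common pool had not been created before this point.
        System.out.println(ForkJoinPool.getCommonPoolParallelism());
    }
}
```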

Don't mix up ExecutorService and ForkJoinPool. They are (usually) not a replacement for each other!
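For the case where tasks create subtasks (the second point in the list), a typical ForkJoinTask is a RecursiveTask that forks one half of its work and computes the other half itself. In this sketch the range sum is only a placeholder workload, and the threshold of 1,000 is an arbitrary choice:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumDemo {

    // Splits the half-open range [from, to) until it is small, then sums it directly.
    static final class RangeSum extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000;
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid, to);
            left.fork();                     // schedule the left half asynchronously
            long rightSum = right.compute(); // compute the right half in this thread
            // join() may let this worker run other pending tasks instead of idly blocking
            return rightSum + left.join();
        }
    }

    public static long sum(int n, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.invoke(new RangeSum(0, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(100_000, 4)); // 0 + 1 + ... + 99999 = 4999950000
    }
}
```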

...