Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
217 views
in Technique[技术] by (71.8m points)

java - Can a Collector's combiner function ever be used on sequential streams?

Sample program:

public final class CollectorTest
{
    private CollectorTest()
    {
    }

    private static <T> BinaryOperator<T> nope()
    {
        return (t, u) -> { throw new UnsupportedOperationException("nope"); };
    }

    public static void main(final String... args)
    {
        final Collector<Integer, ?, List<Integer>> c
            = Collector.of(ArrayList::new, List::add, nope());

        IntStream.range(0, 10_000_000).boxed().collect(c);
    }
}

So, to simplify matters here, there is no final transformation, so the resulting code is quite simple.

Now, IntStream.range() produces a sequential stream. I simply box the results into Integers and then my contrived Collector collects them into a List<Integer>. Pretty simple.

And no matter how many times I run this sample program, the UnsupportedOperationException never hits, which means my dummy combiner is never called.

I kind of expected this, but then I have already misunderstood streams enough that I have to ask the question...

Can a Collector's combiner ever be called when the stream is guaranteed to be sequential?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

A careful reading of the streams implementation code in ReduceOps.java reveals that the combine function is called only when a ReduceTask completes, and ReduceTask instances are used only when evaluating a pipeline in parallel. Thus, in the current implementation, the combiner is never called when evaluating a sequential pipeline.

There is nothing in the specification that guarantees this, however. A Collector is an interface that makes requirements on its implementations, and there are no exemptions granted for sequential streams. Personally, I find it difficult to imagine why sequential pipeline evaluation might need to call the combiner, but someone with more imagination than me might find a clever use for it, and implement it. The specification allows for it, and even though today's implementation doesn't do it, you still have to think about it.

This should not surprising. The design center of the streams API is to support parallel execution on an equal footing with sequential execution. Of course, it is possible for a program to observe whether it is being executed sequentially or in parallel. But the design of the API is to support a style of programming that allows either.

If you're writing a collector and you find that it's impossible (or inconvenient, or difficult) to write an associative combiner function, leading you to want to restrict your stream to sequential execution, maybe this means you're heading in the wrong direction. It's time to step back a bit and think about approaching the problem a different way.

A common reduction-style operation that doesn't require an associative combiner function is called fold-left. The main characteristic is that the fold function is applied strictly left-to-right, proceeding one at a time. I'm not aware of a way to parallelize fold-left.

When people try to contort collectors the way we've been talking about, they're usually looking for something like fold-left. The Streams API doesn't have direct API support for this operation, but it's pretty easy to write. For example, suppose you want to reduce a list of strings using this operation: repeat the first string and then append the second. It's pretty easy to demonstrate that this operation isn't associative:

List<String> list = Arrays.asList("a", "b", "c", "d", "e");

System.out.println(list.stream()
    .collect(StringBuilder::new,
             (a, b) -> a.append(a.toString()).append(b),
             (a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE

Run sequentially, this produces the desired output:

aabaabcaabaabcdaabaabcaabaabcde

But when run in parallel, it might produce something like this:

aabaabccdde

Since it "works" sequentially, we could enforce this by calling sequential() and back this up by having the combiner throw an exception. In addition, the supplier must be called exactly once. There's no way to combine the intermediate results, so if the supplier is called twice, we're already in trouble. But since we "know" the supplier is called only once in sequential mode, most people don't worry about this. In fact, I've seen people write "suppliers" that return some existing object instead of creating a new one, in violation of the supplier contract.

In this use of the 3-arg form of collect(), we have two out of the three functions breaking their contracts. Shouldn't this be telling us to do things a different way?

The main work here is being done by the accumulator function. To accomplish a fold-style reduction, we can apply this function in a strict left-to-right order using forEachOrdered(). We have to do a bit of setup and finishing code before and after, but that's no problem:

StringBuilder a = new StringBuilder();
list.parallelStream()
    .forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());

Naturally, this works fine in parallel, though the performance benefits of running in parallel may be somewhat negated by the ordering requirements of forEachOrdered().

In summary, if you find yourself wanting to do a mutable reduction but you're lacking an associative combiner function, leading you to restrict your stream to sequential execution, recast the problem as a fold-left operation and use forEachRemaining() on your accumulator function.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...