A careful reading of the streams implementation code in ReduceOps.java reveals that the combine function is called only when a ReduceTask completes, and ReduceTask instances are used only when evaluating a pipeline in parallel. Thus, in the current implementation, the combiner is never called when evaluating a sequential pipeline.
There is nothing in the specification that guarantees this, however. A Collector is an interface that makes requirements on its implementations, and there are no exemptions granted for sequential streams. Personally, I find it difficult to imagine why sequential pipeline evaluation might need to call the combiner, but someone with more imagination than me might find a clever use for it, and implement it. The specification allows for it, and even though today's implementation doesn't do it, you still have to think about it.
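One way to observe the behavior described above is to count combiner invocations. This is only a sketch: the class and method names (CombinerCallDemo, countCombinerCalls) are made up for illustration, and the sequential result reflects what current implementations happen to do, not anything the specification promises.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class CombinerCallDemo {
    // Hypothetical helper: joins the strings with the 3-arg collect()
    // and reports how many times the combiner was invoked.
    static int countCombinerCalls(List<String> items, boolean parallel) {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> stream = parallel ? items.parallelStream() : items.stream();
        stream.collect(StringBuilder::new,
                       StringBuilder::append,
                       (left, right) -> {
                           calls.incrementAndGet();   // record the combiner call
                           left.append(right);        // a legitimate, associative combine
                       });
        return calls.get();
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("a", "b", "c", "d", "e");
        // Observed behavior of current implementations only; not guaranteed by the spec.
        System.out.println("sequential: " + countCombinerCalls(list, false));
        System.out.println("parallel:   " + countCombinerCalls(list, true));
    }
}
```

The parallel count depends on how the source happens to be split, so it can vary between runs and machines.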
This should not be surprising. The design center of the streams API is to support parallel execution on an equal footing with sequential execution. Of course, it is possible for a program to observe whether it is being executed sequentially or in parallel. But the design of the API is to support a style of programming that allows either.
If you're writing a collector and you find that it's impossible (or inconvenient, or difficult) to write an associative combiner function, leading you to want to restrict your stream to sequential execution, maybe this means you're heading in the wrong direction. It's time to step back a bit and think about approaching the problem a different way.
A common reduction-style operation that doesn't require an associative combiner function is called fold-left. The main characteristic is that the fold function is applied strictly left-to-right, proceeding one at a time. I'm not aware of a way to parallelize fold-left.
When people try to contort collectors the way we've been talking about, they're usually looking for something like fold-left. The Streams API doesn't have direct API support for this operation, but it's pretty easy to write. For example, suppose you want to reduce a list of strings using this operation: repeat the first string and then append the second. It's pretty easy to demonstrate that this operation isn't associative:
    List<String> list = Arrays.asList("a", "b", "c", "d", "e");
    System.out.println(list.stream()
        .collect(StringBuilder::new,
                 (a, b) -> a.append(a.toString()).append(b),
                 (a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE
Run sequentially, this produces the desired output:
aabaabcaabaabcdaabaabcaabaabcde
But when run in parallel, it might produce something like this:
aabaabccdde
Since it "works" sequentially, we could enforce this by calling sequential() and back this up by having the combiner throw an exception. In addition, the supplier must be called exactly once. There's no way to combine the intermediate results, so if the supplier is called twice, we're already in trouble. But since we "know" the supplier is called only once in sequential mode, most people don't worry about this. In fact, I've seen people write "suppliers" that return some existing object instead of creating a new one, in violation of the supplier contract.
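For illustration only, the workaround just described might look like the sketch below. The class and method names (SequentialOnlyCollect, repeatAndAppend) are hypothetical. It relies on the combiner never being invoked and the supplier being called exactly once during sequential evaluation, neither of which the specification promises.

```java
import java.util.Arrays;
import java.util.List;

class SequentialOnlyCollect {
    // Hypothetical helper showing the discouraged "sequential only" workaround.
    static String repeatAndAppend(List<String> items) {
        return items.stream()
                .sequential()   // force sequential evaluation
                .collect(StringBuilder::new,
                         // accumulator: repeat what we have so far, then append the next string
                         (acc, s) -> acc.append(acc.toString()).append(s),
                         // "combiner" that refuses to combine, breaking the collect() contract
                         (left, right) -> {
                             throw new UnsupportedOperationException("cannot combine partial results");
                         })
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(repeatAndAppend(Arrays.asList("a", "b", "c", "d", "e")));
    }
}
```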
In this use of the 3-arg form of collect(), we have two out of the three functions breaking their contracts. Shouldn't this be telling us to do things a different way?
The main work here is being done by the accumulator function. To accomplish a fold-style reduction, we can apply this function in a strict left-to-right order using forEachOrdered(). We have to do a bit of setup and finishing code before and after, but that's no problem:
    StringBuilder a = new StringBuilder();
    list.parallelStream()
        .forEachOrdered(b -> a.append(a.toString()).append(b));
    System.out.println(a.toString());
Naturally, this works fine in parallel, though the performance benefits of running in parallel may be somewhat negated by the ordering requirements of forEachOrdered().
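The same idea can be wrapped in a small reusable helper. The foldLeft method below is hypothetical, not something the Streams API provides; it is a minimal sketch that assumes the stream has a defined encounter order and relies on forEachOrdered() processing elements one at a time, in that order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Stream;

class FoldLeft {
    // Hypothetical helper: applies fn strictly left to right, starting from identity.
    static <T, R> R foldLeft(Stream<T> stream, R identity, BiFunction<R, ? super T, R> fn) {
        // Holder for the running result, so the lambda can update it.
        AtomicReference<R> acc = new AtomicReference<>(identity);
        // forEachOrdered() runs the action in encounter order, even on a parallel stream.
        stream.forEachOrdered(t -> acc.set(fn.apply(acc.get(), t)));
        return acc.get();
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("a", "b", "c", "d", "e");
        // Repeat the result so far, then append the next string.
        String result = foldLeft(list.parallelStream(), "", (soFar, s) -> soFar + soFar + s);
        System.out.println(result);   // prints aabaabcaabaabcdaabaabcaabaabcde
    }
}
```

Because the fold function returns a new value instead of mutating shared state, it also works with immutable result types such as String.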
In summary, if you find yourself wanting to do a mutable reduction but you're lacking an associative combiner function, leading you to restrict your stream to sequential execution, recast the problem as a fold-left operation and use forEachOrdered() with your accumulator function.