CompletableFuture stream created from iterator is not lazily evaluated


I'm struggling a bit with how and when completable futures are completed. I have created this test case:

import org.junit.Test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamOfCompletableFuturesTest {
    @Test
    public void testList() {
        completeFirstTwoElements(
                Stream.of("list one", "list two", "list three", "list four", "list five")
        );
    }

    @Test
    public void testIterator() {
        Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

        completeFirstTwoElements(
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
        );
    }

    private void completeFirstTwoElements(Stream<String> stream) {
        stream
                .map(this::cf)
                .limit(2)
                .parallel()
                .forEach(cf -> {
                    try {
                        System.out.println(cf.get());
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    private CompletableFuture<String> cf(String result) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Running " + result);
            return result;
        });
    }
}

And the output is:

Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one

The testList method works as expected. The CompletableFutures are only evaluated at the very end, after limit() has kept only the first two items.

However, the output of testIterator is unexpected: all CompletableFutures are completed, and the limiting is only done afterwards.

If I remove the parallel() call from the stream, it works as expected. However, the processing (the forEach()) should be done in parallel because in my full program it is a long-running method.

Can anyone explain why this is happening?

It looks like this depends on the Java version, so I'm on 1.8:

$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)

There are 2 best solutions below

1
On BEST ANSWER

Parallelism applies to the whole pipeline, so you cannot really control what will be executed before the limit() is applied in a parallel Stream. The only guarantee is that what's after the limit() will only be executed on the retained elements.
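
A minimal sketch of that guarantee, assuming it is acceptable to create the futures after limit() rather than before it. The class and method names (LimitThenMapDemo, countFuturesCreated) are made up for illustration and are not part of the question's code:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LimitThenMapDemo {

    // Hypothetical helper: counts how many futures are created when map() follows limit().
    static int countFuturesCreated(Stream<String> stream) {
        AtomicInteger created = new AtomicInteger();
        stream
                .limit(2)                  // truncate first ...
                .map(s -> {                // ... so this stage only sees the retained elements
                    created.incrementAndGet();
                    return CompletableFuture.supplyAsync(() -> s);
                })
                .parallel()
                .forEach(CompletableFuture::join);
        return created.get();
    }

    public static void main(String[] args) {
        Iterator<String> iterator =
                Arrays.asList("one", "two", "three", "four", "five").iterator();
        Stream<String> unsized = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
        System.out.println(countFuturesCreated(unsized)); // prints 2
    }
}
```

The source iterator may still be read further than two elements by the parallel machinery, but the stage that creates the futures runs only for the two elements that survive limit().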

The difference between the two is probably due to some implementation details or other Stream characteristics. In fact, you can easily invert the behavior by playing with the SIZED characteristic. It seems that when the Stream has a known size, only 2 elements are processed.

So for example, applying a simple filter() will lose the size of the list version:

completeFirstTwoElements(
        Stream.of("list one", "list two", "list three", "list four", "list five").filter(a -> true)
);

outputs for example:

Running list one
Running list five
Running list two
Running list three
list one
list two

And using Spliterators.spliterator() with a known size, instead of Spliterators.spliteratorUnknownSize(), "fixes" the behavior:

Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();

completeFirstTwoElements(
        // The signature is spliterator(iterator, size, characteristics): size first, then flags
        StreamSupport.stream(Spliterators.spliterator(iterator, 5, Spliterator.ORDERED), false)
);

Output:

Running iterator two
Running iterator one
iterator one
iterator two
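
To see which of these sources actually report SIZED, one can query the stream's spliterator directly. This is only a sketch for inspecting the characteristic; isSized and the class name are hypothetical helpers, and whether SIZED decides how many elements get mapped remains an implementation detail:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SizedCharacteristicDemo {

    // Hypothetical helper: reports whether the stream's spliterator advertises SIZED.
    static boolean isSized(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("one", "two", "three", "four", "five");

        // Array-backed stream: size is known.
        System.out.println(isSized(Stream.of("one", "two", "three")));                    // true
        // filter() cannot know how many elements pass, so SIZED is cleared.
        System.out.println(isSized(Stream.of("one", "two", "three").filter(a -> true)));  // false
        // Iterator wrapped without a size.
        System.out.println(isSized(StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(data.iterator(), Spliterator.ORDERED),
                false)));                                                                 // false
        // Iterator wrapped with an explicit size: (iterator, size, characteristics).
        System.out.println(isSized(StreamSupport.stream(
                Spliterators.spliterator(data.iterator(), data.size(), Spliterator.ORDERED),
                false)));                                                                 // true
    }
}
```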
3
On

Your statement “all CompletableFutures are completed” is equivalent to “all CompletableFutures are created”, as once supplyAsync has been executed, the evaluation of the supplier has been scheduled, regardless of whether someone will eventually invoke get or not.
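
A small sketch of that point: the supplier below runs even though the returned future is thrown away and get is never called. The class and method names are invented for this illustration, and the five-second timeout is an arbitrary safety margin:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class EagerSupplyAsyncDemo {

    // Hypothetical helper: returns true if the supplier ran although nobody called get() or join().
    static boolean supplierRunsWithoutGet() {
        CountDownLatch ran = new CountDownLatch(1);
        // The returned future is deliberately discarded.
        CompletableFuture.supplyAsync(() -> {
            ran.countDown();
            return "unused";
        });
        try {
            // Wait (with a generous timeout) for the scheduled supplier to run.
            return ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(supplierRunsWithoutGet());
    }
}
```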

So what you observe here is the evaluation of the function passed to map, even if the subsequent processing never consumes the result. This is valid behavior: the function may be evaluated for more elements than necessary, in an arbitrary order or even concurrently, as long as the Stream uses the right results afterwards with respect to the limit and the encounter order.
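
As a rough illustration, the sketch below counts how often the mapping function is called while checking that the result still consists of the first two elements in encounter order. The names (ParallelLimitResultDemo, firstTwoUpperCased) are hypothetical, and the exact call count is intentionally not asserted because it depends on the implementation:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ParallelLimitResultDemo {

    // Hypothetical helper: maps, limits to two elements, and records every call of the mapper.
    static List<String> firstTwoUpperCased(Stream<String> stream, AtomicInteger mapCalls) {
        return stream
                .map(s -> {
                    mapCalls.incrementAndGet();   // may be called for more than two elements
                    return s.toUpperCase();
                })
                .limit(2)
                .parallel()
                .collect(Collectors.toList());    // ordered stream: result keeps encounter order
    }

    public static void main(String[] args) {
        Iterator<String> iterator =
                Arrays.asList("one", "two", "three", "four", "five").iterator();
        AtomicInteger mapCalls = new AtomicInteger();
        List<String> result = firstTwoUpperCased(
                StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false),
                mapCalls);
        System.out.println(result);               // [ONE, TWO]
        System.out.println("map calls: " + mapCalls.get()); // at least 2, possibly more
    }
}
```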

Now, whether more elements than necessary are evaluated, and how many excess elements are processed, is an implementation detail, and the implementation has changed, as discussed in “Internal changes for limit and unordered stream”. While that Q&A is about unordered streams, it’s plausible that similar improvements were made for ordered streams.

The takeaway is that you should not assume that the functions are only evaluated for the minimum number of required elements. Guaranteeing that would reduce the efficiency of parallel processing. This still applies even though Java 9 improved the parallel limit operation. A simple change may reintroduce the evaluation of more elements:

private void completeFirstTwoElements(Stream<String> stream) {
    stream.map(this::cf)
          .filter(x -> true)
          .limit(2)
          .parallel()
          .forEach(cf -> System.out.println(cf.join()));
}