Using CompletableFuture in a Loop with Two Futures to Merge per Loop Iteration

1.2k Views Asked by At

I have code like the following:

testMethod(List<String> ids)  {
    List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();
    
    for(String id : ids) {
        CompletableFuture<ResultOne> resultOne = AynchOne(id);
        CompletableFuture<ResultTwo> resultTwo = AynchTwo(id);
        
    CompletableFuture<ResultThree> resultThree =  resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b)); 
    
    resultThreeList.add(resultThree);
    }
    // PROCESS RESULTS HERE
}

class ResultOne {
    boolean goodResult;
    String id;

    ResultOne(String promId) {
        this.goodResult = true;
        this.id = promId;
    }
}

class ResultTwo {
    boolean goodResult;
    String id;

    ResultTwo(String promId) {
        this.goodResult = true;
        this.id = promId;
    }

class ResultThree() {
        boolean goodResult;
        String = id;
    }

private ResultThree computeCombinedResultThree(ResultOne r1,  ResultTwo r2) { 
   ResultThree resultThree = new ResultThree();
    resultThree.id = r1.id;
    resultThree.goodResult = r1.goodResult && r2.goodResult;

    return resultThree;
}

, I need to be able to AND the results resultOne and resultTwo together, such that for each iteration, on the completion of the entire synchronous execution, I have an (I guess) array or map that I can subsequently process, where one object in the array has the corresponding id and a true or false for that id (that represents the AND-ing of the two booleans from the separate objects.

Based on feedback from readers, I have gotten the code completed to the point where I can merge the two original futures, and combine all the results from each iteration to get the entire loop of futures. At this point I just need to process the results.

I think maybe I need another CompletableFuture? This one would maybe be something like this (put above where I have "// PROCESS RESULTS HERE"):

CompletableFuture<Void> future = resultThreeList
  .thenRun(() -> forwardSuccesses(resultThreeList));

future.get();

forwardSuccesses() would iterate through resultThreeList forwarding the successful ids to another process, but not sue that is how to do it. Grateful for any ideas. Thanks.

3

There are 3 best solutions below

1
On BEST ANSWER

So this is how far you got until now:

List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>(ids.size());
for (String id : ids) {
    CompletableFuture<ResultOne> resultOne = aynchOne(id);
    CompletableFuture<ResultTwo> resultTwo = aynchTwo(id);

    CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, this::computeCombinedResultThree);
    resultThreeList.add(resultThree);
}

Now all you need to do is convert this List<CompletableFuture<ResultThree>> to a CompletableFuture<List<ResultThree>> that will get completed once all the results are finished calculating.

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> resultThreeList.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
                );

Or with something like

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.supplyAsync(() -> resultThreeList.stream().map(this::safeGet).collect(Collectors.toList()));

where safeGet is a method that just calls future.get() and catches the exceptions that may occur - you can't just call get() in a lambda because of those exceptions.

Now you can process this list with thenAccept():

try {
    combinedCompletables.thenAccept(this::forwardSuccesses).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

Again, the exceptions being caught are due to the call to get().

Side note, I don't really see why there are three result classes since all you need - for this part of the code at least - is the id and the result status. I'd introduce an interface (Result?) for that and only work on that.

2
On

Within the for loop you get immediately the CompletableFutures in return. In the background some magic happens and you want to wait until both are complete.

So after both CompletableFutures were returned, cause a blocking wait by invoking CompletableFuture.get with a long value and a time unit. If you only invoke get without any parameters you'll wait forever.

Choose your timeout and JDK wisely. It might happen, that JDK 8 doesn't provide a get with timeout. Also JDK 8 isn't supported anymore. JDK 11 is now long term support and recent compilers don't offer JDK 8 as a target anymore.

I really urge you to read the dirty details about CompletableFuture and how it differs from Future, esp. regarding thread control like cancellation. Not knowing the underlying provider of CompletableFuture I also assume querying one ID is waste of ressources and throughput is very limited. But this is a separate question.

2
On

It seems to me you don't need 3 different ResultOne ResultTwo ResultThree classes as they define the same type, so I shall replace them for Result.

Assuming you want to forward only successes, I added a short isGoodResult() method to the Result class to be used as predicate with the streams:

class Result {
    public boolean goodResult;
    public String id;
// ...
    public boolean isGoodResult() {
        return this.goodResult;
    }
}

I'd also recommend getting rid of the loop, replacing it for a stream to make your code more fluid.

Should forwardSuccess be strict, accepting List<Result>, this is how I'd implement testMethod:

void testMethod(List<String> ids)  {
    final List<Result> results = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .map(CompletableFuture::join)
        .filter(Result::isGoodResult)
        .collect(Collectors.toList());

    // PROCESS RESULTS HERE
    forwardSuccesses(results);
}

Should forwardSuccess be lazy, accepting CompletableFuture<List<Result>>:

void testMethod(List<String> ids)  {
    final List<CompletableFuture<Result>> futures = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .collect(Collectors.toList());

    final CompletableFuture<List<Result>> asyncResults =
    CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
            .thenApply(__ -> futures 
                    .stream()
                    .map(CompletableFuture::join)
                    .filter(Result::isGoodResult)
                    .collect(Collectors.toList()));

    // PROCESS RESULTS HERE
    forwardSuccessesAsync(asyncResults);
}