Java8 CompletableFuture.get(long timeout, TimeUnit unit) doesn't return in time

I expected every task to time out after 1000 ms, but that is not what happens. The program prints the output below. Why does this happen?

future0
java.util.concurrent.TimeoutException
1010
future1
java.util.concurrent.TimeoutException
2014
future2
3015
future3
3015
future4
3016
future5
3016
future6
3016
future7
java.util.concurrent.TimeoutException
4020
future8
java.util.concurrent.TimeoutException
5021
future9
6018
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        int taskCount = 10;
        long ts = System.currentTimeMillis();
        List<CompletableFuture<Void>> list = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            list.add(future);
        }
        for (int i = 0; i < taskCount; i++) {
            System.out.println("future" + i);
            try {
                list.get(i).get(1000, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                System.out.println(e);
            } finally {
                System.out.println(System.currentTimeMillis() - ts);
            }
        }
    }

I assumed that a call to CompletableFuture.get(long timeout, TimeUnit unit) would always either return the result or throw a TimeoutException within the given timeout.

Slaw

You are submitting tasks to the common ForkJoinPool. A fork-join pool has a target level of parallelism that it tries to maintain. The common pool's default parallelism is based on the number of available processors (by default, one less than that number). In your case, the parallelism is seven. A fork-join pool can add threads to compensate for blocked workers, but a plain call to Thread.sleep gives the pool no signal that the worker is blocked, so it does not spawn extra threads to maintain its parallelism. Thus, at most seven of your tasks can run concurrently at any given time. In short, not all of your tasks start at the same time.
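For illustration, one way to tell the pool that a worker is about to block is ForkJoinPool.ManagedBlocker. The sketch below is not taken from the question's code: SleepBlocker and managedSleep are hypothetical names, and whether the pool actually adds a compensating thread depends on the JDK version and the pool's state, so treat it as an experiment rather than a guaranteed fix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class ManagedSleepSketch {

  // Hypothetical helper: a ManagedBlocker that sleeps once for the given duration.
  static final class SleepBlocker implements ForkJoinPool.ManagedBlocker {
    private final long millis;
    private boolean done;

    SleepBlocker(long millis) {
      this.millis = millis;
    }

    @Override
    public boolean block() throws InterruptedException {
      if (!done) {
        Thread.sleep(millis);
        done = true;
      }
      return true; // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
      return done;
    }
  }

  // Sleeps via managedBlock so a fork-join pool may compensate for the blocked worker.
  static void managedSleep(long millis) {
    try {
      ForkJoinPool.managedBlock(new SleepBlocker(millis));
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
  }

  public static void main(String[] args) {
    long origin = System.nanoTime();
    CompletableFuture<?>[] futures = new CompletableFuture<?>[10];
    for (int i = 0; i < futures.length; i++) {
      int id = i;
      futures[i] = CompletableFuture.runAsync(() -> {
        long startedMs = (System.nanoTime() - origin) / 1_000_000;
        managedSleep(3_000);
        System.out.println("task " + id + " started at " + startedMs + " ms");
      });
    }
    CompletableFuture.allOf(futures).join();
  }
}
```

If the pool does compensate, more than seven tasks should report a start time near zero. Compare the printed start times with the plain Thread.sleep version to see what your JDK does.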

Additionally, you are calling get on each future sequentially. While the main thread is blocked on a call to get, the started tasks continue to execute.
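Because each get call receives its own one-second budget, ten sequential calls can take up to ten seconds in total. If the intent is "wait at most one second for everything", a shared deadline is one option. This is a minimal sketch, not the question's code: countTimeouts and demo are hypothetical names, and the demo uses its own fixed thread pool (an assumption) so that all tasks start right away.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SharedDeadlineSketch {

  // Waits for all futures against one shared deadline; returns how many timed out.
  static int countTimeouts(List<? extends CompletableFuture<?>> futures, long timeoutMillis)
      throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    int timedOut = 0;
    for (CompletableFuture<?> future : futures) {
      // Only the time left until the shared deadline is granted to this call.
      long remainingNanos = Math.max(0, deadline - System.nanoTime());
      try {
        future.get(remainingNanos, TimeUnit.NANOSECONDS);
      } catch (TimeoutException ex) {
        timedOut++;
      } catch (ExecutionException ex) {
        // The task failed, but it did complete, so it is not a timeout.
      }
    }
    return timedOut;
  }

  // Hypothetical demo helper: returns {timed-out count, elapsed milliseconds}.
  static long[] demo(int taskCount, long sleepMillis, long timeoutMillis) {
    ExecutorService executor = Executors.newFixedThreadPool(taskCount);
    try {
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      for (int i = 0; i < taskCount; i++) {
        futures.add(CompletableFuture.runAsync(() -> {
          try {
            Thread.sleep(sleepMillis);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }, executor));
      }
      long start = System.nanoTime();
      int timedOut = countTimeouts(futures, timeoutMillis);
      long elapsedMs = (System.nanoTime() - start) / 1_000_000;
      return new long[] {timedOut, elapsedMs};
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ex);
    } finally {
      executor.shutdownNow(); // interrupt tasks that are still sleeping
    }
  }

  public static void main(String[] args) {
    long[] result = demo(10, 3_000, 1_000);
    System.out.println("timed out: " + result[0] + ", waited ms: " + result[1]);
  }
}
```

With a shared deadline, the first get call uses up the budget and the remaining calls only check whether their task has already completed.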

You can see all this better if you include more information in your logs.

Parallelism: 7
|---------|------------|----------|-----------|---------|------------|
| Task ID | Task Start | Task End | Get Start | Get End |   Result   |
|---------|------------|----------|-----------|---------|------------|
|       0 |          6 |     3008 |         7 |    1012 | timed out  |
|       1 |          7 |     3008 |      1012 |    2026 | timed out  |
|       2 |          8 |     3008 |      2026 |    3008 | successful |
|       3 |          8 |     3010 |      3008 |    3010 | successful |
|       4 |          8 |     3010 |      3010 |    3010 | successful |
|       5 |          8 |     3026 |      3010 |    3026 | successful |
|       6 |          9 |     3026 |      3026 |    3026 | successful |
|       7 |       3008 |     6009 |      3026 |    4028 | timed out  |
|       8 |       3008 |     6009 |      4028 |    5041 | timed out  |
|       9 |       3008 |     6009 |      5041 |    6009 | successful |

Note: Start and end times are in milliseconds. Values are relative to a constant origin time.

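From those times you can see that tasks 0-6 all start immediately, but tasks 7-9 do not start until tasks 0-6 complete, three seconds later. Meanwhile, each get call starts as soon as the previous one returns. A get call blocks for the full second and times out if its task is still running (tasks 0, 1, 7, and 8), returns early when its task completes during the wait (tasks 2 and 9), and returns almost immediately for tasks 3-6 because those tasks are already complete or complete within a few milliseconds.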
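If you want every task to start immediately regardless of the common pool's parallelism, one option is to pass your own executor to the async method. The following is a minimal sketch under that assumption: maxStartDelayMillis is a hypothetical helper, and a fixed pool with one thread per task is chosen only to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class DedicatedPoolSketch {

  // Runs taskCount sleeping tasks on a pool with one thread per task and
  // returns the largest delay (in ms) between submission and a task starting.
  static long maxStartDelayMillis(int taskCount, long sleepMillis) {
    ExecutorService executor = Executors.newFixedThreadPool(taskCount);
    try {
      long origin = System.nanoTime();
      List<CompletableFuture<Long>> futures = new ArrayList<>();
      for (int i = 0; i < taskCount; i++) {
        futures.add(CompletableFuture.supplyAsync(() -> {
          long startedAt = System.nanoTime();
          try {
            Thread.sleep(sleepMillis);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
          return (startedAt - origin) / 1_000_000;
        }, executor)); // the second argument replaces the common pool
      }
      long max = 0;
      for (CompletableFuture<Long> future : futures) {
        max = Math.max(max, future.join());
      }
      return max;
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("Common pool parallelism: "
        + ForkJoinPool.commonPool().getParallelism());
    System.out.println("Max start delay (ms): " + maxStartDelayMillis(10, 3_000));
  }
}
```

With one thread per task, the largest start delay should be a few milliseconds instead of three seconds. The sequential get calls still behave as described above, since that part does not depend on the executor.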


Here is the code that gave the above output.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Main {

  public static void main(String[] args) throws Exception {
    long originNanos = System.nanoTime();

    var handles = new ArrayList<TaskHandle>();
    for (int i = 0; i < 10; i++) {
      var metrics = new TaskMetrics();
      var future = CompletableFuture.runAsync(createTask(metrics));
      handles.add(new TaskHandle(future, metrics));
    }

    for (var handle : handles) {
      try {
        handle.metrics().getCallNanos = System.nanoTime();
        handle.future().get(1_000, TimeUnit.MILLISECONDS);
      } catch (TimeoutException ex) {
        handle.metrics().timedOut = true;
      }
      handle.metrics().getReturnNanos = System.nanoTime();
    }

    for (var handle : handles) {
      // Ensure all tasks are completed before logging. This also
      // creates a happens-before relationship between writing the
      // TaskMetrics fields and reading them.
      handle.future().get();
    }

    logMetrics(handles, originNanos);
  }

  static void logMetrics(List<TaskHandle> handles, long originNanos) {
    System.out.println("Parallelism: " + ForkJoinPool.commonPool().getParallelism());
    System.out.println("|---------|------------|----------|-----------|---------|------------|");
    System.out.println("| Task ID | Task Start | Task End | Get Start | Get End |   Result   |");
    System.out.println("|---------|------------|----------|-----------|---------|------------|");

    var format = "| %7d | %10d | %8d | %9d | %7d | %-10s |%n";
    for (int i = 0; i < handles.size(); i++) {
      var metrics = handles.get(i).metrics();
      long taskStartMs = toMillis(originNanos, metrics.startNanos);
      long taskEndMs = toMillis(originNanos, metrics.endNanos);
      long getStartMs = toMillis(originNanos, metrics.getCallNanos);
      long getEndMs = toMillis(originNanos, metrics.getReturnNanos);
      var result = metrics.timedOut ? "timed out" : "successful";

      System.out.printf(format, i, taskStartMs, taskEndMs, getStartMs, getEndMs, result);
    }
  }

  static long toMillis(long fromNanos, long toNanos) {
    return Math.round((toNanos - fromNanos) / 1_000_000.0);
  }

  static Runnable createTask(TaskMetrics metrics) {
    return () -> {
      metrics.startNanos = System.nanoTime();
      try {
        Thread.sleep(3_000);
        metrics.endNanos = System.nanoTime();
      } catch (InterruptedException ex) {
        throw new CancellationException();
      }
    };
  }

  static class TaskMetrics {
    long startNanos;
    long endNanos;
    long getCallNanos;
    long getReturnNanos;
    boolean timedOut;
  }

  record TaskHandle(Future<?> future, TaskMetrics metrics) {}
}

Note: Code compiled and executed with Java 21.