Migrating async task executor to use chained CompletableFuture


I have an asynchronous task processor that I would like to migrate to the newer CompletableFuture API, because it initially seems a more natural way to manage my task dependencies. None of the CompletableFuture guides I have found cover my case, so I am looking for advice on the best approach:

  • This is a command-line app: a user supplies a list of tasks and then waits for them to complete.
  • The task list is on the order of 50 tasks that perform various data imports against a database.
  • Each task returns an import report summarizing times, data processed, etc., which is output to the user.
  • While some tasks have no dependencies, most depend on one or more other tasks completing (where "more" is an arbitrary number, usually 2-3 but sometimes as high as 10-15).
  • If a task depends on other tasks, it always requires all of them to have completed (hence CompletableFuture.allOf).
  • If a task depends on other tasks, it never depends on their import reports, only on the database state, so it does not need to consume the result of the previous Future.
  • Some of the tasks are quite long-running.
  • As the user is waiting, I want to output a status message every so often to let them know it is still running.
  • Tasks can fail with exceptions for a variety of reasons; no exception means success.
  • If a task fails, I need to stop its dependent tasks from executing.
  • If a task fails, I'd like to stop any new tasks from starting, ideally let the currently running tasks run to completion, and then shut down.

The current implementation uses a ThreadPoolExecutor and ExecutorCompletionService:

  1. A dependency tree for all tasks is built
  2. All tasks with no dependencies are submitted
  3. The flow-control code loops on an AtomicInteger counting the tasks in progress, calling CompletionService.poll(timeout).
  4. If it gets a completed task, it calls Future.get() to collect the report, then runs some dependency-management code to submit any newly unblocked tasks to the executor (updating the in-progress count accordingly).
  5. If a number of polls go by with nothing completed, it outputs a message to the user (there is also some timeout-and-give-up code, but it works the same way).
  6. If future.get() throws an exception, it calls ExecutorService.shutdownNow() to clear the queue and stop any further tasks, then outputs whatever results it has along with as helpful an error message as it can produce.
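For reference, a minimal sketch of the polling loop in steps 1-6 might look like the following. It assumes tasks are identified by String names, every task (including those with no dependencies) appears as a key in a dependency map, and a report is just a String; PollingScheduler, run and task are hypothetical names, not the real implementation.

```java
import java.util.*;
import java.util.concurrent.*;

/** Hypothetical sketch of the poll-based scheduler in steps 1-6. Task names, the String
 *  "report" and the pool size are placeholders, not the real types or settings. */
public class PollingScheduler {

    /** deps maps every task name (roots included) to the names it waits for; returns reports in completion order. */
    public static List<String> run(Map<String, List<String>> deps, long pollMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        Map<Future<String>, String> names = new HashMap<>();     // which task each Future belongs to
        Map<String, List<String>> dependents = new HashMap<>(); // step 1: reverse edges, task -> tasks waiting on it
        Map<String, Integer> unmet = new HashMap<>();           // unfinished dependencies per task
        for (Map.Entry<String, List<String>> e : deps.entrySet()) {
            unmet.put(e.getKey(), e.getValue().size());
            for (String d : e.getValue()) dependents.computeIfAbsent(d, k -> new ArrayList<>()).add(e.getKey());
        }
        List<String> reports = new ArrayList<>();
        int inFlight = 0;
        for (Map.Entry<String, Integer> e : unmet.entrySet()) {  // step 2: submit tasks with no dependencies
            if (e.getValue() == 0) { names.put(cs.submit(task(e.getKey())), e.getKey()); inFlight++; }
        }
        try {
            while (inFlight > 0) {                               // step 3: poll until nothing is in progress
                Future<String> done = cs.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (done == null) {                              // step 5: nothing finished, reassure the user
                    System.out.println("Still running: " + inFlight + " task(s) in progress");
                    continue;
                }
                inFlight--;
                String name = names.remove(done);
                try {
                    reports.add(done.get());                     // step 4: collect the report
                } catch (ExecutionException ex) {                // step 6: stop everything on failure
                    pool.shutdownNow();
                    throw new IllegalStateException("Task " + name + " failed", ex.getCause());
                }
                for (String next : dependents.getOrDefault(name, List.of())) { // step 4: unblock dependents
                    if (unmet.merge(next, -1, Integer::sum) == 0) { names.put(cs.submit(task(next)), next); inFlight++; }
                }
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            throw new IllegalStateException("Interrupted while waiting for tasks", ie);
        } finally {
            pool.shutdown();
        }
        return reports;
    }

    /** Placeholder for a real import task: it just returns a one-line "report". */
    static Callable<String> task(String name) { return () -> "report:" + name; }
}
```

Even in this reduced form, the scheduler has to track futures, counts and reverse edges by hand, which is the coupling between dependency management and execution described next.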

Once the number of tasks in progress hits 0, the main control loop finishes, the code performs a tidy shutdown, and it outputs a report telling the user what their tasks did.

The downside is a massive amount of boilerplate for polling, exception handling and submitting newly unblocked dependencies, and the dependency manager and executor are very tightly coupled.

On the surface, CompletableFuture seemed a very good fit. Having built and ordered the dependency tree, I can iterate over it, using CompletableFuture.supplyAsync() for tasks with no dependencies and CompletableFuture.allOf() when I hit a task with dependencies:

CompletableFuture<Report>[] dependencies = getBlockingTasks(task);
CompletableFuture<Void> allOf = CompletableFuture.allOf(dependencies);
CompletableFuture<Report> future = allOf.thenApplyAsync(v -> executeTask(task), threadPoolExecutor);
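A self-contained sketch of the same idea, building one future per task from a dependency map, might look like this. It assumes the graph is acyclic, every task appears as a key, and a report is a String; FutureGraph, build, futureFor and executeTask are hypothetical names, and the recursive lookup stands in for the getBlockingTasks(task) helper in the snippet. Because CompletableFuture.allOf() with no arguments returns an already-completed future, root tasks need no separate supplyAsync() branch.

```java
import java.util.*;
import java.util.concurrent.*;

/** Hypothetical sketch: one CompletableFuture per task, each started only after all its dependencies. */
public class FutureGraph {

    /** deps maps every task name to the names it waits for; the graph is assumed to have no cycles. */
    public static Map<String, CompletableFuture<String>> build(Map<String, List<String>> deps, Executor executor) {
        Map<String, CompletableFuture<String>> futures = new HashMap<>();
        for (String name : deps.keySet()) {
            futureFor(name, deps, futures, executor);
        }
        return futures;
    }

    private static CompletableFuture<String> futureFor(String name, Map<String, List<String>> deps,
            Map<String, CompletableFuture<String>> futures, Executor executor) {
        CompletableFuture<String> existing = futures.get(name);
        if (existing != null) return existing;          // each task gets exactly one future
        // Create (or reuse) the futures of all dependencies first.
        CompletableFuture<?>[] blocking = deps.get(name).stream()
                .map(d -> futureFor(d, deps, futures, executor))
                .toArray(CompletableFuture<?>[]::new);
        // allOf() of an empty array is already complete, so tasks without dependencies start immediately.
        CompletableFuture<String> f = CompletableFuture.allOf(blocking)
                .thenApplyAsync(v -> executeTask(name), executor);
        futures.put(name, f);
        return f;
    }

    /** Placeholder for the real import work. */
    static String executeTask(String name) { return "report:" + name; }
}
```

The explicit get/put is deliberate: calling computeIfAbsent recursively on a HashMap may throw ConcurrentModificationException on Java 9 and later.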

I can also chain thenApply and thenAccept so that each worker thread processes its Report and logs it.
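A minimal sketch of that chaining, assuming a report is a String and log lines are collected in a shared list (ReportLogging and logWhenDone are hypothetical names). The non-async thenApply/thenAccept stages typically run on the thread that completed the previous stage, or on the caller's thread if it had already completed, so the logging normally happens on the worker thread. thenAccept is skipped entirely if the task failed.

```java
import java.util.*;
import java.util.concurrent.*;

/** Hypothetical helper: formats a task's report and logs it once the task succeeds. */
public class ReportLogging {

    public static CompletableFuture<Void> logWhenDone(String name, CompletableFuture<String> report, List<String> log) {
        return report
                .thenApply(r -> name + " finished: " + r)   // turn the report into a log line
                .thenAccept(line -> {                        // runs only if every earlier stage succeeded
                    synchronized (log) { log.add(line); }    // the list may be shared between worker threads
                    System.out.println(line);
                });
    }
}
```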

The main area I am not sure of is the exception handling and polling / waiting. If I didn't want to give updates when things are taking a while I would use:

CompletableFuture<Void> allOf = CompletableFuture.allOf(allMyTasks);
allOf.join();
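One way to keep giving updates while still waiting on the combined future is to call get(timeout, unit) in a loop and treat each TimeoutException as a cue to print a status line. This is a sketch assuming the individual task futures are available as an array (like allMyTasks above); ProgressWait and awaitWithProgress are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.*;

/** Hypothetical helper: waits for `all`, printing progress every `intervalMillis`. */
public class ProgressWait {

    /** Returns how many status lines were printed; rethrows the first task failure as a CompletionException. */
    public static int awaitWithProgress(CompletableFuture<?> all, CompletableFuture<?>[] tasks, long intervalMillis) {
        int updates = 0;
        while (true) {
            try {
                all.get(intervalMillis, TimeUnit.MILLISECONDS);
                return updates;                                   // everything finished successfully
            } catch (TimeoutException e) {                        // still running: tell the user
                long done = Arrays.stream(tasks).filter(CompletableFuture::isDone).count();
                System.out.println("Still working: " + done + "/" + tasks.length + " tasks finished");
                updates++;
            } catch (ExecutionException e) {                      // some task failed
                throw new CompletionException(e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }
}
```

Note that allOf completes exceptionally as soon as every task has finished and at least one failed; a failed task does not cancel the others.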

I am not sure about the best way to handle exceptions at all.
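On the exception side, a dependent built with allOf(...).thenApplyAsync(...) never runs if any of its dependencies failed: it completes exceptionally, carrying the original failure as its cause, which covers the "stop dependent tasks" requirement without extra code. Stopping unrelated tasks that have not started yet needs something more. The sketch below uses a shared AtomicBoolean that each task checks before doing any work, so running tasks finish and pending ones are skipped. The flag, FailFast and schedule are assumptions for illustration, not part of the CompletableFuture API, and a report is again a String.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical fail-fast scheduler: once any task fails, tasks that have not started yet are skipped. */
public class FailFast {
    private final AtomicBoolean failed = new AtomicBoolean(false); // set by the first task that throws
    private final Executor executor;

    public FailFast(Executor executor) { this.executor = executor; }

    /** Runs `work` on the executor after all `deps` have completed successfully. */
    public CompletableFuture<String> schedule(String name, Callable<String> work, CompletableFuture<?>... deps) {
        return CompletableFuture.allOf(deps).thenApplyAsync(v -> {
            // If a dependency failed, this lambda never runs; the returned future fails with that cause.
            if (failed.get()) {
                // Some unrelated task already failed: skip instead of starting new work.
                throw new CancellationException(name + " skipped after an earlier failure");
            }
            try {
                return work.call();
            } catch (Exception e) {
                failed.set(true); // set before this future completes, so later starts see it
                throw new CompletionException(e);
            }
        }, executor);
    }

    public boolean hasFailed() { return failed.get(); }
}
```

The check is best-effort: a task that starts at the same moment another one fails may still run. Since nothing calls shutdownNow(), running tasks are left to finish; once every future has completed, calling shutdown() on the pool gives a tidy exit.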

So (apart from whether the snippets I have posted are sensible): what is the best way to keep track of my tasks' completion state so I can keep updating the user when things are taking a while? What is the best way to manage exceptions? And, perhaps most importantly, is this actually a sensible pattern, or am I being drawn in by something "new and shiny" that does some of the things I want well?
