I am using an ExecutorService to execute a set of parallel tasks, and I need to do something after all the tasks have completed. But in my implementation below that is not happening as expected; it does not wait until all the tasks are done. Please refer to the code snippet below.
public void doParallelStuff() throws InterruptedException {
    final int THREAD_COUNT = 5;
    ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
    Set<String> myIdSet = Set.of("123", "234", "345", "897", "893"); // actual set has 20k+ IDs
    CountDownLatch countDownLatch = new CountDownLatch(myIdSet.size());
    myIdSet.forEach(id -> executorService.execute(() -> {
        // retrieve & process the entity related to this id
        countDownLatch.countDown();
    }));
    countDownLatch.await(10L, TimeUnit.SECONDS);
    executorService.shutdown();
    // do some other stuff after all tasks completed
    // (but this point is reached before all the tasks have completed)
}
Lose the CountDownLatch

You don't need the CountDownLatch. An executor service provides that feature, built-in.

shutdown & awaitTermination in ExecutorService

The key is making two calls:

- ExecutorService#shutdown to stop the executor service from accepting more task submissions.
- ExecutorService#awaitTermination to block and wait for all submitted tasks to finish.

Here is the main chunk of code, excerpted from a full example app posted further down.
First we generate some fake input. And we define a thread-safe list to hold Result objects, with each submitted task instantiating a fresh Result object to be collected. Next we define a collection to hold our pending tasks. We populate it with many Task objects.
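A sketch of those pieces might look like the following; only the names Result and Task come from the description above, while their fields and the literal IDs are assumptions.

// Fake input: a handful of IDs standing in for the real 20k+ IDs.
Set<String> ids = Set.of("123", "234", "345", "897", "893");

// A record representing the outcome of processing one ID (fields assumed).
record Result(String id, Instant whenCompleted) {}

// A thread-safe list in which every task deposits its fresh Result.
List<Result> results = Collections.synchronizedList(new ArrayList<>());

// A task that processes one ID and records a Result.
record Task(String id, List<Result> results) implements Runnable {
    @Override
    public void run() {
        // retrieve & process the entity related to this id …
        results.add(new Result(id, Instant.now()));
    }
}

// The pending tasks, one per ID.
List<Task> tasks = ids.stream().map(id -> new Task(id, results)).toList();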
We create an executor service with a reasonable number of threads, as appropriate to our deployment machine.
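For example, sketched with an arbitrary pool size:

// Size the pool to suit your deployment machine; the number here is just an example.
int threadCount = 5;
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);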
Next we define a collection to keep all the Future objects returned by the executor service when we submit all our tasks. Submit all the tasks, capturing the futures.
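Sketched, assuming the tasks list defined above:

// Keep every Future returned by the executor service as we submit each task.
List<Future<?>> futures = new ArrayList<>();
for (Task task : tasks) {
    futures.add(executorService.submit(task));
}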
Make those two crucial calls in this subroutine, to invoke a shutdown and wait for tasks to complete.
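A sketch of such a subroutine; the method name and the ten-minute time-out are assumptions, not taken from the original listing.

// Stop accepting new submissions, then block until all submitted tasks finish
// or the time-out elapses, whichever comes first.
void shutdownAndAwaitTermination(ExecutorService executorService) {
    executorService.shutdown();
    try {
        if (!executorService.awaitTermination(10, TimeUnit.MINUTES)) {
            System.out.println("Time-out elapsed before all tasks completed.");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}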
The key concept is that the code above blocks until all submitted tasks are done… or until the specified time-out fires.
Lastly, we can report on the work.
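For example:

// By this point all tasks are done (or the time-out fired), so reading the results is safe.
System.out.println("Results collected: " + results.size());
results.forEach(System.out::println);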
Complete code example
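The original full listing is not reproduced here; the following assembles the pieces above into one runnable sketch. The class name, record fields, pool size, and time-out are all assumptions.

import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;

public class ParallelStuffExample {

    // The outcome of processing one ID (fields assumed).
    record Result(String id, Instant whenCompleted) {}

    // A task that processes one ID and records a fresh Result.
    record Task(String id, List<Result> results) implements Runnable {
        @Override
        public void run() {
            // retrieve & process the entity related to this id …
            results.add(new Result(id, Instant.now()));
        }
    }

    public static void main(String[] args) {
        new ParallelStuffExample().doParallelStuff();
    }

    public void doParallelStuff() {
        // Fake input.
        Set<String> ids = Set.of("123", "234", "345", "897", "893");

        // Thread-safe collection of results.
        List<Result> results = Collections.synchronizedList(new ArrayList<>());

        // One pending task per ID.
        List<Task> tasks = ids.stream().map(id -> new Task(id, results)).toList();

        // Pool sized as appropriate to the deployment machine.
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // Submit all the tasks, capturing the futures.
        List<Future<?>> futures = new ArrayList<>();
        for (Task task : tasks) {
            futures.add(executorService.submit(task));
        }

        // The two crucial calls: shutdown, then awaitTermination.
        shutdownAndAwaitTermination(executorService);

        // Report on the work; all tasks are done (or the time-out fired) by this point.
        System.out.println("Results collected: " + results.size());
        results.forEach(System.out::println);
    }

    void shutdownAndAwaitTermination(ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(10, TimeUnit.MINUTES)) {
                System.out.println("Time-out elapsed before all tasks completed.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}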
When run, the program reports on the completed work.
Parallel Streams
Using parallel streams may be a simpler route, depending on your specific needs. You have less control, such as not being able to specify the number of threads (by default), and less flexibility in the face of failures. But you get drastically shorter, simpler code.
Define a record to hold each result. This is similar to above, but here we use a String type for id to be closer to your original Question. Create some bogus input data, as strings.
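A sketch, with assumed fields on the record and assumed ID values:

// A result keyed by a String id, to stay close to the original Question.
record Result(String id, Instant whenCompleted) {}

// Bogus input data, as strings.
Set<String> ids = Set.of("123", "234", "345", "897", "893");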
And do the work, all in a single line of code. The .parallelStream call automatically handles getting a pool of threads of a reasonable size, and then does the work of instantiating our Result objects as tasks assigned to the various threads. Lastly, report results.
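Sketched, continuing with the assumed Result record and ids set from above:

// The parallel stream fans the work out across a common pool of threads,
// producing one Result per id.
List<Result> results =
        ids.parallelStream()
           .map(id -> new Result(id, Instant.now())) // retrieve & process the entity for this id …
           .toList();

// Report.
results.forEach(System.out::println);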
Be aware that using parallel streams only makes sense where you have a lengthy workload. For a brief workload, using parallel streams rather than a single stream may actually take more time, as there is overhead in setting up, managing, and collecting results across threads.
Project Loom
Everything above is relevant for today's Java. If Project Loom succeeds, and it looks like it will, the rules of the game will be changed… for the better.
To learn more, see posts, interviews, and presentations by members of the Loom team such as Ron Pressler and Alan Bateman. Focus on the more recent info, as Loom has evolved over time. And see the Java JEPs for virtual threads, and for structured concurrency. Previews are found in Java 19.