I have the following code:
ConcurrentHashMap<String, CompletableFuture<Void>> taskMap = new ConcurrentHashMap<>(); // key type assumed
....
taskMap.compute(key, (k, queue) -> {
    CompletableFuture<Void> future = (queue == null)
            ? CompletableFuture.runAsync(myTask, poolExecutor)
            : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
    // to prevent OutOfMemoryError in case we have too many keys
    future.whenComplete((r, e) -> taskMap.remove(key, future));
    return future;
});
The issue with this code is that if the future is already completed, the function passed to whenComplete is invoked in the same thread that invokes compute. In the body of that function we remove the entry from the map. But the documentation of the compute method forbids this, and the application freezes.
How can I fix this issue?
The most obvious solution is to use whenCompleteAsync instead of whenComplete, as the former guarantees to execute the action using the supplied Executor rather than the calling thread.
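This can be demonstrated by attaching both kinds of action to an already completed future and printing the thread each one runs on: the whenComplete action runs on the calling thread, while the whenCompleteAsync action runs on a thread of the executor.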
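A minimal sketch of such a demonstration follows. The class name WhenCompleteDemo, the helper observeThreads and the thread name "pool-worker" are made up for illustration; the actual name of the calling thread depends on how the program is started.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class WhenCompleteDemo {
    /** Returns { caller thread, whenComplete thread, whenCompleteAsync thread }. */
    static String[] observeThreads() throws Exception {
        // Single worker thread with a recognizable (hypothetical) name.
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "pool-worker"));
        try {
            // The future is already completed before any action is attached.
            CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
            String[] seen = new String[3];
            seen[0] = Thread.currentThread().getName();
            // Runs immediately, in the thread that attaches the action.
            done.whenComplete((r, e) -> seen[1] = Thread.currentThread().getName());
            // Always handed over to the supplied executor; get() waits for it.
            done.whenCompleteAsync(
                    (r, e) -> seen[2] = Thread.currentThread().getName(), pool).get();
            return seen;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = observeThreads();
        System.out.println("caller:            " + seen[0]);
        System.out.println("whenComplete:      " + seen[1]);
        System.out.println("whenCompleteAsync: " + seen[2]);
    }
}
```

The first two lines should show the same thread name, while the third should show the worker thread of the executor.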
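So you could just replace the whenComplete call in your compute function with whenCompleteAsync, passing poolExecutor as the executor.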
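Applied to the code from the question, this might look like the sketch below. The wrapper class AsyncCleanupTasks, its submit and pendingKeys methods, and the String key type are assumptions added only to make the example self-contained; the body of the compute function is the question's code with the one call changed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

class AsyncCleanupTasks {
    // Key type String is an assumption; the question does not show it.
    private final ConcurrentHashMap<String, CompletableFuture<Void>> taskMap =
            new ConcurrentHashMap<>();
    private final Executor poolExecutor;

    AsyncCleanupTasks(Executor poolExecutor) {
        this.poolExecutor = poolExecutor;
    }

    /** Queues myTask behind any task still pending for the same key. */
    CompletableFuture<Void> submit(String key, Runnable myTask) {
        return taskMap.compute(key, (k, queue) -> {
            CompletableFuture<Void> future = (queue == null)
                    ? CompletableFuture.runAsync(myTask, poolExecutor)
                    : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
            // The cleanup is always handed to the pool, so it never runs
            // inside this compute call, even if future is already completed.
            future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor);
            return future;
        });
    }

    /** Number of keys that still have an entry in the map. */
    int pendingKeys() {
        return taskMap.size();
    }
}
```

This assumes poolExecutor really runs tasks on other threads. An executor that runs tasks in the calling thread would bring the original problem back.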
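If early completion has a significant likelihood, you could reduce the overhead by checking inside the compute function whether the future is already done, and scheduling the cleanup action only if it is not.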
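One way to sketch that check, assuming it is acceptable to store no mapping at all for a future that is already done: returning null from the compute function removes the mapping (or leaves the key absent), so no separate cleanup is needed in that case. The class name EarlyExitTasks, the void submit method and the String key type are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

class EarlyExitTasks {
    // Key type String is an assumption; the question does not show it.
    private final ConcurrentHashMap<String, CompletableFuture<Void>> taskMap =
            new ConcurrentHashMap<>();
    private final Executor poolExecutor;

    EarlyExitTasks(Executor poolExecutor) {
        this.poolExecutor = poolExecutor;
    }

    void submit(String key, Runnable myTask) {
        taskMap.compute(key, (k, queue) -> {
            CompletableFuture<Void> future = (queue == null)
                    ? CompletableFuture.runAsync(myTask, poolExecutor)
                    : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
            if (future.isDone()) {
                // Nothing left to wait for: a null result tells compute
                // to drop the mapping, so no cleanup task is scheduled.
                return null;
            }
            // Still running: schedule the cleanup on the pool as before.
            future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), poolExecutor);
            return future;
        });
    }

    /** Number of keys that still have an entry in the map. */
    int pendingKeys() {
        return taskMap.size();
    }
}
```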
Maybe you didn’t arrive at this obvious solution because you don’t like that the dependent action will always be scheduled as a new task to the pool, even if the completion already happens in a different task. You could solve this with a specialized executor which reschedules the action only when necessary, that is, only when it would otherwise run in the thread that is executing compute.
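A possible shape for such an executor is sketched below. The names RescheduleIfNeeded and unlessCalledFrom are invented for this example, and the String key type is again an assumption. The executor compares the current thread with the thread that called compute: if they are the same, the action goes to the pool, otherwise it runs in place.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

class RescheduleIfNeeded {
    /** Runs tasks in place, except on the forbidden thread, where fallback is used. */
    static Executor unlessCalledFrom(Thread forbidden, Executor fallback) {
        return task -> {
            if (Thread.currentThread() == forbidden) {
                fallback.execute(task); // would run inside compute: reschedule
            } else {
                task.run();             // already on another thread: run directly
            }
        };
    }

    // Key type String is an assumption; the question does not show it.
    private final ConcurrentHashMap<String, CompletableFuture<Void>> taskMap =
            new ConcurrentHashMap<>();
    private final Executor poolExecutor;

    RescheduleIfNeeded(Executor poolExecutor) {
        this.poolExecutor = poolExecutor;
    }

    CompletableFuture<Void> submit(String key, Runnable myTask) {
        return taskMap.compute(key, (k, queue) -> {
            CompletableFuture<Void> future = (queue == null)
                    ? CompletableFuture.runAsync(myTask, poolExecutor)
                    : queue.whenCompleteAsync((r, e) -> myTask.run(), poolExecutor);
            // The thread executing this function must not touch the map again.
            Executor cleanupExecutor = unlessCalledFrom(Thread.currentThread(), poolExecutor);
            future.whenCompleteAsync((r, e) -> taskMap.remove(key, future), cleanupExecutor);
            return future;
        });
    }
}
```

The thread check is deliberately conservative: if the same thread happens to complete the future later, outside of compute, the cleanup is still rescheduled, which costs a little but is safe.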
But you may want to rethink whether this complicated per-mapping cleanup logic is really desired. It is not only complicated but may also create notable overhead, potentially scheduling lots of cleanup actions that are no longer needed because they are already outdated by the time they are executed.
It might be much simpler and even more efficient to clean up the entire map from time to time, removing every entry whose future is already done in a single pass.
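One possible form of such a sweep, as a sketch: the class PeriodicCleanup, its method names and the choice of a ScheduledExecutorService with a period in seconds are assumptions, not something the question prescribes. The sweep itself is a single removeIf call on the map's values view.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PeriodicCleanup {
    /** Removes all entries whose future is done; returns the remaining size. */
    static <K> int sweep(ConcurrentHashMap<K, CompletableFuture<Void>> taskMap) {
        taskMap.values().removeIf(CompletableFuture::isDone);
        return taskMap.size();
    }

    /** Runs the sweep repeatedly; the period is a tuning knob, not a given. */
    static <K> ScheduledFuture<?> schedule(
            ScheduledExecutorService scheduler,
            ConcurrentHashMap<K, CompletableFuture<Void>> taskMap,
            long periodSeconds) {
        return scheduler.scheduleWithFixedDelay(
                () -> sweep(taskMap), periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }
}
```

With this approach the compute function no longer needs to register any cleanup action, so the original problem cannot occur.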