I'm rather new to parallel code, and I tried to convert some code based on executors to structured concurrency, but I lost an important property that I must somehow keep.
Given the following code using structured concurrency with Java 21 preview:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
    Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));
    scope.join().throwIfFailed(); // [1]
    var data1 = d1Subtask.get(); // [2]
    var data2 = d2Subtask.get();
    return new Response(data1, data2);
}
At [1], the first exception thrown by either of the two subtasks is propagated, and I don't want that. I need to run both tasks in parallel, but I need the result of d1Subtask first in case it fails. In other words:
- if d1Subtask fails, I need to throw its exception (d2Subtask might be running, successful, or failed, and none of that matters: an exception from d1Subtask makes the second task irrelevant);
- if d1Subtask succeeds and d2Subtask fails, I need the exception from d2Subtask;
- if both succeed, combine the results of both.
If I change it to scope.join(); alone, then [2] can fail if d1Subtask is not done. There is d1Subtask.state(), but waiting for it to leave the State.UNAVAILABLE state seems against the idea of structured concurrency.
This can be achieved with Executors, or with a plain StructuredTaskScope, but both mean potentially running d2Subtask to completion even when the scope could have been shut down and that task aborted.
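For reference, the Executors variant I mean looks roughly like this (a sketch; Executors and Future are from java.util.concurrent, and the types and helpers are the same as in the snippet above):

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    Future<Data1> d1Future = executor.submit(() -> getData1(input));
    Future<Data2> d2Future = executor.submit(() -> getData2(input));
    var data1 = d1Future.get(); // d1's exception is thrown first, if any
    var data2 = d2Future.get(); // d2's exception only surfaces if d1 succeeded
    // note: if get() throws, the implicit close() still waits for the
    // other task to terminate instead of interrupting it
    return new Response(data1, data2);
}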
Given that, is it possible to modify the code above to wait for the result of d1Subtask in a clean, readable way? I imagined that something like scope.join(d1Subtask) or d1Subtask.join() would be the way of doing it, or maybe a different policy, if such an API existed.
Edit: clearer explanation of the desired logic with each possible outcome.
You can use StructuredTaskScope directly, without ShutdownOnFailure, to wait for all jobs to complete; then you can check the results and failures in the intended order (see the sketch below). This is the simplest approach. It ensures that if both jobs failed, the exception of "data1" is propagated to the caller. The only disadvantage is that if "data1" failed before "data2" completed, it will wait for "data2" without attempting to interrupt it. This, however, may be acceptable, as we're usually not trying (too hard) to optimize the exceptional case.
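A minimal sketch of that approach, reusing the question's types and helpers with a plain java.util.concurrent.StructuredTaskScope (requires --enable-preview on Java 21):

try (var scope = new StructuredTaskScope<Object>()) {
    Subtask<Data1> d1Subtask = scope.fork(() -> getData1(input));
    Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));
    scope.join(); // wait for both subtasks, regardless of failures
    // check d1Subtask first, so its exception wins if both failed
    if (d1Subtask.state() == Subtask.State.FAILED)
        throw new ExecutionException(d1Subtask.exception());
    if (d2Subtask.state() == Subtask.State.FAILED)
        throw new ExecutionException(d2Subtask.exception());
    return new Response(d1Subtask.get(), d2Subtask.get());
}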
But you can also implement your own policy. Here's an example of a policy having a "primary job": when another job fails, it waits for the primary job's completion, preferring the primary's exception if it fails too; but when the primary job fails, it shuts down immediately, trying to interrupt all other jobs rather than waiting for their completion:
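A sketch of such a policy (the class name ShutdownOnPrimaryFailure, forkPrimary, and the field names are illustrative, not an existing API):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;

class ShutdownOnPrimaryFailure<T> extends StructuredTaskScope<T> {
    private volatile Subtask<? extends T> primary;
    private volatile Throwable primaryFailure;
    private volatile Throwable otherFailure;

    // forks the one subtask whose exception must take precedence; call once
    public synchronized <U extends T> Subtask<U> forkPrimary(Callable<? extends U> task) {
        Subtask<U> subtask = fork(task);
        primary = subtask;
        return subtask;
    }

    // synchronized, so a completion racing with forkPrimary sees the field
    @Override
    protected synchronized void handleComplete(Subtask<? extends T> subtask) {
        if (subtask == primary) {
            if (subtask.state() == Subtask.State.FAILED) {
                primaryFailure = subtask.exception();
                shutdown(); // primary failed: interrupt everything else
            } else if (otherFailure != null) {
                shutdown(); // primary done and another job already failed
            }
        } else if (subtask.state() == Subtask.State.FAILED) {
            if (otherFailure == null) otherFailure = subtask.exception();
            // don't shut down yet; keep waiting for the primary job,
            // unless the primary has already succeeded
            if (primary != null && primary.state() == Subtask.State.SUCCESS)
                shutdown();
        }
    }

    // prefers the primary job's exception over any other failure
    public void throwIfFailed() throws ExecutionException {
        ensureOwnerAndJoined();
        Throwable t = primaryFailure != null ? primaryFailure : otherFailure;
        if (t != null) throw new ExecutionException(t);
    }
}

Usage then mirrors the question's code:

try (var scope = new ShutdownOnPrimaryFailure<Object>()) {
    Subtask<Data1> d1Subtask = scope.forkPrimary(() -> getData1(input));
    Subtask<Data2> d2Subtask = scope.fork(() -> getData2(input));
    scope.join();
    scope.throwIfFailed(); // d1's exception wins; otherwise d2's
    return new Response(d1Subtask.get(), d2Subtask.get());
}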
For completeness, I provide code for testing all scenarios at the end of this answer. It checks all combinations of success and failure.
With the implemented approaches, it will print the outcome of each scenario together with each job's abbreviated status: Finished, Interrupted, or Running.
The issue was the scenario of D1 failing slowly while D2 fails fast, in the middle of the 3rd line. The ShutdownOnFailure scope then aborted D1 (D1 status: Interrupted) and propagated D2's failure. The simple approach clearly fixes this, but loses the ability to fail fast when D1 fails fast (the last scenario in the 2nd line; D2 status now Finished). The custom policy solves the original issue while retaining the fail-fast support.
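A harness along these lines exercises every combination (a sketch: Behavior, the delays, and the printed format are illustrative; it is shown for the custom policy, but the other scopes can be plugged in the same way):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.atomic.AtomicReference;

public class ScenarioTest {
    record Behavior(String name, boolean fail, long delayMillis) {}

    static final Behavior[] BEHAVIORS = {
        new Behavior("ok/fast",   false,  10),
        new Behavior("ok/slow",   false, 300),
        new Behavior("fail/fast", true,   10),
        new Behavior("fail/slow", true,  300),
    };

    // a job that records its own status: Finished once it ran to the end
    // (successfully or not), Interrupted when cancelled, Running otherwise
    static String job(String id, Behavior b, AtomicReference<String> status) throws Exception {
        status.set("Running");
        try {
            Thread.sleep(b.delayMillis());
        } catch (InterruptedException e) {
            status.set("Interrupted");
            throw e;
        }
        status.set("Finished");
        if (b.fail()) throw new RuntimeException(id + " failed");
        return id + " ok";
    }

    static String run(Behavior b1, Behavior b2) throws InterruptedException {
        var s1 = new AtomicReference<>("Not started");
        var s2 = new AtomicReference<>("Not started");
        try (var scope = new ShutdownOnPrimaryFailure<String>()) {
            Subtask<String> d1 = scope.forkPrimary(() -> job("D1", b1, s1));
            Subtask<String> d2 = scope.fork(() -> job("D2", b2, s2));
            scope.join();
            String result;
            try {
                scope.throwIfFailed();
                result = "-> " + d1.get() + ", " + d2.get();
            } catch (ExecutionException e) {
                result = "threw " + e.getCause().getMessage();
            }
            // statuses are sampled right after join(): a job that has not
            // yet reacted to interruption still shows as Running
            return result + " [D1 " + s1.get() + ", D2 " + s2.get() + "]";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (Behavior b1 : BEHAVIORS)
            for (Behavior b2 : BEHAVIORS)
                System.out.println("D1 " + b1.name() + ", D2 " + b2.name() + ": " + run(b1, b2));
    }
}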