I've been working on parallelising some data pipelines with AMPHP in a large codebase, and now I'd like to allow a timeout for if one of the pipelines fails.
The exact behaviour I'm after is having a per-task timeout of 120 seconds and being able to pick up the results of all of the successful tasks, whilst recording failures.
A rough sketch of the code is:
// Turn an async instance (data pipeline method) to a future using an AMPParallelTask wrapper.
$toFuture = fn(Asynchronous $instance) => submit(new AMPParallelTask($instance, $securableId, $securableType, $accountId),
new TimeoutCancellation(0.1)
)->getFuture();
[$errors, $responses] = awaitAll(
array_map($toFuture, $asynchronousInstances)
)
The issue is that it's possible for the Tasks' Task::run method to contain logic with try {} catch (Exception $e) {} blocks in them. When I run the above code, the cancellation of the task throws a CancellationException, which is caught by any try catch which doesn't specialise to a specific kind of exception. If there's a slow task in the try, this can completely break the logic and give incorrect results.
I'm wondering if there's a way to accomplish what I want without dropping all try {} catch (Exception $e) {} blocks in the run logic? My intuition is that maybe I can use channels...?