I want to process a series of jobs in sequence, but I want to queue up those jobs in parallel.
Here is my code:

open System.Threading.Tasks

let performWork (work : int) =
  task {
    do! Task.Delay 1000
    if work = 7 then
      failwith "Oh no"
    else
      printfn $"Work {work}"
  }

async {
  let w = MailboxProcessor.Start (fun inbox -> async {
    while true do
      let! message = inbox.Receive()
      let (ch : AsyncReplyChannel<_>), work = message
      do!
        performWork work
        |> Async.AwaitTask
      ch.Reply()
  })

  w.Error.Add(fun exn -> raise exn)

  let! completed =
    seq {
      for i = 1 to 10 do
        async {
          do! Async.Sleep 100
          do! w.PostAndAsyncReply(fun ch -> ch, i)
          return i
        }
    }
    |> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)

  printfn $"Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously

I expect this code to crash once it reaches work item 7.
However, it hangs forever:
$ dotnet fsi ./Test.fsx
Work 3
Work 1
Work 2
Work 4
Work 5
Work 6
I think that the w.Error event is not firing correctly.
How should I be capturing and re-throwing this error?
If my work is async, then it crashes as expected:

let performWork (work : int) =
  async {
    do! Async.Sleep 1000
    if work = 7 then
      failwith "Oh no"
    else
      printfn $"Work {work}"
  }

But I don't see why this should matter.
Leveraging a Result also works, but again, I don't know why this should be required.

async {
  let w = MailboxProcessor.Start (fun inbox -> async {
    while true do
      let! message = inbox.Receive()
      let (ch : AsyncReplyChannel<_>), work = message
      try
        do!
          performWork work
          |> Async.AwaitTask
        ch.Reply(Ok ())
      with exn ->
        ch.Reply(Error exn)
  })

  let performWorkOnWorker (work : int) =
    async {
      let! outcome = w.PostAndAsyncReply(fun ch -> ch, work)
      match outcome with
      | Ok () ->
        return ()
      | Error exn ->
        return raise exn
    }

  let! completed =
    seq {
      for i = 1 to 10 do
        async {
          do! Async.Sleep 100
          do! performWorkOnWorker i
          return i
        }
    }
    |> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = 4)

  printfn $"Completed {Seq.length completed} job(s)."
}
|> Async.RunSynchronously

I think the gist of the 'why' here is that Microsoft changed the behaviour for 'unobserved' task exceptions back in .NET 4.5, and this was brought through into .NET Core: these exceptions no longer cause the process to terminate and are effectively ignored. You can read more about it here.
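
As a rough illustration of that mechanism, here is a minimal sketch, assuming the default behaviour on .NET Core / .NET 5+. The helper name dropFaultedTask is made up for this example. A faulted task that nobody awaits does not bring the process down; at most, the UnobservedTaskException event is raised once the task has been finalised, and whether that happens at a given moment depends on the garbage collector, so the handler may not fire on every run.

```fsharp
open System
open System.Threading.Tasks

// Log anything that reaches the unobserved-exception mechanism.
TaskScheduler.UnobservedTaskException.Add(fun args ->
  eprintfn "Unobserved: %O" args.Exception)

// Hypothetical helper: create a faulted task and drop every reference to it.
let dropFaultedTask () =
  Task.FromException(InvalidOperationException("Oh no")) |> ignore

dropFaultedTask ()

// The event is raised from a finaliser, so the task has to be collected first.
// Collection is not guaranteed to happen here; this only makes it more likely.
GC.Collect()
GC.WaitForPendingFinalizers()

// The process carries on regardless of the faulted task.
printfn "Still running"
```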
I don't know the ins and outs of how Task and async are interoperating, but it would seem that the use of Task results in the continuations being attached to that and run on the TaskScheduler as a consequence. The exception is thrown as part of the async computation within the MailboxProcessor, and nothing is 'observing' it. This means the exception ends up in the mechanism referred to above, and that's why your process no longer crashes.

You can change this behaviour via a flag on .NET Framework via app.config, as explained in the link above. For .NET Core, you can't do this. You'd ordinarily try and replicate this by subscribing to the UnobservedTaskException event and re-throwing there, but that won't work in this case as the Task is hung and won't ever be garbage collected.

To try and prove the point, I've amended your example to include a timeout for PostAndAsyncReply. This means that the Task will eventually complete, can be garbage collected and, when the finaliser runs, the event is fired.

The output I get here is:

As you can see, the exception is eventually published by the Task finaliser, and re-throwing it in that handler brings down the app.

While interesting, I'm not sure any of this is practically useful information. The suggestion to terminate the app within the MailboxProcessor.Error handler is probably the right one.
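
For what it's worth, terminating from the Error handler could look something like the sketch below. This is only one way to do it, not a definitive implementation. The names startWorker and exitOnError are made up for illustration, and the work delay is shortened. It also assumes you are happy to construct the MailboxProcessor first and call Start() afterwards, which differs slightly from the question's code but means the handler is attached before any message is processed.

```fsharp
open System
open System.Threading.Tasks

// Same Task-based work as in the question (shorter delay for brevity).
let performWork (work : int) =
  task {
    do! Task.Delay 100
    if work = 7 then
      failwith "Oh no"
    else
      printfn $"Work {work}"
  }

// Hypothetical helper: build the worker, attach the handler, then start it.
let startWorker (onError : exn -> unit) =
  let body (inbox : MailboxProcessor<AsyncReplyChannel<unit> * int>) =
    async {
      while true do
        let! (ch, work) = inbox.Receive()
        do! performWork work |> Async.AwaitTask
        ch.Reply()
    }
  let w = new MailboxProcessor<_>(body)
  w.Error.Add onError
  w.Start()
  w

// Hypothetical handler: log and exit with a non-zero code instead of re-raising,
// since an exception raised here may end up unobserved.
let exitOnError (e : exn) =
  eprintfn "Worker failed: %O" e
  Environment.Exit 1
```

Passing exitOnError to startWorker should end the process as soon as the worker fails, rather than leaving the callers waiting on a reply that never arrives.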