Time to embarrass myself again with a lack of understanding of how concurrency works in .NET :P
I'm trying to write a function that can encapsulates creating an async workflow that takes an asyncSeq input and distributes it to n parallel consumers.
let doWorkInParallel bufferSize numberOfConsumers consumer input = async {
let buffer = BlockingQueueAgent<_>(bufferSize)
let inputFinished = ref false
let produce seq = async {
do! seq |> AsyncSeq.iterAsync (Some >> buffer.AsyncAdd)
do! buffer.AsyncAdd None
}
let consumeParallel degree f = async {
let consume f = async {
while not <| !inputFinished do
try
let! data = buffer.AsyncGet(waitTimeout)
match data with
| Some i -> do! f i
// whichever consumer gets the end of the input flips the inputFinished ref so they'll all stop processing
| None -> inputFinished := true
with | :? TimeoutException -> ()
}
do! [for slot in 1 .. degree -> consume f ]
|> Async.Parallel
|> Async.Ignore
}
let startProducer = produce input
let startConsumers = consumeParallel numberOfConsumers consumer
return! [| startProducer; startConsumers |] |> Async.Parallel |> Async.Ignore
}
and I've been testing it with code like this to simulate a sequence that's fast to create but the work for each item takes a while.
let input = seq {1..50} |> AsyncSeq.ofSeq
let action i = async {
do! Async.Sleep 500
printfn "%d GET" i
}
let test = doWorkInParallel 10 2 action input
Async.RunSynchronously test
EDIT: I seem to have fixed my initial problems (ugh, I REALLY didn't think about what I was doing with an EventWaitHandle, so silly), but I'd still love to hear if this is a totally stupid approach to the problem with F#.