Parallel consumption of asyncSeq using BlockingQueueAgent


Time to embarrass myself again with a lack of understanding of how concurrency works in .NET :P

I'm trying to write a function that encapsulates creating an async workflow that takes an asyncSeq input and distributes it to n parallel consumers.

let doWorkInParallel bufferSize numberOfConsumers consumer input = async {
    let buffer = BlockingQueueAgent<_>(bufferSize)
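    let inputFinished = ref false
    // Assumption: waitTimeout is not defined in the original snippet. It is taken here to be
    // the number of milliseconds a consumer waits on the buffer before re-checking inputFinished.
    let waitTimeout = 1000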

    let produce seq = async {
        do! seq |> AsyncSeq.iterAsync (Some >> buffer.AsyncAdd)
        do! buffer.AsyncAdd None
    }

    let consumeParallel degree f = async {
        let consume f = async {
            while not <| !inputFinished do
                try
                    let! data = buffer.AsyncGet(waitTimeout)
                    match data with
                    | Some i -> do! f i
                    // whichever consumer gets the end of the input flips the inputFinished ref so they'll all stop processing
                    | None -> inputFinished := true
                with | :? TimeoutException -> ()
        }

        do! [for slot in 1 .. degree -> consume f ]
            |> Async.Parallel
            |> Async.Ignore
    }

    let startProducer = produce input
    let startConsumers = consumeParallel numberOfConsumers consumer

    return! [| startProducer; startConsumers |] |> Async.Parallel |> Async.Ignore
}

I've been testing it with code like this, which simulates a sequence that is fast to create but whose items each take a while to process:

let input = seq {1..50} |> AsyncSeq.ofSeq
let action i = async {
    do! Async.Sleep 500
    printfn "%d GET" i
    }
let test = doWorkInParallel 10 2 action input
Async.RunSynchronously test

EDIT: I seem to have fixed my initial problems (ugh, I REALLY didn't think about what I was doing with an EventWaitHandle, so silly), but I'd still love to hear if this is a totally stupid approach to the problem with F#.
