Why is my MailboxProcessor hanging?

375 Views Asked by At

I can't work out why the following code is hanging at the call to GetTotal. I don't seem to be able to debug inside the MailboxProcessor, so it's hard to see what's going on.

module Aggregator

open System

type Message<'T, 'TState> =
    | Aggregate of 'T
    | GetTotal of AsyncReplyChannel<'TState>

type Aggregator<'T, 'TState>(initialState, f) =
    let myAgent = new MailboxProcessor<Message<'T, 'TState>>(fun inbox ->
        let rec loop agg =
            async {
                let! message = inbox.Receive()
                match message with
                    | Aggregate x -> return! loop (f agg x)
                    | GetTotal replyChannel ->
                        replyChannel.Reply(agg)
                        return! loop agg
            }
        loop initialState
        )

    member m.Aggregate x = myAgent.Post(Aggregate(x))
    member m.GetTotal = myAgent.PostAndReply(fun replyChannel -> GetTotal(replyChannel))

let myAggregator = new Aggregator<int, int>(0, (+))

myAggregator.Aggregate(3)
myAggregator.Aggregate(4)
myAggregator.Aggregate(5)

let totalSoFar = myAggregator.GetTotal
printfn "%d" totalSoFar

Console.ReadLine() |> ignore

It seems to work fine when using an identical MailboxProcessor directly, rather than wrapping in the Aggregator class.

2

There are 2 best solutions below

0
On BEST ANSWER

The problem is that you did not start the agent. You can either call Start after you create the agent:

let myAgent = (...)
do myAgent.Start()

Alternatively, you can create the agent using MailboxProcessor<'T>.Start instead of calling the constructor (I usually prefer this option, because it looks more functional):

let myAgent = MailboxProcessor<Message<'T, 'TState>>.Start(fun inbox ->  (...) )

I suppose that you couldn't debug the agent, because the code inside agent wasn't actually running. I tried adding printfn "Msg: %A" message right after the call to Receive inside the agent (to print incoming messages for debugging) and I noticed that, after calling Aggregate, no messages were actually received by the agent... (It only blocked after calling GetTotal, which avaits reply)

As a side-note, I would probably turn GetTotal into a method, so you'd call GetTotal(). Properties are re-evaluated each time you access them, so your code does the same thing, but best practices don't recommend using properties that do complex work.

1
On

You forgot to start the mailbox:

open System

type Message<'T, 'TState> =
    | Aggregate of 'T
    | GetTotal of AsyncReplyChannel<'TState>

type Aggregator<'T, 'TState>(initialState, f) =
    let myAgent = new MailboxProcessor<Message<'T, 'TState>>(fun inbox ->
        let rec loop agg =
            async {
                let! message = inbox.Receive()
                match message with
                    | Aggregate x -> return! loop (f agg x)
                    | GetTotal replyChannel ->
                        replyChannel.Reply(agg)
                        return! loop agg
            }
        loop initialState
        )

    member m.Aggregate x = myAgent.Post(Aggregate(x))
    member m.GetTotal = myAgent.PostAndReply(fun replyChannel -> GetTotal(replyChannel))
    member m.Start() = myAgent.Start()

let myAggregator = new Aggregator<int, int>(0, (+))

myAggregator.Start()

myAggregator.Aggregate(3)
myAggregator.Aggregate(4)
myAggregator.Aggregate(5)

let totalSoFar = myAggregator.GetTotal
printfn "%d" totalSoFar

Console.ReadLine() |> ignore