Streaming string data with F# Suave


Suave 2.4.0 supports TransferEncoding.chunked and HttpOutput.writeChunk, so I have written the code below to stream data out over HTTP.

let sendStrings getStringsFromProducer : WebPart =
    Writers.setStatus HTTP_200 >=>
    TransferEncoding.chunked (fun conn -> socket {
        let refConn = ref conn

        for str in getStringsFromProducer do
            let! (_, conn) = (str |> stringToBytes |> HttpOutput.writeChunk) !refConn
            refConn := conn

        return! HttpOutput.writeChunk [||] !refConn
    })

While this works, I question the reliability of using ref and hope there is a better, more functional way to do the same thing. Is there a better way to do this, assuming I cannot change getStringsFromProducer?


There are 2 best solutions below

Best answer

I think you cannot avoid all mutation in this case: writing chunks one by one is a fairly imperative operation, and iterating over a lazy sequence also requires a (mutable) iterator. I think your sendStrings function does a nice job of hiding the mutation from the consumer and provides a nice functional API.

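You can avoid ref cells and replace them with a local mutable variable, which is a bit safer because the mutable variable cannot escape the local scope. If the compiler rejects this because the computation expression captures the mutable variable in a closure, keep the ref cell. The mutable version looks like this: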

TransferEncoding.chunked (fun conn -> socket {
    let mutable conn = conn
    for str in getStringsFromProducer do
        let! _, newConn = HttpOutput.writeChunk (stringToBytes str) conn
        conn <- newConn
    return! HttpOutput.writeChunk [||] conn
})

You could avoid the mutable conn variable by using recursion, but this requires you to work with IEnumerator<'T> rather than using a nice for loop to iterate over the sequence, so I think this is actually less nice than the version using a mutable variable:

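TransferEncoding.chunked (fun conn -> socket {
    let en = getStringsFromProducer.GetEnumerator()
    // Thread the connection through the recursion and hand back the final one.
    let rec loop conn = socket {
        if en.MoveNext() then
            let! _, conn = HttpOutput.writeChunk (stringToBytes en.Current) conn
            return! loop conn
        else
            return conn }
    let! conn = loop conn
    // An empty chunk terminates the chunked response.
    return! HttpOutput.writeChunk [||] conn })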
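
The recursive approach can also be packaged as a reusable helper, so the enumerator stays hidden and the call site reads like a fold. The following is only a sketch that runs without Suave: FakeConn, fakeWriteChunk, foldAsync, and sendAll are hypothetical names invented for illustration, and plain Async stands in for Suave's socket builder.

```fsharp
// Hypothetical stand-in for a connection; NOT a Suave type.
type FakeConn = { Written: string list }

// Mimics the shape of HttpOutput.writeChunk: takes data and a connection,
// and returns unit plus the updated connection.
let fakeWriteChunk (data: string) (conn: FakeConn) : Async<unit * FakeConn> =
    async { return (), { conn with Written = conn.Written @ [ data ] } }

// Asynchronous fold over a lazy sequence. The enumerator is the only
// mutable object and it never leaves this function.
let foldAsync (folder: 'state -> 'a -> Async<'state>) (state: 'state) (source: seq<'a>) : Async<'state> =
    async {
        use en = source.GetEnumerator()
        let rec loop state = async {
            if en.MoveNext() then
                let! next = folder state en.Current
                return! loop next
            else
                return state }
        return! loop state }

// Threads the connection through every write without ref cells or mutable locals.
let sendAll (items: seq<string>) (conn: FakeConn) : Async<FakeConn> =
    let writeOne conn item = async {
        let! (_, next) = fakeWriteChunk item conn
        return next }
    foldAsync writeOne conn items

let result =
    sendAll (seq { yield "a"; yield "b"; yield "c" }) { Written = [] }
    |> Async.RunSynchronously

printfn "%A" result.Written   // prints ["a"; "b"; "c"]
```

The sequence is still consumed lazily, one element per write, so this shape should also suit a producer that yields items over time.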

Another answer

I was looking for a way to replace refs/mutables in F# in a general way, and while I came up with a solution, it might be overkill in your case. It looks like the ref is a local that is only updated from within a single thread, so it's probably fairly safe. However, if you want to replace it, here's how I solved the problem:

type private StateMessage<'a> =
| Get of AsyncReplyChannel<'a>
| GetOrSet of 'a * AsyncReplyChannel<'a>
| GetOrSetResult of (unit -> 'a) * AsyncReplyChannel<'a>
| Set of 'a
| Update of ('a -> 'a) * AsyncReplyChannel<'a>

type Stateful<'a>(?initialValue: 'a) =
    let agent = MailboxProcessor<StateMessage<'a>>.Start
                <| fun inbox ->
                    let rec loop state =
                        async {
                            let! message = inbox.Receive()
                            match message with
                            | Get channel -> 
                                match state with
                                | Some value -> channel.Reply(value)
                                | None -> channel.Reply(Unchecked.defaultof<'a>)
                                return! loop state
                            | GetOrSet (newValue, channel) ->
                                match state with
                                | Some value ->
                                    channel.Reply(value)
                                    return! loop state
                                | None ->
                                    channel.Reply(newValue)
                                    return! loop (Some newValue)
                            | GetOrSetResult (getValue, channel) ->
                                match state with
                                | Some value ->
                                    channel.Reply(value)
                                    return! loop state
                                | None ->
                                    let newValue = getValue ()
                                    channel.Reply(newValue)
                                    return! loop (Some newValue)
                            | Set value -> 
                                return! loop (Some value)
                            | Update (update, channel) ->
                                let currentValue =
                                    match state with
                                    | Some value -> value
                                    | None -> Unchecked.defaultof<'a>
                                let newValue = update currentValue
                                channel.Reply(newValue)
                                return! loop (Some newValue)
                        }
                    loop initialValue

    let get () = agent.PostAndReply Get
    let asyncGet () = agent.PostAndAsyncReply Get
    let getOrSet value = agent.PostAndReply <| fun reply -> GetOrSet (value, reply)
    let asyncGetOrSet value = agent.PostAndAsyncReply <| fun reply -> GetOrSet (value, reply)
    let getOrSetResult getValue = agent.PostAndReply <| fun reply -> GetOrSetResult (getValue, reply)
    let asyncGetOrSetResult getValue = agent.PostAndAsyncReply <| fun reply -> GetOrSetResult (getValue, reply)
    let set value = agent.Post <| Set value
    let update f = agent.PostAndReply <| fun reply -> Update (f, reply)
    let asyncUpdate f = agent.PostAndAsyncReply <| fun reply -> Update (f, reply)

    member __.Get () = get ()
    member __.AsyncGet () = asyncGet ()
    member __.GetOrSet value = getOrSet value
    member __.AsyncGetOrSet value = asyncGetOrSet value
    member __.GetOrSetResult getValue = getOrSetResult getValue
    member __.AsyncGetOrSetResult getValue = asyncGetOrSetResult getValue
    member __.Set value = set value
    member __.Update f = update f
    member __.AsyncUpdate f = asyncUpdate f

This basically uses a MailboxProcessor to serialize updates to state that's managed by a tail-recursive function, similar to Tomas' second example. However, this allows you to call Get/Set/Update in a way that's more like traditional mutable state, even though it's not actually doing mutation. You can use it like this:

let state = Stateful(0)
state.Get() |> printfn "%d"
state.Set(1)
state.Get() |> printfn "%d"
state.Update(fun x -> x + 1) |> printfn "%d"

This will print:

0
1 
2
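
To illustrate why routing updates through a MailboxProcessor serializes them, here is a much smaller sketch than Stateful. CounterMessage and counter are hypothetical names used only for this example. Increments are posted from parallel workflows, yet the agent handles one message at a time, so no update is lost.

```fsharp
type CounterMessage =
    | Increment
    | Read of AsyncReplyChannel<int>

// The state lives only in the argument of the tail-recursive loop.
let counter =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        let rec loop count = async {
            let! message = inbox.Receive()
            match message with
            | Increment -> return! loop (count + 1)
            | Read channel ->
                channel.Reply count
                return! loop count }
        loop 0)

// Post 1000 increments from workflows that run in parallel.
[ for _ in 1 .. 1000 -> async { counter.Post Increment } ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

// The Read message is queued after all increments, so it sees every one of them.
let total = counter.PostAndReply Read
printfn "%d" total   // prints 1000
```

The trade-off is that every read or update becomes a message round trip, which is considerably heavier than a ref cell that is only ever touched from one logical thread of execution.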