How to Pipe Typed Process to wai-conduit's responseSource?


I want to have warp run a process, then respond with that process' output. The output is assumed to be larger than the server's RAM; loading the entire output then responding is not an option. I'd thought that I could accomplish this using something like

withProcessWait_ (setStdout createSource "cat largefile") (pure . responseSource ok200 [] . getStdout)

but responseSource expects a ConduitT i (Flush Builder) IO (), whereas createSource yields a ConduitT i ByteString m (). I could not figure out how to convert a ByteString conduit into a Flush Builder conduit.


So I devised a solution that seems to work, but it's regrettably less simply defined:

-- Needs the ViewPatterns extension for the (getStdout -> h) pattern.
responseProcess :: Status -> ResponseHeaders -> ProcessConfig stdin stdout stderr -> Response
responseProcess s hs cfg = responseStream s hs $ \send flush ->
    withProcessWait_ (setStdout createPipe cfg) $ \p@(getStdout -> h) ->
        -- Read the pipe chunk by chunk until EOF, flushing after each chunk.
        let loop = do
                bs <- hGetSome h defaultChunkSize
                unless (BS.null bs) (send (byteString bs) *> flush *> loop)
        in loop *> hClose h

Is this necessary, even if I might tidy it up by wrapping it in mkStreamSpec or something similar? Or is there a simpler method I'm missing?


edit: comments on the solution:

intersperseC lets me use Chunk and Flush together. That solves the Flush Builder/ByteString conversion problem. I haven't tested it, but it looks right and I trust it's been used.

However, I found that

withProcessWait_ (setStdout createSource "cat largefile") $ \p ->
    responseSource ok200 [] (getStdout p .| mapC (Chunk . byteString) .| intersperseC Flush)

closes the process handle too early. Thus I need to manage the pipe myself: using createPipe instead of createSource. But this means that I need to call hClose at the end, which means that I need a response handler that returns IO (); the only one that does (excepting responseRaw) is responseStream, which uses StreamingBody as an alternative to Conduit. Thus I conclude that my original solution is needed and that Conduit cannot be used for streaming processes. Feel free to correct this if it's incorrect.
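One possible middle ground, offered as an untested sketch rather than a confirmed fix: keep responseStream as the outer handler, but run a conduit inside its callback. The process is then started only when warp asks for the body, and withProcessWait_ stays open for the whole transfer. The names responseProcessC and collectBody are made up for illustration, the example assumes a Unix-like echo on the PATH, and collectBody exists only to exercise the response without starting a server.

```haskell
import Conduit (mapM_C, runConduit, (.|))
import Data.ByteString.Builder (byteString, toLazyByteString)
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.Conduit.Process.Typed
    (ProcessConfig, createSource, getStdout, proc, setStdout, withProcessWait_)
import Data.IORef (modifyIORef', newIORef, readIORef)
import Network.HTTP.Types (ResponseHeaders, Status, status200)
import Network.Wai (Response, responseStream, responseToStream)

-- Hypothetical helper: stream a process's stdout as the response body.
-- The process lives inside the streaming callback, so it is still running
-- while warp pulls the body.
responseProcessC :: Status -> ResponseHeaders -> ProcessConfig stdin stdout stderr -> Response
responseProcessC s hs cfg = responseStream s hs $ \send flush ->
    withProcessWait_ (setStdout createSource cfg) $ \p ->
        runConduit $ getStdout p .| mapM_C (\bs -> send (byteString bs) >> flush)

-- Hypothetical test helper: run the streaming body and collect what it sends.
collectBody :: Response -> IO BL.ByteString
collectBody resp = do
    ref <- newIORef mempty
    let (_, _, withBody) = responseToStream resp
    withBody $ \body -> body (\b -> modifyIORef' ref (<> b)) (pure ())
    toLazyByteString <$> readIORef ref

main :: IO ()
main = collectBody (responseProcessC status200 [] (proc "echo" ["hello"])) >>= BL.putStr
```

Note that withProcessWait_ throws if the process exits with a non-zero code, which here would happen after part of the body has already been sent.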

Best answer

responseSource has type

responseSource :: Status -> ResponseHeaders -> Source IO (Flush Builder) -> Response

and the definition of Flush is

data Flush a = Chunk a | Flush

That is, a value of type Flush Builder is either a Builder or a command that instructs warp to flush the output stream.

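Builder here is the type from Data.ByteString.Builder in the bytestring package (Data.Binary.Builder in the binary package exposes the same type). It's basically a representation of a chunk of bytes, optimized for efficient concatenation. It can be constructed from a ByteString with byteString, or with the equivalent fromByteString function from Data.Binary.Builder.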

Knowing that, and using mapC from conduit, we can define this adapter:

adapter :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString)

There's a problem, though: the adapter never flushes. But we can intersperse flushing commands by means of intersperseC:

adapter :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString) .| intersperseC Flush
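
To see what the adapter emits, here is a small pure sketch that runs it over three in-memory chunks. It uses byteString from Data.ByteString.Builder in place of fromByteString, and the names render and demo are only for illustration. Since intersperseC inserts the value between elements, there is no Flush after the last chunk.

```haskell
import Conduit
    (ConduitT, Flush (..), intersperseC, mapC, runConduitPure, sinkList, yieldMany, (.|))
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder, byteString, toLazyByteString)
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BL

-- Same adapter as above, written with bytestring's byteString.
adapter :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . byteString) .| intersperseC Flush

-- Turn each Builder into a lazy ByteString so the result can be compared and shown.
render :: Flush Builder -> Flush BL.ByteString
render = fmap toLazyByteString

-- Feed three chunks through the adapter and collect everything it yields.
demo :: [Flush BL.ByteString]
demo = runConduitPure $
    yieldMany (map BC.pack ["ab", "cd", "ef"]) .| adapter .| mapC render .| sinkList

main :: IO ()
main = print demo
```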

And what if we don't want to flush after every chunk? Perhaps we could use chunksOfCE to group the byte chunks before converting them into Flush values.
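
A rough sketch of that idea, assuming a fixed batch size is acceptable: chunksOfCE regroups the incoming bytes into pieces of n bytes (the last piece may be shorter), and the flush markers are then interspersed between those larger pieces. The names batchedAdapter and regrouped are hypothetical. The trade-off is that chunksOfCE holds data back until it has n bytes, so fewer flushes also means more latency.

```haskell
import Conduit
    (ConduitT, Flush (..), chunksOfCE, intersperseC, mapC, runConduitPure, sinkList, yieldMany, (.|))
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder, byteString)
import qualified Data.ByteString.Char8 as BC

-- Hypothetical variant of the adapter: regroup into n-byte pieces first,
-- then flush only between those pieces.
batchedAdapter :: Monad m => Int -> ConduitT ByteString (Flush Builder) m ()
batchedAdapter n = chunksOfCE n .| mapC (Chunk . byteString) .| intersperseC Flush

-- Show only the regrouping step on small in-memory chunks.
regrouped :: [ByteString]
regrouped = runConduitPure $
    yieldMany (map BC.pack ["ab", "cde", "f"]) .| chunksOfCE 4 .| sinkList

main :: IO ()
main = print regrouped
```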