I want to have warp run a process, then respond with that process' output. The output is assumed to be larger than the server's RAM; loading the entire output then responding is not an option. I'd thought that I could accomplish this using something like
withProcessWait_ (setStdout createSource "cat largefile") (pure . responseSource ok200 [] . getStdout)
but responseSource
uses ConduitT i (Flush Builder) IO ()
and createSource
uses ConduitT i ByteString m ()
. I could not figure how to convert a ByteString
conduit to a Flush Builder
conduit.
So I devised a solution that seems to work, but it's regrettably less simply defined:
responseProcess :: Status -> ResponseHeaders -> ProcessConfig in out err -> Response
responseProcess s hs cfg = responseStream s hs $ \send flush ->
withProcessWait_ (setStdout createPipe cfg) $ \p@(getStdout -> h) ->
let loop = do
bs <- hGetSome h defaultChunkSize
unless (BS.null bs) (send (byteString bs) *> flush *> loop)
in loop *> hClose h
. Is this necessary, even if I may try prettying-it-up by wrapping in mkStreamSpec
or something? Or is there a simpler method I'm missing?
edit: comments on the solution:
intersperseC
lets me use Chunk
and Flush
together. That solves the Flush Builder
/ByteString
conversion problem. I haven't tested it, but it looks right and I trust it's been used.
However, I found that
withProcessWait_ (setStdout createSource "cat largefile") $ \p ->
responseSource ok200 [] (getStdout p .| mapC (Chunk . byteString) .| intersperseC Flush)
closes the process handle too early. Thus I need to manage the pipe myself: using createPipe
instead of createSource
. But this means that I need to call hClose
at the end, which means that I need a response handler that returns IO ()
; the only one that does (excepting responseRaw
) is responseStream
, which uses StreamingBody
as an alternative to Conduit. Thus I conclude that my original solution is needed and that Conduit cannot be used for streaming processes. Feel free to correct this if it's incorrect.
responseSource
has typeand the definition of
Flush
isThat is, a value of type
Flush Builder
is either aBuilder
or a command that instructs warp to flush the output stream.Builder
is from the binary package. It's basically a representation of a chunk of bytes, optimized for efficient concatenation. And it can be constructed from aByteString
, using thefromByteString
function.Knowing that, and using
mapC
from conduit, we can define this adapter:There's a problem though, the adapter never flushes. But we can intersperse flusing commands by means of
intersperseC
:And what if we don't want to flush after every chunk? Perhaps we could use
chunksOfCE
to group the byte chunks before converting them intoFlush
values.