Creating a streaming Conduit Source with postgresql-simple


postgresql-simple provides functions for streaming queries, e.g.

fold 
  :: (FromRow row, ToRow params)
  => Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a

I want to create a conduit source which takes full advantage of streaming.

mySource :: (FromRow row, Monad m) => Source m row

Unfortunately, because IO appears in a contravariant position (I think?) in fold, I'm really struggling with the types. The following type-checks, but folds the entire stream before yielding values.

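import qualified Data.Conduit as C
import qualified Data.Conduit.List as CL
import Database.PostgreSQL.Simple (Connection, fold_)

-- Event (with a FromRow instance) and queryEventRecord are defined elsewhere.
getConduit :: Connection -> IO (C.ConduitM () Event IO ())
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo
  where
    foo :: C.ConduitM () Event IO () -> Event -> IO (C.ConduitM () Event IO ())
    foo cond evt = pure (cond >> C.yield evt)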
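Why nothing comes out early: fold_ only returns after it has consumed every row, and the callback merely extends the accumulated conduit without running it. Below is a database-free simulation of that ordering, offered as a sketch under stated assumptions: fakeFold_ is a hypothetical stand-in for fold_, and a plain IO () action plays the role of the accumulated ConduitM. Each "fetch" and "emit" is recorded in an IORef so the order can be inspected.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Hypothetical stand-in for fold_: "fetches" each row (logging it) and
-- threads an accumulator through the callback, returning only at the end.
fakeFold_ :: IORef [String] -> [Int] -> a -> (a -> Int -> IO a) -> IO a
fakeFold_ logRef rows z step = go z rows
  where
    go acc []       = pure acc
    go acc (r : rs) = do
      modifyIORef logRef (++ ["fetch " ++ show r])
      acc' <- step acc r
      go acc' rs

-- Mirrors the question's approach: the accumulator is a deferred action
-- (standing in for the ConduitM), extended by one "emit" per row.
buildDeferred :: IORef [String] -> [Int] -> IO (IO ())
buildDeferred logRef rows = fakeFold_ logRef rows (pure ()) step
  where
    step deferred r = pure (deferred >> modifyIORef logRef (++ ["emit " ++ show r]))

-- Builds the deferred action, runs it, and returns the event log.
runDemo :: [Int] -> IO [String]
runDemo rows = do
  logRef <- newIORef []
  deferred <- buildDeferred logRef rows
  deferred
  readIORef logRef
```

runDemo [1, 2, 3] logs all three fetches before the first emit, which is exactly the buffering described above.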

Any pointers on how to implement this would be greatly appreciated! Thanks!


Accepted answer

One (not so nice) way to go about this is to

  • make a new TMChan to receive rows
  • set forEach_ to just dump rows into this channel
  • finally use stm-conduit to make a source out of the channel
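These steps only stream if the query runs on its own thread and the end of the stream is signalled to the reader. The sketch below shows that callback-to-pull inversion with nothing but base: it uses Control.Concurrent.Chan with a Maybe end marker in place of stm-conduit's TMChan, and fakeForEach_ is a hypothetical stand-in for forEach_ so it runs without a database.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (finally)

-- Hypothetical stand-in for forEach_: pushes each row into a callback.
fakeForEach_ :: [Int] -> (Int -> IO ()) -> IO ()
fakeForEach_ rows k = mapM_ k rows

-- Turns a callback-style producer into a pull-style reader. The producer
-- runs on its own thread; rows travel as Just r and Nothing marks the end
-- (written even if the producer throws).
toPull :: ((r -> IO ()) -> IO ()) -> IO (IO (Maybe r))
toPull producer = do
  chan <- newChan
  _ <- forkIO $
         producer (writeChan chan . Just) `finally` writeChan chan Nothing
  pure (readChan chan)

-- Reads rows one at a time until the end marker arrives.
drain :: IO (Maybe r) -> IO [r]
drain next = do
  m <- next
  case m of
    Nothing -> pure []
    Just r  -> (r :) <$> drain next
```

toPull (fakeForEach_ [1, 2, 3]) >>= drain yields [1, 2, 3]; sourceTMChan plays the role of drain in the conduit version, ending the source once the channel is closed.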

I don't have the means to test this offhand, but the following should work:

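import Conduit
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan)
import Data.Conduit.TMChan (sourceTMChan)
import Database.PostgreSQL.Simple (Connection, Query, forEach_)
import Database.PostgreSQL.Simple.FromRow (FromRow)

-- Caveat: forEach_ runs the entire query before `pure` is reached, so all rows
-- are buffered in the (unbounded) channel first, and the channel is never
-- closed, so the source does not signal end-of-stream after the last row.
mySource :: (FromRow row, MonadIO m) => Connection -> Query -> IO (Source m row)
mySource connection query = do
  chan <- newTMChanIO
  forEach_ connection query (atomically . writeTMChan chan)
  pure (sourceTMChan chan)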

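If only we had forEach_ :: (MonadIO m, FromRow r) => Connection -> Query -> (r -> m ()) -> m (), this would be easier: with m instantiated to the conduit itself, you could pass yield as the callback and skip the channel entirely.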

Another answer

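Here's a modification of Alec's answer above that compiles and runs. It runs the query on a separate thread and closes the channel once the query finishes, so the source terminates. mkPgSource is the general function that Alec referred to at the end of his post.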

import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.FromRow
import Database.PostgreSQL.Simple.ToRow
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, closeTMChan)
import qualified Control.Exception as E
import Control.Monad.IO.Class (MonadIO)
import Data.Conduit.TMChan (sourceTMChan)
import Conduit

-- Runs the row-producing action on its own thread, writing each row into the
-- channel, and closes the channel when the action finishes, even if it throws,
-- so the source always terminates (an error shows up as an early end of stream).
mkPgSource :: (MonadIO m, FromRow r) => ((r -> IO ()) -> IO ()) -> IO (Source m r)
mkPgSource action = do
  chan <- newTMChanIO
  _ <- forkIO $
         action (atomically . writeTMChan chan)
           `E.finally` atomically (closeTMChan chan)
  pure (sourceTMChan chan)

-- Streams the rows of a query that takes parameters.
sourceQuery :: (ToRow params, FromRow r, MonadIO m)
            => Connection -> Query -> params -> IO (Source m r)
sourceQuery conn q params = mkPgSource (forEach conn q params)

-- Streams the rows of a query without parameters.
sourceQuery_ :: (FromRow r, MonadIO m) => Connection -> Query -> IO (Source m r)
sourceQuery_ conn q = mkPgSource (forEach_ conn q)
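One design point worth noting: TMChan is unbounded, so if the consumer is slower than the database, the remaining rows can pile up in memory. A possible bounded variant is sketched below, assuming stm-chans' TBMChan (newTBMChanIO, writeTBMChan, closeTBMChan) and stm-conduit's sourceTBMChan; mkBoundedSource and its capacity parameter are hypothetical names, not part of either library. It returns ConduitT () r m (), which is what Source m r abbreviates in conduit 1.3.

```haskell
import Conduit
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (closeTBMChan, newTBMChanIO, writeTBMChan)
import qualified Control.Exception as E
import Data.Conduit.TMChan (sourceTBMChan)

-- Hypothetical bounded version of mkPgSource: the channel holds at most
-- `capacity` rows, so the producer thread blocks in writeTBMChan until the
-- consumer catches up. The channel is closed when the action finishes or throws.
mkBoundedSource :: MonadIO m => Int -> ((r -> IO ()) -> IO ()) -> IO (ConduitT () r m ())
mkBoundedSource capacity action = do
  chan <- newTBMChanIO capacity
  _ <- forkIO $
         action (atomically . writeTBMChan chan)
           `E.finally` atomically (closeTBMChan chan)
  pure (sourceTBMChan chan)
```

With a database you would pass forEach_ conn q as the action, just as sourceQuery_ does; any function of type (r -> IO ()) -> IO () works, which also makes it easy to try without PostgreSQL.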