How to implement a fork function that combines two consumers into one


I'm trying to build a streaming library using the abstractions described in the paper "Faster coroutine pipelines". I've modified the code so that it correctly handles a pipeline exiting (instead of throwing errors when that happens):

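{-# LANGUAGE BlockArguments, DerivingVia, FlexibleContexts, ScopedTypeVariables #-}

import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.State (MonadState, evalStateT, get, put)
import Control.Monad.Trans.Class (MonadTrans (..))
-- ContT and Identity constructors are imported so the DerivingVia coercion can unwrap them
import Control.Monad.Trans.Cont (Cont, ContT (..))
import Data.Functor.Identity (Identity (..))
import Data.Void (Void)

-- | r: return type of the continuation, i: input stream type, o: output stream type,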
--   m: underlying monad, a: return type
newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}
  deriving
    ( Functor,
      Applicative,
      Monad
    )
    via (Cont (Result r m i o))

type Result r m i o = InCont r m i -> OutCont r m o -> m r

newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}

newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}

suspendIn :: Result r m i o -> InCont r m i -> InCont r m o
suspendIn k ik = MakeInCont \ok -> k ik ok

suspendOut :: (Maybe i -> Result r m i o) -> OutCont r m o -> OutCont r m i
suspendOut k ok = MakeOutCont \v ik -> k v ik ok

emptyIk :: InCont r m a
emptyIk = MakeInCont \ok -> resumeOut ok Nothing emptyIk

await :: ContPipe r i o m (Maybe i)
await = MakePipe \k ik ok -> resumeIn ik (suspendOut k ok)

yield :: o -> ContPipe r i o m ()
yield v = MakePipe \k ik ok -> resumeOut ok (Just v) (suspendIn (k ()) ik)

(.|) :: forall r i e o m a. ContPipe r i e m () -> ContPipe r e o m a -> ContPipe r i o m a
p .| q = MakePipe \k ik ok ->
  runPipe
    q
    (\a _ ok' -> k a emptyIk ok')
    (suspendIn (runPipe p (\() -> f)) ik)
    ok
  where
    f :: Result r m i e
    f _ ok = resumeOut ok Nothing emptyIk

runContPipe :: forall m a. Applicative m => ContPipe a () Void m a -> m a
runContPipe p = runPipe p (\a _ _ -> pure a) ik ok
  where
    ik :: InCont a m ()
    ik = MakeInCont \ok' -> resumeOut ok' (Just ()) ik
    ok :: OutCont a m Void
    ok = MakeOutCont \_ ik' -> resumeIn ik' ok

I would like to implement a function

fork :: ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)

It combines two consumers into one (similar to conduit's ZipSink) and should have the following semantics:

  1. If neither stream has exited and both are accepting input, feed the same input value to both streams.
  2. If one stream has exited, store its return value, then feed the input only into the stream that is still accepting values.
  3. If both streams have exited, exit with both return values paired in a tuple.
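To pin down rules 1-3 independently of the CPS encoding, here is a minimal sketch on a much simpler consumer type. `Sink`, `zipSinks`, `feed`, `collectAll` and `takeN` are hypothetical names assumed for illustration only; they are not part of the question's code or of conduit's API.

```haskell
{-# LANGUAGE BlockArguments #-}

-- Hypothetical simplified consumer (not ContPipe): finished, or awaiting input.
data Sink i a = Done a | Await (Maybe i -> Sink i a)

-- One equation per rule of the intended fork semantics.
zipSinks :: Sink i a -> Sink i b -> Sink i (a, b)
zipSinks (Await f) (Await g) = Await \v -> zipSinks (f v) (g v)  -- rule 1: feed both
zipSinks (Done a) (Await g) = Await \v -> zipSinks (Done a) (g v) -- rule 2: keep a, feed the other
zipSinks (Await f) (Done b) = Await \v -> zipSinks (f v) (Done b) -- rule 2: keep b, feed the other
zipSinks (Done a) (Done b) = Done (a, b)                          -- rule 3: pair the results

-- Feed a finite list, then Nothing until the sink finishes
-- (diverges for a sink that keeps awaiting after Nothing).
feed :: [i] -> Sink i a -> a
feed _ (Done a) = a
feed (x : xs) (Await k) = feed xs (k (Just x))
feed [] (Await k) = feed [] (k Nothing)

-- Example sinks: consume everything, or stop after n values.
collectAll :: Sink i [i]
collectAll = go []
  where
    go acc = Await (maybe (Done (reverse acc)) (\x -> go (x : acc)))

takeN :: Int -> Sink i [i]
takeN n0 = go n0 []
  where
    go n acc
      | n <= 0 = Done (reverse acc)
      | otherwise = Await (maybe (Done (reverse acc)) (\x -> go (n - 1) (x : acc)))
```

In this model `feed [1, 2, 3] (zipSinks collectAll (takeN 2))` evaluates to `([1, 2, 3], [1, 2])`: once `takeN 2` has finished, the third value and the final `Nothing` reach only the other sink, which is rule 2.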

Here's my attempt:

We reuse the loop function from the paper, which connects an InCont r m i to two OutCont r m i values and actively resumes the continuations:

loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
  resumeIn ik $ MakeOutCont \v ik' ->
    resumeOut ok1 v $ MakeInCont \ok1' ->
      resumeOut ok2 v $ MakeInCont \ok2' -> loop ik' ok1' ok2'

With loop we can connect the input of the resulting pipe to both pipes simultaneously. The output continuation is shared between the two pipes, which doesn't really matter since neither of them can yield a Void.

fork :: forall r m i a b. ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = _
        g :: b -> Result r m i Void
        g b ik' ok' = _
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok

Now we just need to fill in the continuations f and g, which p and q call when they exit. If g has already been called when f is called (meaning q has already exited), f should call the continuation k; if g hasn't been called yet, f should store the return value a and resume the input continuation, discarding every value passed to it. It seems to me that this can't be done without some form of shared state, so we could try storing that state in m using a state monad:

fork :: forall r m i a b. MonadState (Maybe (Either a b)) m => ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = do
          s <- get
          case s of
            Nothing -> do
              put (Just (Left a))
              resumeIn ik' sinkOk
            Just (Right b) -> do
              k (a, b) ik' ok'
            _ -> error "unexpected state"
        g :: b -> Result r m i Void
        g b ik' ok' = do
          s <- get
          case s of
            Nothing -> do
              put (Just (Right b))
              resumeIn ik' sinkOk
            Just (Left a) -> do
              k (a, b) ik' ok'
            _ -> error "unexpected state"
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok

sinkOk is an output continuation that discards all of its inputs:

sinkOk :: OutCont r m o
sinkOk = MakeOutCont \_ ik -> resumeIn ik sinkOk
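The shared-state protocol that f and g follow can be checked on its own. This sketch assumes mtl's State monad and uses hypothetical helpers finishLeft and finishRight: the first side to exit parks its result and returns Nothing (where fork would keep consuming input), and the second side returns the completed pair (where fork would call k). It models only the bookkeeping, not which input continuation is resumed afterwards.

```haskell
import Control.Monad.State (State, evalState, get, put)

-- The shared slot used by the state-based fork.
type Slot a b = Maybe (Either a b)

-- Hypothetical: what f does with the state when p exits.
finishLeft :: a -> State (Slot a b) (Maybe (a, b))
finishLeft a = do
  s <- get
  case s of
    Nothing -> put (Just (Left a)) >> pure Nothing -- park a, keep consuming
    Just (Right b) -> pure (Just (a, b))           -- q already exited: done
    Just (Left _) -> error "left side finished twice"

-- Hypothetical: what g does with the state when q exits.
finishRight :: b -> State (Slot a b) (Maybe (a, b))
finishRight b = do
  s <- get
  case s of
    Nothing -> put (Just (Right b)) >> pure Nothing -- park b, keep consuming
    Just (Left a) -> pure (Just (a, b))             -- p already exited: done
    Just (Right _) -> error "right side finished twice"
```

Either order yields the same pair, e.g. `evalState (finishRight 'x' >> finishLeft 1) Nothing` is `Just (1, 'x')`.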

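We can now add some auxiliary functions for testing. print' uses lift, which needs a MonadTrans instance for ContPipe that isn't shown above; a minimal one is:

instance MonadTrans (ContPipe r i o) where
  lift m = MakePipe \k ik ok -> m >>= \a -> k a ik ok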

print' :: MonadIO m => Show i => ContPipe r i o m ()
print' = do
  m <- await
  case m of
    Nothing -> pure ()
    Just i -> do
      lift $ liftIO (print i)
      print'

upfrom :: Int -> ContPipe r i Int m a
upfrom i = do
  yield i
  upfrom (i + 1)

take' :: Int -> ContPipe r i i m ()
take' n
  | n <= 0 = pure ()
  | otherwise = do
    m <- await
    case m of
      Nothing -> pure ()
      Just i -> do
        yield i
        take' (n - 1)

This does work in the case where p exits earlier than q:

flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' print'

gives the desired output:

1
1
2
2
3
3
((),())
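The working case can also be reproduced without IO. The sketch below restates the question's definitions in one module and assumes two things the question doesn't show: a MonadTrans instance for ContPipe (for lift), and a hypothetical record consumer that logs (tag, value) pairs with mtl's Writer in place of print'. The log should alternate between p and q, mirroring the printed output above.

```haskell
{-# LANGUAGE BlockArguments, DerivingVia, FlexibleContexts, ScopedTypeVariables #-}
import Control.Monad.State (MonadState, evalStateT, get, put)
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Cont (Cont, ContT (..))
import Control.Monad.Writer (MonadWriter, runWriter, tell)
import Data.Functor.Identity (Identity (..))
import Data.Void (Void)

-- Core definitions, as in the question.
newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}
  deriving (Functor, Applicative, Monad) via (Cont (Result r m i o))
type Result r m i o = InCont r m i -> OutCont r m o -> m r
newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}
newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}

-- Assumed instance (print' needs `lift`, but the question doesn't show one).
instance MonadTrans (ContPipe r i o) where
  lift m = MakePipe \k ik ok -> m >>= \a -> k a ik ok

suspendIn :: Result r m i o -> InCont r m i -> InCont r m o
suspendIn k ik = MakeInCont \ok -> k ik ok
suspendOut :: (Maybe i -> Result r m i o) -> OutCont r m o -> OutCont r m i
suspendOut k ok = MakeOutCont \v ik -> k v ik ok
emptyIk :: InCont r m a
emptyIk = MakeInCont \ok -> resumeOut ok Nothing emptyIk
sinkOk :: OutCont r m o
sinkOk = MakeOutCont \_ ik -> resumeIn ik sinkOk

await :: ContPipe r i o m (Maybe i)
await = MakePipe \k ik ok -> resumeIn ik (suspendOut k ok)
yield :: o -> ContPipe r i o m ()
yield v = MakePipe \k ik ok -> resumeOut ok (Just v) (suspendIn (k ()) ik)

-- Same as the question's (.|), with its helper f written inline.
(.|) :: ContPipe r i e m () -> ContPipe r e o m a -> ContPipe r i o m a
p .| q = MakePipe \k ik ok ->
  let upstream = runPipe p (\() _ ok' -> resumeOut ok' Nothing emptyIk)
   in runPipe q (\a _ ok' -> k a emptyIk ok') (suspendIn upstream ik) ok

runContPipe :: Applicative m => ContPipe a () Void m a -> m a
runContPipe p = runPipe p (\a _ _ -> pure a) ik ok
  where
    ik = MakeInCont \ok' -> resumeOut ok' (Just ()) ik
    ok = MakeOutCont \_ ik' -> resumeIn ik' ok

loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
  resumeIn ik $ MakeOutCont \v ik' ->
    resumeOut ok1 v $ MakeInCont \ok1' ->
      resumeOut ok2 v $ MakeInCont \ok2' -> loop ik' ok1' ok2'

-- The question's state-based fork.
fork :: forall r m i a b. MonadState (Maybe (Either a b)) m =>
  ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q = MakePipe \k ik ok ->
  let f :: a -> Result r m i Void
      f a ik' ok' = get >>= \s -> case s of
        Nothing -> put (Just (Left a)) >> resumeIn ik' sinkOk
        Just (Right b) -> k (a, b) ik' ok'
        _ -> error "unexpected state"
      g :: b -> Result r m i Void
      g b ik' ok' = get >>= \s -> case s of
        Nothing -> put (Just (Right b)) >> resumeIn ik' sinkOk
        Just (Left a) -> k (a, b) ik' ok'
        _ -> error "unexpected state"
   in runPipe p f (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok) ok

upfrom :: Int -> ContPipe r i Int m a
upfrom i = yield i >> upfrom (i + 1)

take' :: Int -> ContPipe r i i m ()
take' n
  | n <= 0 = pure ()
  | otherwise = await >>= maybe (pure ()) (\i -> yield i >> take' (n - 1))

-- Hypothetical stand-in for print': logs (tag, value) instead of printing.
record :: MonadWriter [(Char, Int)] m => Char -> ContPipe r Int o m ()
record tag = await >>= maybe (pure ()) (\i -> lift (tell [(tag, i)]) >> record tag)

-- The working pipeline from the question, with print' replaced by record.
demo :: (((), ()), [(Char, Int)])
demo = runWriter . flip evalStateT Nothing . runContPipe $
  upfrom 1 .| take' 3 .| fork (record 'p') (record 'q')
```

Replacing `record 'q'` with `(take' 2 .| record 'q')` should reproduce the looping case from the question, so that variant is not expected to return.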

But it goes into an infinite loop when q exits earlier than p:

flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' (take' 2 .| print')

outputs:

1
1
2
2
<loops>