The withTimeout function is supposed to pipe ConsoleEvent values through, with a CeTimeout sent every s :: Int seconds if nothing has been received. Instead, it fails to send the CeTimeout events at the appropriate times. Once more than s seconds have passed, a CeTimeout event takes the place of the next event, and the original event is lost. Also, instead of a single CeTimeout event, after n*s seconds of silence there should be n CeTimeout events, one for each s-second period that has passed. Where is the mistake, and what would be the correction? Thanks!

    -- Requires the LambdaCase and ViewPatterns extensions.
    withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
    -- The view pattern converts the argument from seconds to microseconds.
    withTimeout ((* 1000000) -> s) = join . liftIO $ work
      where
        work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ())
        work =
          do
            (oSent, iKept) <- spawn $ bounded 1
            (oKept, iSent) <- spawn $ unbounded
            (oTimeout, iTimeout) <- spawn $ bounded 1
            tid <- launchTimeout oTimeout >>= newMVar
            -- Forward real events, restarting the timer on each one.
            forkIO $ do
              runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept
            -- Forward timeout events into the same outgoing mailbox.
            forkIO $ do
              runEffect . forever $ fromInput iTimeout >-> toOutput oKept
            return $ do
              await >>= (liftIO . guardedSend oSent)
              (liftIO . guardedRecv $ iSent) >>= yield

        -- Send an event into a mailbox, exiting the program if it is sealed.
        guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
        guardedSend o ce =
          (atomically $ send o ce) >>= \case
            True -> return ()
            False -> die $ "withTimeout can not send"

        -- Receive an event from a mailbox, exiting the program if it is sealed.
        guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
        guardedRecv i =
          (atomically $ recv i) >>= \case
            Just a -> return a
            Nothing -> die $ "withTimeout can not recv"

        -- Timer thread: every s microseconds, send a CeTimeout to the mailbox.
        launchTimeout :: Output ConsoleEvent -> IO ThreadId
        launchTimeout o =
          forkIO . forever $ do
            threadDelay $ s
            (atomically $ send o CeTimeout) >>= \case
              True -> return ()
              False -> die "withTimeout can not send timeout"

        -- Start a fresh timer thread, then kill the old one.
        relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
        relaunchTimeout o oldTid =
          do
            tid <- launchTimeout o
            killThread oldTid
            return tid

        -- Restart the timer for one incoming event, then pass that event on.
        factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
        factorTimeout v o =
          do
            ce <- await
            liftIO . modifyMVar_ v $ relaunchTimeout o
            yield ce

Here is a fully executable script.
It seems like a Pipe will only allow one yield per await. This means that a CeTimeout cannot arbitrarily be sent down the pipe, because nothing came into the pipe to cause the flow. I will have to go through the source to confirm this; in the meantime, this function has been refactored to return a Pipe and a Producer instead of just a Pipe. The Producer can then be joined back in the calling function. The initial plan was to return just a Pipe so that the calling function would not have to do any additional work to make timeouts work. That would have been a more self-contained solution. This alternative is nice in that it is more explicit: the timeouts won't look like they are appearing out of thin air to someone who is not familiar with the pipeline.

Here is a fully executable script.