The withTimeout function is supposed to pipe ConsoleEvent values through, with a CeTimeout sent every s :: Int seconds if nothing has been received. Instead, it fails to send the CeTimeout events at the appropriate times. If more than s seconds have passed, a single CeTimeout event is substituted for the next event, and the original event is lost. Also, instead of one CeTimeout event, there should be n of them, one for each s-second period that has passed. Where is the mistake, and what would be the correction? Thanks!
    withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
    withTimeout ((* 1000000) -> s) = join . liftIO $ work
      where
        work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ())
        work =
          do
            (oSent, iKept) <- spawn $ bounded 1
            (oKept, iSent) <- spawn $ unbounded
            (oTimeout, iTimeout) <- spawn $ bounded 1
            tid <- launchTimeout oTimeout >>= newMVar
            forkIO $ do
              runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept
            forkIO $ do
              runEffect . forever $ fromInput iTimeout >-> toOutput oKept
            return $ do
              await >>= (liftIO . guardedSend oSent)
              (liftIO . guardedRecv $ iSent) >>= yield
        guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
        guardedSend o ce =
          (atomically $ send o ce) >>= \case
            True -> return ()
            otherwise -> die $ "withTimeout can not send"
        guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
        guardedRecv i =
          (atomically $ recv i) >>= \case
            Just a -> return a
            otherwise -> die $ "withTimeout can not recv"
        launchTimeout :: Output ConsoleEvent -> IO ThreadId
        launchTimeout o =
          forkIO . forever $ do
            threadDelay $ s
            (atomically $ send o CeTimeout) >>= \case
              True -> return ()
              otherwise -> die "withTimeout can not send timeout"
        relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
        relaunchTimeout o oldTid =
          do
            tid <- launchTimeout o
            killThread oldTid
            return tid
        factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
        factorTimeout v o =
          do
            ce <- await
            liftIO . modifyMVar_ v $ relaunchTimeout o
            yield ce
Here is a fully executable script.
It seems like a Pipe, at least as written here, will only allow one yield per await. This means that a CeTimeout cannot be sent down the pipe at an arbitrary time, because nothing came into the pipe to cause the flow. I will have to go through the source to confirm this; in the meantime, this function has been refactored to return a Pipe and a Producer instead of just a Pipe. The Producer can then be joined back in the calling function.

The initial plan was to return just a Pipe so that the calling function would not have to do any additional work to make timeouts work. That would have been a more self-contained solution. This alternative is nice in that it is more explicit: the timeouts won't look like they are appearing out of thin air to someone who is not familiar with the pipeline.

Here is a fully executable script.
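For readers without access to the linked script, here is a minimal sketch of what the Pipe-plus-Producer shape described above could look like. It is not the refactored code itself. The two-constructor ConsoleEvent type, the demo function, and the microsecond argument are stand-ins made up for illustration, and the timer is built from an MVar and System.Timeout rather than by killing and relaunching a thread as in the question.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (forever, void)
import Pipes
import Pipes.Concurrent (fromInput, spawn, toOutput, unbounded)
import qualified Pipes.Prelude as P
import System.Timeout (timeout)

-- Hypothetical stand-in for the ConsoleEvent type used in the question.
data ConsoleEvent = CeKey Char | CeTimeout
  deriving (Show, Eq)

-- Returns a Pipe that forwards events unchanged while signalling activity,
-- and a Producer that yields CeTimeout after every quiet period.
withTimeout
  :: Int  -- quiet period in microseconds (assumption: the question takes seconds)
  -> IO (Pipe ConsoleEvent ConsoleEvent IO (), Producer ConsoleEvent IO ())
withTimeout micros = do
  activity <- newEmptyMVar
  let passThrough = for cat $ \ce -> do
        lift . void $ tryPutMVar activity ()  -- note that something arrived
        yield ce
      timeouts = forever $ do
        quiet <- lift $ timeout micros (takeMVar activity)
        case quiet of
          Nothing -> yield CeTimeout  -- a full period passed with no event
          Just () -> return ()        -- an event arrived: restart the wait
  return (passThrough, timeouts)

-- The calling function joins both streams on one mailbox.
demo :: IO [ConsoleEvent]
demo = do
  (events, timeouts) <- withTimeout 100000
  (output, input) <- spawn unbounded
  -- Made-up source: a key, a pause longer than the quiet period, another key.
  let source = do
        yield (CeKey 'a')
        lift (threadDelay 350000)
        yield (CeKey 'b')
  _ <- forkIO . runEffect $ source >-> events >-> toOutput output
  _ <- forkIO . runEffect $ timeouts >-> toOutput output
  -- Stop after five events so the demo terminates.
  P.toListM (fromInput input >-> P.take 5)

main :: IO ()
main = demo >>= mapM_ print
```

Because the timeouts come from their own Producer running in its own thread, they do not depend on anything arriving upstream, and a quiet stretch of n periods produces n CeTimeout events. The cost is the one mentioned above: the caller has to merge the two streams itself, here through a pipes-concurrency mailbox.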