Operating infinite lists with strict monads

444 Views Asked by At

I have a function f :: [a] -> b that operates on infinite lists (e.g. take 5, takeWhile (< 100) . scanl (+) 0 and so on). I want to feed this function with values generated by strict monadic actions (e.g. randomIO).

From this question, I've learned that the repeat and sequence trick approach does not work for strict monads, as the example below show:

import Control.Monad.Identity

take 5 <$> sequence (repeat $ return 1) :: Identity [Int]
-- returns `Identity [1,1,1,1,1]`
-- works because Identity is non-strict

take 5 <$> sequence (repeat $ return 1) :: IO [Int]
 -- returns `*** Exception: stack overflow`
 -- does not work because IO is strict

So, instead, I thought about using the function "inside" the monadic context. I was inspired by this circular programming example and tried:

let loop = do
       x <- return 1
       (_, xs) <- loop
       return (take 5 xs, x:xs)
in  fst loop :: Identity [Int]
-- Overflows the stack

and

import Control.Monad.Fix

fst <$> mfix (\(_, xs) -> do
   x <- return 1
   return (take 5 xs, x:xs)) :: Identity [Int]
-- Overflows the stack

and even

{-# LANGUAGE RecursiveDo #-}
import System.Random

loop' = mdo
   (xs', xs) <- loop xs
   return xs'
where loop xs = do
      x <- randomIO
      return (take 5 xs, x:xs)

print $ loop'
-- Returns a list of 5 identical values

But none of these works. I also tried a Conduit approach which did not work either even in the Identity case:

import Conduit

runConduitPure $ yieldMany [1..] .| sinkList >>= return . take 5

Therefore I would like to know:

  1. Why none of the "circular" approaches above work?

  2. If there exists a solution to this that does not involve unsafeInterleaveIO. (maybe iteratees, Arrows?)

2

There are 2 best solutions below

3
On BEST ANSWER

I am using randomIO here just for simplicity of the examples. In practice, I would like to process messages received via sockets

That is not possible without unsafeInterleaveIO. The problem at the end of the day is that IO actions must be sequenced. While the order of evaluation for referentially transparent values doesn't matter, that of IO actions may. If you want a lazy infinite list of all the messages received over a socket, you have to inform Haskell a priori where in the sequence of IO actions this fits (unless you use unsafeInterleaveIO).

The Arrow abstraction you are seeking is called ArrowLoop, but it too has a problem with the right tightening law for strict monads.

At first sight, it may look like MonadFix (manifested via mdo or mfix) is a solution too, but digging a bit deeper shows that fixIO has problems, just like loop from ArrowLoop.

However, sometimes the restriction that IO actions must be run one after another is a bit excessive and that is what unsafeInterleaveIO is for. Quoting the docs

unsafeInterleaveIO allows IO computation to be deferred lazily. When passed a value of type IO a, the IO will only be performed when the value of the a is demanded.


Now, even if you explicitly said that you didn't want an unsafeInterleaveIO solution, as I have hopefully managed to convince you it is the way to go, here it is:

interweavingRepeatM :: IO a -> IO [a]
interweavingRepeatM action = unsafeInterleaveIO ((:) <$> action <*> interweavingRepeatM action)

And here is is working for random numbers:

ghci> import System.Random
ghci> sourceOfRandomness <- interweavingRepeatM randomIO :: IO [Integer]
ghci> take 10 sourceOfRandomness
[-2002742716261662204,7803971943047671004,-8395318556488893887,-7372674153585794391,5906750628663631621,6428130029392850107,6453903217221537923,-8966011929671667536,6419977320189968675,-1842456468700051776]
2
On

Alec's answer covers your general question. What follows is specifically about conduit and similar streaming libraries.

I also tried a Conduit approach which did not work either even in the Identity case:

import Conduit

runConduitPure $ yieldMany [1..] .| sinkList >>= return . take 5

While streaming libraries are commonly used to avoid the sort of difficulty you mention (cf. the opening remarks of Pipes.Tutorial), they assume you will use their stream types instead of lists. Consider, for instance, how sinkList is described by the Conduit documentation:

Consume all values from the stream and return as a list. Note that this will pull all values into memory.

That means using sinkMany immediately after yieldMany brings you back to square one: bringing all values into memory is precisely what makes the combination of sequence, IO and infinite lists unusable. What you should do instead is using the infrastructure of the streaming library to build the stages of your pipeline. Here are a few simple examples using mostly ready-made things from conduit and conduit-combinators:

GHCi> import Conduit
GHCi> runConduitPure $ yieldMany [1..] .| takeC 5 .| sinkList
[1,2,3,4,5]
GHCi> runConduit $ yieldMany [1..] .| takeC 5 .| printC -- try it without takeC
1
2
3
4
5
GHCi> runConduit $ yieldMany [1..] .| takeC 5 .| scanlC (+) 0 .| printC
0
1
3
6
10
15
GHCi> :{
GHCi| runConduit $ yieldMany [1..] .| takeC 5
GHCi|     .| awaitForever (\x -> liftIO (print (2*x)) >> yield x)
GHCi|     .| printC
GHCi| :}
2
1
4
2
6
3
8
4
10
5
GHCi> runConduit $ (sourceRandom :: Producer IO Int) .| takeC 5 .| printC 
1652736016140975126
5518223062916052424
-1236337270682979278
8079753510915129274
-609160753105692151

(Thanks Michael for making me notice sourceRandom -- at first I had rolled my own with repeatMC randomIO.)