Why does my parallel traversal Haskell program leak memory?


Consider the following Haskell program (I'm doing this mostly for learning purposes):

import qualified Control.Concurrent.MSem as Sem
import System.Environment (getArgs)
import Control.Concurrent (forkIO)
import Control.Monad

-- Traverse with maximum n threads
parallelTraverse :: Foldable a => Int -> (b -> IO()) -> a b -> IO ()
parallelTraverse n action values = do
  sem <- Sem.new n
  forM_ values $ \value -> Sem.with sem (forkIO $ action value)

main :: IO ()
main = do
  args <- getArgs
  let nThreads = read . head $ args :: Int
  parallelTraverse nThreads print [(1::Int)..]

When I run it, memory quickly climbs to several GB. I tried various combinations to make sure I discard the results of intermediate computations (the print actions). Why is it still leaking space?

Best answer

First of all, you have an evident mistake in the following piece:

Sem.with sem (forkIO $ action value)

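Here the main thread acquires the semaphore around the fork operation itself, not around the action. Since forkIO returns as soon as the new thread has been created, the semaphore is released almost immediately and never limits how many actions run at once. The proper way to implement it is: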

forkIO (Sem.with sem (action value))

That is, acquire the semaphore from within the forked thread, so that it is held for as long as the action runs.

Secondly, in the following code you're calling the parallelTraverse operation on an infinite list:

parallelTraverse nThreads print [(1::Int)..]

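This forks threads without end. Since forkIO returns almost immediately in the calling thread, the loop creates threads far faster than they can finish, so it's no surprise that you run out of resources quickly. Acquiring the semaphore inside the forked thread does not help here: it limits how many actions run at once, but every surplus thread still exists and simply blocks on the semaphore.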
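
To make that concrete, here is a minimal sketch showing that the traversal loop is not throttled when the semaphore is taken inside the forked thread. It assumes QSem from base (Control.Concurrent.QSem) as a stand-in for MSem so that it runs without extra packages; forkAllThenCount and the gate MVar are hypothetical names introduced only for this illustration.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.QSem (newQSem, waitQSem, signalQSem)
import Control.Exception (bracket_)
import Control.Monad (forM_, replicateM_)
import Data.IORef

-- Fork one thread per item; each thread takes the semaphore itself.
-- Returns how many actions had finished by the time the loop ended.
forkAllThenCount :: Int -> Int -> IO Int
forkAllThenCount limit items = do
  sem      <- newQSem limit
  gate     <- newEmptyMVar              -- stays empty until we open it below
  finished <- newIORef (0 :: Int)
  allDone  <- newEmptyMVar
  forM_ [1 .. items] $ \_ -> forkIO $ do
    bracket_ (waitQSem sem) (signalQSem sem) $ do
      readMVar gate                     -- the "action": blocks until the gate opens
      atomicModifyIORef' finished (\k -> (k + 1, ()))
    putMVar allDone ()
  -- The loop is over: every thread has been created, but no action
  -- can have finished because the gate is still closed.
  finishedAtLoopEnd <- readIORef finished
  putMVar gate ()                       -- open the gate
  replicateM_ items (takeMVar allDone)  -- wait for all workers
  pure finishedAtLoopEnd

main :: IO ()
main = do
  n <- forkAllThenCount 2 1000
  putStrLn ("actions finished when the loop ended: " ++ show n)
```

The count is 0: all 1000 threads were created before a single action completed, even though only 2 may hold the semaphore at a time. With an infinite list the same loop would keep creating blocked threads indefinitely.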


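To limit the number of worker threads with the semaphore, the with pattern simply won't do in your case. Instead, use an explicit combination of wait and signal: call wait in the traversing thread before each fork, and signal in the worker once its action finishes. Don't forget to handle exceptions properly (in case you expect them), so that a failing action still releases its slot. For example: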

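import Control.Exception (finally)  -- in addition to the question's imports

parallelTraverse :: Foldable a => Int -> (b -> IO ()) -> a b -> IO ()
parallelTraverse n action values = do
  sem <- Sem.new n
  forM_ values $ \value -> do
    -- Block the traversing thread until one of the n slots is free.
    Sem.wait sem
    -- Release the slot when the action finishes, even if it throws.
    forkIO $ finally (action value) (Sem.signal sem)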
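
For a finite input there is one more thing to consider: the traversal returns as soon as the last thread has been forked, and a Haskell program exits when main returns, whether or not the workers are done. The following is a minimal sketch of one way to handle that, not the only one. It assumes QSem from base in place of MSem so that it runs without extra packages; boundedTraverse and peakConcurrency are hypothetical names, and the 1 ms delay merely simulates work.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.QSem (newQSem, waitQSem, signalQSem)
import Control.Exception (finally)
import Control.Monad (forM_, replicateM_)
import Data.IORef

-- Run the action on every value with at most n workers alive,
-- and return only after all workers have finished.
boundedTraverse :: Foldable t => Int -> (a -> IO ()) -> t a -> IO ()
boundedTraverse n action values = do
  sem <- newQSem n
  forM_ values $ \value -> do
    waitQSem sem                                   -- blocks while n workers are alive
    forkIO (action value `finally` signalQSem sem) -- worker returns its slot
  -- Taking back all n slots succeeds only once every worker has signalled.
  replicateM_ n (waitQSem sem)

-- Hypothetical helper: record the peak number of simultaneously
-- running actions while traversing [1 .. items].
peakConcurrency :: Int -> Int -> IO Int
peakConcurrency n items = do
  running <- newIORef (0 :: Int)
  peak    <- newIORef (0 :: Int)
  let action _ = do
        now <- atomicModifyIORef' running (\k -> (k + 1, k + 1))
        atomicModifyIORef' peak (\p -> (max p now, ()))
        threadDelay 1000                           -- simulate 1 ms of work
        atomicModifyIORef' running (\k -> (k - 1, ()))
  boundedTraverse n action [1 .. items]
  readIORef peak

main :: IO ()
main = do
  p <- peakConcurrency 3 50
  putStrLn ("peak concurrent actions: " ++ show p) -- never more than 3
```

Because the traversing thread itself waits for a free slot before each fork, at most n worker threads exist at any time, which is what keeps memory bounded even on a very long or infinite input.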