I'm writing a simple script to run a bunch of tasks in parallel using the Shelly library, but I want to limit the maximum number of tasks running at any one time. The script takes a file with one input on each line and runs a task for each input. There are a few hundred inputs in the file and I want to limit it to around 16 processes at a time.

The current script actually limits concurrency to 1 (or rather, tries to) using a QSem with an initial count of 1. I seem to be missing something, though, because when I run it on a test file with 4 inputs I see this:


So the threads are not blocking on the QSem as I would expect; they're all running simultaneously. I've even gone so far as to implement my own semaphores on top of both MVar and TVar, and neither worked the way I expected. I'm obviously missing something fundamental, but what? I've also tried compiling the code and running it as a binary.

#!/usr/bin/env runhaskell
{-# LANGUAGE TemplateHaskell, QuasiQuotes, DeriveDataTypeable, OverloadedStrings #-}

import Shelly
import Prelude hiding (FilePath)
import Text.Shakespeare.Text (lt)
import qualified Data.Text.Lazy as LT
import Control.Monad (forM)
import System.Environment (getArgs)

import qualified Control.Concurrent.QSem as QSem
import Control.Concurrent (forkIO, MVar, putMVar, newEmptyMVar, takeMVar)

-- Define max number of simultaneous processes
maxProcesses :: IO QSem.QSem
maxProcesses = QSem.newQSem 1

bkGrnd :: ShIO a -> ShIO (MVar a)
bkGrnd proc = do
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    -- Block until there are free processes
    sem <- maxProcesses
    QSem.waitQSem sem
    putStrLn "Starting"
    -- Run the shell command
    result <- shelly $ silently proc
    liftIO $ putMVar mvar result
    putStrLn "Done"
    -- Signal that this process is done and another can run.
    QSem.signalQSem sem
  return mvar

main :: IO ()
main = shelly $ silently $ do
    [img, file] <- liftIO $ getArgs
    contents <- readfile $ fromText $ LT.pack file
    -- Run a backgrounded process for each line of input.
    results <- forM (LT.lines contents) $ \line -> bkGrnd $ do
      runStdin <command> <arguments>
    liftIO $ mapM_ takeMVar results

As I said in my comment, each call to bkGrnd creates its own semaphore: maxProcesses is an IO action, not a semaphore, so every forked thread that runs it gets a brand-new QSem with a count of 1 and continues without waiting. I would try something like this instead, where the semaphore is created once in main and passed to each call of bkGrnd.

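bkGrnd :: QSem.QSem -> ShIO a -> ShIO (MVar a)
bkGrnd sem proc = do
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    -- Block until there are free processes (sem is shared by all threads)
    QSem.waitQSem sem
    putStrLn "Starting"
    -- Run the shell command
    result <- shelly $ silently proc
    putMVar mvar result
    putStrLn "Done"
    -- Signal that this process is done and another can run.
    QSem.signalQSem sem
  return mvar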

main :: IO ()
main = shelly $ silently $ do
    [img, file] <- liftIO $ getArgs
    contents <- readfile $ fromText $ LT.pack file
    -- Create the semaphore once; maxProcesses is an IO action, so lift it into ShIO
    sem <- liftIO maxProcesses
    -- Run a backgrounded process for each line of input.
    results <- forM (LT.lines contents) $ \line -> bkGrnd sem $ do
      runStdin <command> <arguments>
    liftIO $ mapM_ takeMVar results
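To see the difference without Shelly, here is a minimal sketch that uses only base. It assumes threadDelay is a fair stand-in for a shell command, and peakConcurrency is a hypothetical helper written for this illustration. Each worker obtains its semaphore by running the getSem action, just as the question's code runs maxProcesses inside every forked thread, and the helper reports the highest number of workers that were running at once.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (QSem, newQSem, waitQSem, signalQSem)
import Control.Monad (forM)
import Data.IORef (newIORef, readIORef, atomicModifyIORef')

-- Hypothetical helper: fork n workers. Each worker runs 'getSem' to obtain
-- a semaphore, waits on it, simulates work, then signals. Returns the peak
-- number of workers that were between wait and signal at the same time.
peakConcurrency :: IO QSem -> Int -> IO Int
peakConcurrency getSem n = do
  active <- newIORef (0 :: Int)  -- workers currently holding the semaphore
  peak   <- newIORef (0 :: Int)  -- highest value 'active' has reached
  dones  <- forM [1 .. n] $ \_ -> do
    done <- newEmptyMVar
    _ <- forkIO $ do
      sem <- getSem
      waitQSem sem
      now <- atomicModifyIORef' active (\a -> (a + 1, a + 1))
      atomicModifyIORef' peak (\p -> (max p now, ()))
      threadDelay 10000  -- stand-in for the shell command (10 ms)
      atomicModifyIORef' active (\a -> (a - 1, ()))
      signalQSem sem
      putMVar done ()
    return done
  mapM_ takeMVar dones  -- wait for every worker to finish
  readIORef peak

main :: IO ()
main = do
  -- Buggy pattern: 'newQSem 1' is an action, so each worker creates
  -- its own fresh semaphore and never waits.
  perWorker <- peakConcurrency (newQSem 1) 4
  -- Fixed pattern: create one semaphore up front and share it.
  sem <- newQSem 1
  shared <- peakConcurrency (return sem) 4
  putStrLn ("new QSem per worker: peak = " ++ show perWorker)
  putStrLn ("one shared QSem:     peak = " ++ show shared)
```

With the shared semaphore the peak is always 1. With a fresh semaphore per worker it typically reports more than 1, because nothing is actually shared. To get the limit of 16 from the question, the shared semaphore would be created with newQSem 16.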

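Isn't it better to wait on the semaphore before forking:

bkGrnd sem proc = do
  -- Wait for a free slot in the calling thread, before forking
  liftIO $ QSem.waitQSem sem
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    result <- shelly $ silently proc
    putMVar mvar result
    QSem.signalQSem sem  -- release the slot once the command is done
  return mvar

so that you don't even call forkIO until you have acquired the semaphore?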
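Here is a base-only sketch of that wait-before-fork variant, assuming threadDelay stands in for the shell command. forkLimited and runLimited are hypothetical names made up for this example. The caller blocks in waitQSem, so at most limit jobs have been forked and not yet finished at any moment. The results still come back in input order, because they are read from the MVars in the order the jobs were submitted.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (QSem, newQSem, waitQSem, signalQSem)
import Control.Monad (forM)
import Data.IORef (newIORef, readIORef, atomicModifyIORef')

-- Hypothetical helper: wait for a slot in the calling thread, then fork.
-- The forked thread hands the slot back when the job is done.
forkLimited :: QSem -> IO a -> IO (MVar a)
forkLimited sem job = do
  waitQSem sem          -- blocks the caller until a slot is free
  var <- newEmptyMVar
  _ <- forkIO $ do
    result <- job
    putMVar var result
    signalQSem sem      -- let the next waiting caller fork its job
  return var

-- Hypothetical driver: run 'jobs' jobs with at most 'limit' in flight.
-- Returns the peak number in flight and the results in input order.
runLimited :: Int -> Int -> IO (Int, [Int])
runLimited limit jobs = do
  sem    <- newQSem limit
  active <- newIORef (0 :: Int)
  peak   <- newIORef (0 :: Int)
  vars <- forM [1 .. jobs] $ \i -> forkLimited sem $ do
    now <- atomicModifyIORef' active (\a -> (a + 1, a + 1))
    atomicModifyIORef' peak (\p -> (max p now, ()))
    threadDelay 5000    -- stand-in for a shell command (5 ms)
    atomicModifyIORef' active (\a -> (a - 1, ()))
    return (i * i)
  results <- mapM takeMVar vars
  p <- readIORef peak
  return (p, results)

main :: IO ()
main = do
  (p, rs) <- runLimited 16 100
  putStrLn ("peak jobs in flight: " ++ show p)
  print (sum rs)
```

One design consequence is that the loop that submits jobs (forM in main) now blocks as well, rather than forking hundreds of threads that all sit waiting on the semaphore.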


You already have an answer, but I need to add that QSem and QSemN are not thread safe if killThread or any other asynchronous thread death is possible.

My bug report and patch are GHC Trac ticket #3160. The fixed code is available as a new library called SafeSemaphore, with the modules Control.Concurrent.MSem, MSemN, MSampleVar, and a bonus FairRWLock.
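A related point is that the bkGrnd code above never returns its slot if the command throws, so the remaining threads can block forever. The sketch below uses only base and shows one way to guard against that. bracket_ returns the slot once it has been acquired, even if the job throws or its thread is killed while the job runs, and forkFinally always fills the result MVar. forkWithSlot is a hypothetical name. This assumes the semaphore itself behaves correctly when a thread is interrupted inside waitQSem, which is exactly the property the answer above says the old QSem and QSemN lack. With those versions you would swap in the SafeSemaphore types instead.

```haskell
import Control.Concurrent (forkFinally)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (QSem, newQSem, waitQSem, signalQSem)
import Control.Exception (SomeException, bracket_)

-- Hypothetical helper: run 'job' in a new thread while holding one unit of
-- 'sem'. bracket_ signals the semaphore after the job whether it returns,
-- throws, or is killed; forkFinally always puts a Left or Right into the
-- MVar, so a caller blocked in takeMVar is never left waiting.
forkWithSlot :: QSem -> IO a -> IO (MVar (Either SomeException a))
forkWithSlot sem job = do
  var <- newEmptyMVar
  _ <- forkFinally (bracket_ (waitQSem sem) (signalQSem sem) job) (putMVar var)
  return var

main :: IO ()
main = do
  sem <- newQSem 1
  -- This job fails. If it took the only slot and never gave it back,
  -- the second job could wait forever.
  v1 <- forkWithSlot sem (ioError (userError "boom") :: IO Int)
  v2 <- forkWithSlot sem (return (42 :: Int))
  r1 <- takeMVar v1
  r2 <- takeMVar v2
  putStrLn (either (const "job 1 failed") show r1)
  putStrLn (either (const "job 2 failed") show r2)
```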