Hybrid semaphore/cyclic barrier needed

I'm working on a staged job system, which currently works as follows (pseudocode):

cyclicbarrier.init(numthreads)
on each thread:
  for each stage s:
    loop forever:
      pop job from joblist[s]
      if no job:
        break
      execute job
    wait at cyclicbarrier

This ensures that all jobs in a stage finish executing before any thread moves on to the next stage.
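For concreteness, here is a minimal runnable sketch of that loop in Python. It is an illustration under assumptions, not my actual code: `run_stages` and its parameters are names made up for the example, jobs are plain callables, `queue.Queue` stands in for joblist, and the standard `threading.Barrier` stands in for cyclicbarrier.

```python
import queue
import threading

def run_stages(jobs_per_stage, num_threads):
    """Run every job of stage s on num_threads workers before any job of stage s+1.

    jobs_per_stage is a list of lists of zero-argument callables (hypothetical shape).
    """
    # One thread-safe job list per stage.
    joblist = []
    for jobs in jobs_per_stage:
        q = queue.Queue()
        for job in jobs:
            q.put(job)
        joblist.append(q)

    # Stands in for cyclicbarrier.init(numthreads).
    barrier = threading.Barrier(num_threads)

    def worker():
        for q in joblist:
            while True:
                try:
                    job = q.get_nowait()   # pop job from joblist[s]
                except queue.Empty:
                    break                  # no job left in this stage
                job()                      # execute job
            barrier.wait()                 # wait at cyclicbarrier

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

A job that pushes more work onto the current stage's queue still gets that work executed in the same stage, but only by threads that have not yet reached `barrier.wait()`.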

cyclicbarrier is implemented using two semaphores and an atomic counter (maxthreads is the numthreads value passed to init):

sem1(0)
sem2(0)
n = 0

function wait:
  if atomic_incr(n) == maxthreads:
    sem1.signal(maxthreads)
  sem1.wait()

  if atomic_decr(n) == 0:
    sem2.signal(maxthreads)
  sem2.wait()
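The two-semaphore barrier above can be sketched in runnable form as follows. This is a hedged translation, not the real implementation: the class name `TwoPhaseBarrier` is invented for the example, and a `threading.Lock` around the counter stands in for atomic_incr / atomic_decr, which Python does not provide directly.

```python
import threading

class TwoPhaseBarrier:
    """Reusable barrier built from two semaphores and a counter (illustrative sketch)."""

    def __init__(self, maxthreads):
        self.maxthreads = maxthreads
        self.n = 0
        self.lock = threading.Lock()          # assumption: replaces the atomic ops
        self.sem1 = threading.Semaphore(0)
        self.sem2 = threading.Semaphore(0)

    def wait(self):
        # Phase 1: the last thread to arrive opens sem1 for everyone.
        with self.lock:
            self.n += 1
            last_in = self.n == self.maxthreads
        if last_in:
            for _ in range(self.maxthreads):
                self.sem1.release()
        self.sem1.acquire()

        # Phase 2: the last thread to leave phase 1 opens sem2, so no thread
        # can re-enter phase 1 of the next round while others are still in this one.
        with self.lock:
            self.n -= 1
            last_out = self.n == 0
        if last_out:
            for _ in range(self.maxthreads):
                self.sem2.release()
        self.sem2.acquire()
```

The second phase is what makes the barrier cyclic: without it, a fast thread could loop around and consume a sem1 permit meant for a slower thread.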

I would like to let a job add one or more jobs to the currently running stage, and have those jobs also execute before the stage ends. The above code already allows this. However, if a job is added after some thread has reached the barrier, it is executed sub-optimally: a waiting thread could be executing it but isn't.

I've boiled down the problem to needing a kind of hybrid semaphore/cyclicbarrier, which for convenience I'll call a sembarrier. Here's how it would function:

function wait:
  n--
  if n < 0 and n > -maxthreads:
    suspend until signaled
  else if n <= -maxthreads:
    signal(maxthreads)

Essentially, this sembarrier works just like a semaphore, but when enough threads are waiting on it, it behaves like a cyclicbarrier and releases them all.
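To make the intended semantics concrete, here is one possible sketch of them in Python, built on a mutex plus condition variable. It is only meant to pin down the behaviour described above, not to be the efficient implementation I'm asking about. The class name `SemBarrier`, the `wakeups` counter, and the `signal(count)` signature are assumptions made for the example.

```python
import threading

class SemBarrier:
    """Semaphore that releases all waiters once num_threads are waiting (sketch)."""

    def __init__(self, initial_value, num_threads):
        self.n = initial_value            # negative value = number of waiting threads
        self.num_threads = num_threads
        self.wakeups = 0                  # wake permits not yet consumed by a waiter
        self.cond = threading.Condition()

    def _signal_locked(self, count):
        # Caller must hold self.cond.
        for _ in range(count):
            self.n += 1
            if self.n <= 0:
                self.wakeups += 1         # this unit goes to a suspended thread
        self.cond.notify_all()

    def signal(self, count=1):
        with self.cond:
            self._signal_locked(count)

    def wait(self):
        with self.cond:
            self.n -= 1
            if self.n >= 0:
                return                    # a unit was available: plain semaphore path
            if self.n <= -self.num_threads:
                # Every thread is now waiting: behave like a barrier and
                # release all of them, including this caller.
                self._signal_locked(self.num_threads)
            while self.wakeups == 0:
                self.cond.wait()          # suspend until signaled
            self.wakeups -= 1
```

The explicit `wakeups` counter guards against spurious wakeups and ensures each signal releases exactly one suspended thread.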

So, here's the new implementation using the sembarrier.

sembarrier constructor looks like:

sembarrier(initial_value, num_threads)

Create a sembarrier for each stage:

n = num_threads
for each stage s:
  sembarrier[s].init(0, n)

This would be the modified loop, using the sembarrier array:

on each thread:
  for each stage s:
    loop forever:
      sembarrier[s].wait()
      pop job from joblist[s]
      if no job:
        break
      execute job

Additionally, when adding to joblist:

function AddJob(stage s, job j):
  joblist[s].push(j)
  sembarrier[s].signal()

Questions:

1) How can I efficiently implement sembarrier? I have access to basic concurrency structures, such as mutexes and semaphores. I also have the standard atomic ops.

2) Is there an alternative solution to my original problem?

Thanks!
