I'm working on a staged job system, which currently works as follows (pseudocode):
cyclicbarrier.init(numthreads)
on each thread:
    for each stage s:
        loop forever:
            pop job from joblist[s]
            if no job:
                break
            execute job
        wait at cyclicbarrier
This causes all jobs in a stage to finish executing before any thread moves on to the next stage.
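For anyone who wants to run it, here is a minimal Python sketch of the loop above. It is only an illustration: `threading.Barrier` stands in for my own cyclicbarrier, and `NUM_THREADS`, `NUM_STAGES`, `make_job`, `run` and the `log` list are names made up for the example.

```python
import threading
from collections import deque

NUM_THREADS = 4   # assumed worker count
NUM_STAGES = 3    # assumed number of stages

# One job list per stage; deque.append/popleft are thread-safe in CPython.
joblist = [deque() for _ in range(NUM_STAGES)]
barrier = threading.Barrier(NUM_THREADS)
log = []                      # (stage, job id) in completion order
log_lock = threading.Lock()

def worker():
    for s in range(NUM_STAGES):
        while True:
            try:
                job = joblist[s].popleft()
            except IndexError:
                break         # no job left in this stage
            job()
        barrier.wait()        # nobody starts stage s+1 until all arrive

def make_job(stage, ident):
    # Hypothetical job: it only records that it ran.
    def job():
        with log_lock:
            log.append((stage, ident))
    return job

def run(jobs_per_stage=5):
    for s in range(NUM_STAGES):
        for i in range(jobs_per_stage):
            joblist[s].append(make_job(s, i))
    threads = [threading.Thread(target=worker) for _ in range(NUM_THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log
```

Calling `run()` returns the log; every entry for stage `s` appears before any entry for stage `s + 1`, although the order within a stage is not deterministic.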
The cyclicbarrier is implemented using two semaphores:
sem1(0)
sem2(0)
n = 0
function wait:
    if atomic_incr(n) == maxthreads:
        sem1.signal(maxthreads)
    sem1.wait()
    if atomic_decr(n) == 0:
        sem2.signal(maxthreads)
    sem2.wait()
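The same two-semaphore barrier can be sketched in Python roughly as follows. This is an approximation, not my actual code: Python has no atomic increment, so a `threading.Lock` stands in for `atomic_incr` / `atomic_decr`, and the class name `TwoPhaseBarrier` and the `demo` helper are invented for the example.

```python
import threading

class TwoPhaseBarrier:
    def __init__(self, maxthreads):
        self.maxthreads = maxthreads
        self.n = 0
        self.lock = threading.Lock()        # stands in for the atomic ops
        self.sem1 = threading.Semaphore(0)
        self.sem2 = threading.Semaphore(0)

    def wait(self):
        # Phase 1: the last thread to arrive opens sem1 for everyone.
        with self.lock:
            self.n += 1
            last_in = self.n == self.maxthreads
        if last_in:
            for _ in range(self.maxthreads):
                self.sem1.release()
        self.sem1.acquire()
        # Phase 2: the last thread to leave opens sem2, so that a fast
        # thread cannot lap the others and re-enter phase 1 too early.
        with self.lock:
            self.n -= 1
            last_out = self.n == 0
        if last_out:
            for _ in range(self.maxthreads):
                self.sem2.release()
        self.sem2.acquire()

def demo(num_threads=4, rounds=3):
    # Each thread records the round number, then waits at the barrier.
    b = TwoPhaseBarrier(num_threads)
    events, events_lock = [], threading.Lock()

    def worker():
        for r in range(rounds):
            with events_lock:
                events.append(r)
            b.wait()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events
```

The second semaphore is what makes the barrier reusable from one stage to the next: no thread can start counting for the next round until every thread has left the current one.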
I would like a job to be able to add one or more jobs to the currently running stage, and have those jobs also execute before the next stage begins. The code above already allows this. However, if a job is added after some thread has reached the barrier, it is executed sub-optimally: a waiting thread could be executing it, but isn't.
I've boiled the problem down to needing a kind of hybrid semaphore/cyclicbarrier, which for convenience I'll call a sembarrier. Here's how it would function:
function wait:
    n--
    if n < 0 and n > -maxthreads:
        suspend until signaled
    else if n <= -maxthreads:
        signal(maxthreads)
Essentially, this sembarrier works just like a semaphore, but when enough threads are waiting on it, it behaves like a cyclicbarrier and releases them all.
So, here's the new implementation using the sembarrier. The sembarrier constructor looks like:
sembarrier(initial_value, num_threads)
Create a sembarrier for each stage:
n = num_threads
for each stage s:
    sembarrier[s].init(0, n)
This would be the modified loop, using the sembarrier array:
on each thread:
    for each stage s:
        loop forever:
            sembarrier[s].wait()
            pop job from joblist[s]
            if no job:
                break
            execute job
Additionally, when adding to joblist:
function AddJob(stage s, job j):
    joblist[s].push(j)
    sembarrier[s].signal()
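To make the intended behaviour concrete, here is a rough Python sketch of the whole scheme built on a condition variable. It is one possible reading of the semantics described above, not a tuned implementation, and it makes a few assumptions: each sembarrier is used for a single stage only, `wait()` returns a boolean (True for "a permit was taken", False for "everyone is idle, the stage is over") instead of relying on the empty pop, and `record`, `run` and `done` are names made up for the demo.

```python
import threading
from collections import deque

class SemBarrier:
    def __init__(self, initial_value, num_threads):
        self._cond = threading.Condition()
        self._permits = initial_value   # one permit per queued job
        self._waiting = 0               # threads currently suspended
        self._num_threads = num_threads
        self._released = False          # set once the stage has drained

    def signal(self):
        with self._cond:
            self._permits += 1
            self._cond.notify()

    def wait(self):
        with self._cond:
            while True:
                if self._permits > 0:
                    self._permits -= 1
                    return True         # semaphore behaviour
                if self._released:
                    return False
                self._waiting += 1
                if self._waiting == self._num_threads:
                    # All threads are idle, so no job can add more work.
                    self._released = True
                    self._cond.notify_all()
                    return False        # barrier behaviour
                self._cond.wait()
                self._waiting -= 1

NUM_THREADS, NUM_STAGES = 4, 2          # assumed sizes
joblist = [deque() for _ in range(NUM_STAGES)]
sembarrier = [SemBarrier(0, NUM_THREADS) for _ in range(NUM_STAGES)]
done, done_lock = [], threading.Lock()

def add_job(s, job):
    joblist[s].append(job)              # push first, then signal
    sembarrier[s].signal()

def worker():
    for s in range(NUM_STAGES):
        while sembarrier[s].wait():
            joblist[s].popleft()()      # a permit implies a queued job

def record(s, name, children=0):
    # Hypothetical job: logs itself, then adds children to the same stage.
    def job():
        with done_lock:
            done.append((s, name))
        for i in range(children):
            add_job(s, record(s, f"{name}.{i}"))
    return job

def run():
    for s in range(NUM_STAGES):
        add_job(s, record(s, f"root{s}", children=3))
    threads = [threading.Thread(target=worker) for _ in range(NUM_THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return done
```

In this sketch a thread that is executing a job is never counted as waiting, so the release can only happen once the job list is empty and no running job can still add to it. Whether a single mutex plus condition variable is efficient enough is exactly what I'm unsure about.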
Questions:
1) How can I efficiently implement sembarrier? I have access to basic concurrency structures, such as mutexes and semaphores. I also have the standard atomic ops.
2) Is there an alternative solution to my original problem?
Thanks!