Java - Multiple queue producer consumer


I've got the following code:

    while(!currentBoard.boardIsValid()){
        for (QueueLocation location : QueueLocation.values()){
            while(!inbox.isEmpty(location)){
                Cell c = inbox.dequeue(location);
                notifyNeighbours(c.x, c.y, c.getCurrentState(),previousBoard);
            }
        }
    }

I've got a consumer with a few queues (all of their methods are synchronised), one queue for each producer. The consumer loops over all the queues and checks if they've got a task for him to consume. If the queue he's checking has a task in it, he consumes it. Otherwise, he goes on to check the next queue, until he finishes iterating over all the queues.

As of now, if he iterates over all the queues and they're all empty, he keeps on looping rather than waiting for one of them to contain something (as seen by the outer while).

How can I make the consumer wait until one of the queues has something in it?

I'm having an issue with the following scenario: Let's say there are only 2 queues. The consumer checked the first one and it was empty. Just as he's checking the second one (which is also empty), the producer puts something in the first queue. As far as the consumer is concerned, both queues are empty and so he should wait (even though one of them isn't empty anymore and he should continue looping).

Edit: One last thing. This is an exercise for me. I'm trying to implement the synchronisation myself, so if any of the Java libraries already implements a solution for this, I'm not interested in it. I'm trying to understand how I can implement this.


There are 3 best solutions below

0
On BEST ANSWER

@Abe was close. I would use wait and notify, relying on the Object class built-ins (wait() and notifyAll()), as they are the lightest weight.

Object sync = new Object();  // Can use an existing object if there's an appropriate one

// On submit to queue
synchronized ( sync ) {
    queue.add(...);  // Must be inside to avoid a race condition
    sync.notifyAll();
}

// On check for work in queue
synchronized ( sync ) {
    item = null;
    while ( item == null ) {
        // Need to check all of the queues - if there will be a large number, this will be slow,
        // and slow critical sections (synchronized blocks) are very bad for performance
        item = getNextQueueItem();
        if ( item == null ) {
            sync.wait();
        }
    }
}

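Note that sync.wait() releases the lock on sync while the consumer is waiting and reacquires it before returning, so producers can enter their synchronized block and add work in the meantime. The lock on sync must be held to call wait() or notifyAll() (otherwise an IllegalMonitorStateException is thrown); it's a reminder to the programmer that some type of critical section is really needed for this to work reliably. Because the consumer checks the queues and waits inside the same synchronized block that producers use to add items, a producer cannot slip an item in between the check and the wait.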
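To make the pattern above concrete, here is a minimal runnable sketch, assuming the queues are plain ArrayDeque instances guarded only by the shared monitor. The class name MultiQueueInbox and the methods enqueue, dequeueAny and demo are hypothetical names for illustration, not part of the question's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Sketch: several producer queues guarded by one shared monitor (hypothetical class). */
class MultiQueueInbox<T> {
    private final Object sync = new Object();            // the single shared monitor
    private final List<Queue<T>> queues = new ArrayList<>();

    MultiQueueInbox(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    /** Producer side: add and notify inside the same synchronized block. */
    void enqueue(int queueIndex, T item) {
        synchronized (sync) {
            queues.get(queueIndex).add(item);
            sync.notifyAll();
        }
    }

    /** Consumer side: scans every queue and blocks until one of them has an item. */
    T dequeueAny() throws InterruptedException {
        synchronized (sync) {
            while (true) {
                for (Queue<T> q : queues) {
                    T item = q.poll();
                    if (item != null) {
                        return item;
                    }
                }
                sync.wait(); // releases sync while waiting; the queues are re-scanned after waking
            }
        }
    }

    /** Demo: two producers, one queue each; the calling thread consumes six items and sums them. */
    static int demo() {
        MultiQueueInbox<Integer> inbox = new MultiQueueInbox<>(2);
        Thread p0 = new Thread(() -> { for (int i = 1; i <= 3; i++) inbox.enqueue(0, i); });
        Thread p1 = new Thread(() -> { for (int i = 10; i <= 30; i += 10) inbox.enqueue(1, i); });
        p0.start();
        p1.start();
        int sum = 0;
        try {
            for (int n = 0; n < 6; n++) {
                sum += inbox.dequeueAny();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The scan and the wait happen under the same lock the producers take, so the "item arrives in queue 1 while queue 2 is being checked" scenario from the question cannot occur: the producer is blocked until the consumer is actually waiting.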

By the way, I would recommend a queue dedicated to the consumer (or group of consumers) rather than a queue dedicated to the producer, if feasible. It will simplify the solution.
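As a rough illustration of the single-queue alternative, the sketch below has all producers put into one queue owned by the consumer, so there is only one place to wait. ConsumerQueue, put, take and demo are assumed names for this example only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

/** Sketch: one hand-rolled blocking queue shared by every producer (hypothetical class). */
class ConsumerQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();

    /** Called by any producer; wakes the consumer if it is waiting. */
    synchronized void put(T item) {
        items.add(item);
        notifyAll();
    }

    /** Called by the consumer; blocks while the queue is empty. */
    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait(); // releases this object's monitor until a producer notifies
        }
        return items.remove();
    }

    /** Demo: two producers feed the same queue; the caller takes four items. */
    static String demo() {
        ConsumerQueue<String> queue = new ConsumerQueue<>();
        Thread a = new Thread(() -> { queue.put("A1"); queue.put("A2"); });
        Thread b = new Thread(() -> { queue.put("B1"); queue.put("B2"); });
        a.start();
        b.start();
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                received.add(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Collections.sort(received); // arrival order across producers is not deterministic
        return String.join(",", received);
    }
}
```

With one queue there is nothing to scan, so the critical section stays short no matter how many producers there are.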

0
On

If you want to block across multiple queues, then one option is to use Java's Lock and Condition objects and have the producers call the signal method.

So whenever a producer has data, it should invoke signalAll.

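Lock fileLock = new ReentrantLock();
Condition condition = fileLock.newCondition();
...
// producer has to signal; the lock must be held, otherwise signalAll() throws IllegalMonitorStateException
fileLock.lock();
try { condition.signalAll(); } finally { fileLock.unlock(); }
...
// consumer has to await, also with the lock held; re-check the queues in a loop around await()
fileLock.lock();
try { condition.await(); } finally { fileLock.unlock(); }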

This way only when the signal is provided will the consumer go and check the queues.
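A fuller sketch of this Lock and Condition approach might look like the following, assuming two queues and that items are added under the same lock that guards the condition. ConditionInbox and its methods are hypothetical names chosen for the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch: two queues sharing one Lock and one Condition (hypothetical class). */
class ConditionInbox {
    private final Lock lock = new ReentrantLock();
    private final Condition workAvailable = lock.newCondition();
    private final Queue<String> first = new ArrayDeque<>();
    private final Queue<String> second = new ArrayDeque<>();

    void putFirst(String s) { put(first, s); }

    void putSecond(String s) { put(second, s); }

    /** Producer side: add the item and signal while holding the lock. */
    private void put(Queue<String> q, String s) {
        lock.lock();
        try {
            q.add(s);
            workAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Consumer side: await in a loop so spurious wakeups are harmless. */
    String takeAny() throws InterruptedException {
        lock.lock();
        try {
            while (first.isEmpty() && second.isEmpty()) {
                workAvailable.await(); // releases the lock while waiting
            }
            return first.isEmpty() ? second.remove() : first.remove();
        } finally {
            lock.unlock();
        }
    }

    /** Demo: a consumer thread takes two items that the caller produces. */
    static int demo() {
        ConditionInbox inbox = new ConditionInbox();
        int[] consumed = {0};
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 2; i++) {
                    inbox.takeAny();
                    consumed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        inbox.putSecond("b");
        inbox.putFirst("a");
        try {
            consumer.join(); // join makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed[0];
    }
}
```

The emptiness check sits inside the locked region, so a signal sent just before the consumer starts waiting is not lost: the consumer sees the non-empty queue and never calls await().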

0
On

I solved a similar situation along the lines of what @Abe suggests, but settled on using a Semaphore in combination with an AtomicBoolean and called it a BinarySemaphore. It does require the producers to be modified so that they signal when there is something to do.
Below is the code for the BinarySemaphore and a general idea of what the consumer work-loop should look like:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class MultipleProdOneConsumer {

BinarySemaphore workAvailable = new BinarySemaphore();

class Consumer {

    volatile boolean stop;

    void loop() {

        while (!stop) {
            doWork();
            if (!workAvailable.tryAcquire()) {
                // waiting for work
                try {
                    workAvailable.acquire();
                } catch (InterruptedException e) {
                    if (!stop) {
                        // log error
                    }
                }
            }
        }
    }

    void doWork() {}

    void stopWork() {
        stop = true;
        workAvailable.release();
    }
}

class Producer {

    /* Must be called after work is added to the queue/made available. */
    void signalSomethingToDo() {
        workAvailable.release();
    }
}

class BinarySemaphore {

    private final AtomicBoolean havePermit = new AtomicBoolean();
    private final Semaphore sync;

    public BinarySemaphore() {
        this(false);
    }

    public BinarySemaphore(boolean fair) {
        sync = new Semaphore(0, fair);
    }

    public boolean release() {

        boolean released = havePermit.compareAndSet(false, true);
        if (released) {
            sync.release();
        }
        return released;
    }

    public boolean tryAcquire() {

        boolean acquired = sync.tryAcquire();
        if (acquired) {
            havePermit.set(false);
        }
        return acquired;
    }

    public boolean tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException {

        boolean acquired = sync.tryAcquire(timeout, tunit);
        if (acquired) {
            havePermit.set(false);
        }
        return acquired;
    }

    public void acquire() throws InterruptedException {

        sync.acquire();
        havePermit.set(false);
    }

    public void acquireUninterruptibly() {

        sync.acquireUninterruptibly();
        havePermit.set(false);
    }

}

}