Lock Free Circular Array


I am thinking about implementing a lock free circular array. One problem is maintaining the head and tail pointers in a lock free manner. The code I have in mind is:

int circularIncrementAndGet(AtomicInteger i) {
    i.compareAndSet(array.length - 1, -1);
    return i.incrementAndGet();
}

Then I would do something like:

void add(double value) {
    int idx = circularIncrementAndGet(tail);
    array[idx] = value;
}

(Note that if the array is full old values will be overwritten, I am fine with that).

Does anyone see a problem with this design? I suspect there might be a race condition I am not seeing.

5

There are 5 best solutions below

3
On BEST ANSWER

A simpler approach is to use a power of 2 size and do the following.

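 import java.util.concurrent.atomic.AtomicInteger;

 public class CircularBuffer {
     final double[] array;
     final int sizeMask;
     final AtomicInteger i = new AtomicInteger();

     public CircularBuffer(int size) {
         // size must be a power of 2 so that (counter & sizeMask) always lands in [0, size)
         assert size > 1 && ((size & (size - 1)) == 0);
         array = new double[size];
         sizeMask = size - 1;
     }

     void add(double value) {
         // the counter may overflow to negative values; the mask still yields a valid index
         array[i.getAndIncrement() & sizeMask] = value;
     }
 }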
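
To see why the masked counter keeps working even after the int overflows, here is a small sketch. The class name MaskWrapDemo and the helper slot() are made up for illustration and are not part of the answer above; the starting value is chosen only to reach the overflow quickly.

```java
import java.util.concurrent.atomic.AtomicInteger;

class MaskWrapDemo {
    // Maps a counter value (possibly negative after overflow) to a slot in [0, size).
    // sizeMask is assumed to be size - 1 for a power-of-2 size.
    static int slot(int counter, int sizeMask) {
        return counter & sizeMask;
    }

    public static void main(String[] args) {
        int sizeMask = 8 - 1; // hypothetical buffer of 8 slots
        // Start just below the overflow point.
        AtomicInteger i = new AtomicInteger(Integer.MAX_VALUE - 1);
        for (int n = 0; n < 4; n++) {
            int raw = i.getAndIncrement();
            System.out.println(raw + " -> slot " + slot(raw, sizeMask));
        }
    }
}
```

The slots come out as 6, 7, 0, 1: the sequence continues without a jump across the overflow, because 2^32 is a multiple of any power-of-2 size that fits in an int.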
2
On

Check out the LMAX Disruptor: http://lmax-exchange.github.io/disruptor/. It is an open-source lock-free circular buffer in Java.

0
On

Yes, there is a race condition.

Say i = array.length - 2, and two threads enter circularIncrementAndGet():

Thread 1: i.compareAndSet(array.length - 1, -1) results in i = array.length - 2
Thread 2: i.compareAndSet(array.length - 1, -1) results in i = array.length - 2
Thread 1: i.incrementAndGet() results in i = array.length - 1
Thread 2: i.incrementAndGet() results in i = array.length

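leading to an ArrayIndexOutOfBoundsException when Thread 2 reaches array[idx] = value. Every subsequent call to add() fails as well: i is now past array.length - 1, so the compareAndSet never matches again until i has wrapped around the entire int range.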

The solution proposed by @Peter Lawrey does not suffer from this problem.
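
The interleaving above can be replayed deterministically on a single thread, which makes the failure easy to reproduce. This is only a sketch: the class name RaceReplay and the array length of 4 are my own choices for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RaceReplay {
    // Replays the four steps in the order listed above and returns
    // the index each "thread" ends up with.
    static int[] replay(int length) {
        AtomicInteger i = new AtomicInteger(length - 2);
        i.compareAndSet(length - 1, -1);  // Thread 1: no match, i stays length - 2
        i.compareAndSet(length - 1, -1);  // Thread 2: no match, i stays length - 2
        int idx1 = i.incrementAndGet();   // Thread 1: i becomes length - 1
        int idx2 = i.incrementAndGet();   // Thread 2: i becomes length (out of bounds)
        return new int[] { idx1, idx2 };
    }

    public static void main(String[] args) {
        int[] idx = replay(4);
        System.out.println("thread 1 index: " + idx[0]);
        System.out.println("thread 2 index: " + idx[1] + " (array length is 4)");
    }
}
```

The root cause is that the reset and the increment are two separate atomic operations, so another thread can slip in between them.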

0
On

It is possible to implement a lock-free circular array/queue if you stick to the following constraints:

  • Only one thread is allowed to modify the head pointer at any time.
  • Only one thread is allowed to modify the tail pointer at any time.
  • Dequeue-on-empty gives a return value indicating that nothing was done.
  • Enqueue-on-full gives a return value indicating that nothing was done.
  • You don't keep any count of how many values are stored in the queue.
  • You 'waste' one index in the array that will never be used, so that you can tell when the array is full or empty without having to keep count.

The enqueuing thread owns the tail pointer. The dequeueing thread owns the head pointer. Except for one condition, these two threads don't share any state so far, and so there are no problems.

That condition is testing for emptiness or fullness.

Consider empty to mean head == tail, and full to mean tail == head - 1 modulo the array size. Enqueue has to check whether the queue is full; dequeue has to check whether it is empty. You need to waste one index in the array to tell full from empty: if you enqueued into that last slot, both full and empty would be head == tail, and you would be stuck believing the queue is empty and full at the same time, so no work would get done.

While performing these checks, it's possible that one value is updated while it is being compared. However, since head and tail only ever move forward (modulo the array size), there is no correctness problem:

  • If, in the dequeue method, head == tail evaluates to true during the comparison but tail moves forward just afterward, no problem - you thought the array was empty when it actually wasn't, but no big deal, you'll just return false from the dequeue method and try again.
  • If, in the enqueue method, tail == head - 1 evaluates to true but head increments just afterward, then you'll think the array was full when it really wasn't, but again, no big deal, you'll just return false from enqueue and try again.

This is the design used behind the implementation I found in Dr. Dobb's years ago, and it has served me well:

http://www.drdobbs.com/parallel/lock-free-queues/208801974
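
A minimal Java sketch of the constraints described above, for one producer thread and one consumer thread. This is not the Dr. Dobb's code: the class name SpscRingQueue, the use of AtomicInteger for the two indices, and returning null (rather than false) from dequeue on an empty queue are my own assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SpscRingQueue {
    private final double[] slots;
    private final AtomicInteger head = new AtomicInteger(); // written only by the consumer
    private final AtomicInteger tail = new AtomicInteger(); // written only by the producer

    SpscRingQueue(int capacity) {
        slots = new double[capacity + 1]; // one slot is deliberately never used
    }

    // Producer thread only. Returns false when the queue is full.
    boolean enqueue(double value) {
        int t = tail.get();
        int next = (t + 1) % slots.length;
        if (next == head.get()) {
            return false; // full: tail == head - 1 modulo the array size
        }
        slots[t] = value;
        tail.set(next); // volatile write publishes the slot to the consumer
        return true;
    }

    // Consumer thread only. Returns null when the queue is empty.
    Double dequeue() {
        int h = head.get();
        if (h == tail.get()) {
            return null; // empty: head == tail
        }
        double value = slots[h];
        head.set((h + 1) % slots.length); // volatile write frees the slot for the producer
        return value;
    }

    public static void main(String[] args) {
        SpscRingQueue q = new SpscRingQueue(2);
        System.out.println(q.enqueue(1.0)); // true
        System.out.println(q.enqueue(2.0)); // true
        System.out.println(q.enqueue(3.0)); // false, queue is full
        System.out.println(q.dequeue());    // 1.0
    }
}
```

Each index has exactly one writer, so no compare-and-set is needed. A stale read of the other thread's index can only make the queue look fuller or emptier than it is, which results in a harmless retry.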

0
On

This is my LockFreeRingBuffer, written in Java: https://github.com/jianhong-li/LockFreeRingBuffer

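import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class LockFreeRingBuffer<T> {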

    public static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LockFreeRingBuffer.class);

    private final AtomicReferenceArray<T> buffer;
    private final int bufferSize;
    private final long bufferSizeMask;

    private final AtomicLong writeIndex = new AtomicLong(0);
    private final AtomicLong readIndex = new AtomicLong(0);

    public LockFreeRingBuffer(int bufferSize) {
        // Check that bufferSize is greater than 1
        if (bufferSize <= 1) {
            throw new IllegalArgumentException("bufferSize must be greater than 1");
        }

        // Check that bufferSize is a power of 2 (exactly one bit set)
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be power of 2");
        }

        // Initialize buffer and bufferSize
        this.buffer = new AtomicReferenceArray<>(bufferSize);
        this.bufferSize = bufferSize;
        this.bufferSizeMask = bufferSize - 1;
    }

    public int push(T value) {

        // Ensure that the written data is valid
        if (value == null) {
            return -1;
        }

        long pWrite, pRead;
        int loopCnt = 0;
        for (; ; ) {

            int _rIndex = makeIndex(pRead = readIndex.get());
            int _wIndex = makeIndex(pWrite = writeIndex.get()); // push expects to see the latest writeIndex here

            if (nextIndex(pWrite) == _rIndex) {
                // buffer is full
                return -2;
            }

            // Make sure that the current write pointer points to a null slot, i.e. one that can be written to
            // (this ensures that the take side has already cleared the old data).
            if (buffer.get(_wIndex) != null) {
                if ((++loopCnt) > 16) {
                    logger.trace("TRACE: push data retry [01] - buffer[{}] is not null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
                            _wIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
                    Thread.yield();
                }
                continue;
            }

            // Update the pointer first, then write the value. Make sure the ownership is written correctly
            if (writeIndex.compareAndSet(pWrite, pWrite + 1)) {

                // Write value: Theoretically this position must be empty to write
                if (buffer.compareAndSet(_wIndex, null, value)) {
                    // writeCnt.incrementAndGet();
                    return _wIndex;
                }
                // can not happen
                throw new RuntimeException("state error");
            }
        }
    }

    public T pop() {

        int loopCnt = 0;
        long pRead, pWrite;
        for (; ; ) {

            // P_w == P_r , buffer is empty
            int _rIndex = makeIndex(pRead = readIndex.get());
            int _wIndex = makeIndex(pWrite = writeIndex.get());

            if (_rIndex == _wIndex) {
                // buffer is empty
                return null;
            }

            T t = buffer.get(_rIndex); // Snapshot of the slot at pRead. It can still be null if a producer has advanced writeIndex but not yet stored its value.

            if (t == null) {
                if ((++loopCnt) > 16) {
                    logger.trace("TRACE: pop  data retry [20] - buffer[{}] is     null, pRead: {}, pWrite: {} readIndex:{} writeIndex:{} loopCnt:{}",
                            _rIndex, pRead, pWrite, readIndex.get(), writeIndex.get(), loopCnt);
                    Thread.yield();
                }
                continue;
            }

            /* ************************************************
             *              pWrite
             *              |
             *              v
             *  [] -> [] -> [] -> [] -> [] -> [] -> [] -> []
             *        ^
             *        |
             *        pRead
             *  ************************************************
             *  case: pRead = 1, pWrite = 1
             *        A producer advances pWrite to pWrite + 1 = 2 but has not had time to write the data yet.
             *        Now pRead = 1, pWrite = 2, and the slot at pRead is still empty.
             *        In that case t == null and we simply continue (retry).
             *        After some number of loops the value at pRead finally becomes visible.
             *        Advancing pRead by 1 is then the normal way to claim ownership of the data in slot 1.
             */
            if (readIndex.compareAndSet(pRead, pRead + 1)) {
                // pRead was advanced to pRead + 1, so this thread now owns the slot at the old position
                // and can safely manipulate it. The value read above was non-null, which means the
                // competing write from push has completed.

                // Clear the slot so that a producer can reuse it
                boolean compareAndSet = buffer.compareAndSet(_rIndex, t, null);

                // Must succeed unless the code has a bug
                if (compareAndSet) {
                    // CAS succeeded, so t is valid
                    return t;
                }
                logger.error("ERROR: pop_data_error - set to null failed, pRead: {} ({}) , pWrite: {} ({})readIndex:{} writeIndex:{}",
                        pRead, _rIndex, pWrite, _wIndex, readIndex.get(), writeIndex.get());
                // can not happen
                throw new RuntimeException("state error");
            }
        }
    }

    /**
     * Index of the slot after currentIndex. This method may be inlined by the JIT.
     */
    private int nextIndex(long currentIndex) {
        return (int) ((currentIndex + 1) & bufferSizeMask);
    }

    /**
     * Maps a counter value to a buffer index. This method may be inlined by the JIT.
     */
    private int makeIndex(long currentIndex) {
        return (int) (currentIndex & bufferSizeMask);
    }

    // ======================== get / setter =======================
    public long getReadCnt() {
        return readIndex.get();
    }

    public long getWriteCnt() {
        return writeIndex.get();
    }
}