Right to left: post-increment in a chained expression in a return statement

I'm trying to build a single producer/single consumer lock-free thread safe ring buffer.

Here's a piece of the code:

#include <cstddef> // size_t
#include <iostream>

struct RingBuffer {
    static constexpr size_t S = 2;
    
    int data[S];
    size_t start = 0;
    size_t end = 0;

    // Maps the ever-increasing index onto [0, S); correct only when S is a power of two.
    size_t mask(size_t i) const {
        return i & (S - 1);
    }

    bool full() const {
        return end - start == S;
    }
    size_t size() const {
        return end - start;
    }
    
    void push(int t) {
        size_t i = mask(end);
        data[i] = t;
        end++;
    }
    int shift() {
        return data[mask(start++)];
    }
};
 
int main()
{
    RingBuffer ringBuffer;
    
    // emulate t1 that will write
    if (!ringBuffer.full()) {
        int dataToWrite = 10;
        ringBuffer.push(dataToWrite);
    }
    
    // emulate t2 that will read
    if (ringBuffer.size() > 0) {
        int dataRead = ringBuffer.shift();
        std::cout << dataRead << std::endl;
    }
}

Writes to the buffer will be performed by a single thread t1. Reads from the buffer will be performed by a single thread t2.

From what I've learned, the only concurrency problem here could be in the shift method:

return data[mask(start++)];

because the order of operations must be:

  1. do mask() with the current start value
  2. return data[] at the index returned from point 1
  3. then, increment start

But the code actually will do 1-3-2, not 1-2-3. The questions are:

  1. is it possible to do 1-2-3 with this kind of code?
  2. using -O3 optimization (gcc), can the order of the operations be changed, making the whole thing undefined? (i.e. on push(), could end++ be moved before data[i] = t?)

1 Answer

Generally, using a thread-safe ring buffer also requires accessing it in a thread-safe manner. In main(), the check for full() is separate from push(), which makes it impossible for even a perfect ringBuffer to ensure thread-safety. Because of the one-producer and one-consumer restriction, there is no problem with this approach. However, the consistency of the ring buffer itself is at the mercy of a well-written producer and consumer. For example, if the producer calls push() five times in a row without checking full() before each call, values will be overwritten that are never read, even with a single producer and a single consumer, as the sketch below shows.
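
To make that concrete, here is a minimal sketch of such a misuse against the original RingBuffer (S == 2):

RingBuffer ringBuffer;

// One full() check guarding five pushes: end races ahead of start,
// and three of the five values are overwritten before the consumer
// can ever read them.
if (!ringBuffer.full()) {
    for (int i = 0; i < 5; ++i) {
        ringBuffer.push(i);
    }
}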

Attacking the thread-safety problem

The thread-safety problem is twofold. First, the values that are stored in data in one thread are not necessarily visible in another, thanks to the memory model of modern languages and CPUs. This is also true for the state of the buffer that is described by start and end. In addition, access to the state is unprotected, leaving room for the compiler and CPU to mess things up even further, leading to race conditions!

Compilers and CPUs are allowed to reorder instructions and locally cache values from memory for performance reasons. That means that your program is not necessarily executed in the same order as your source code, and not all "memory" values are loaded or stored directly from and to memory. Add the need to reason from a multi-threaded perspective, and you enter a very interesting area of expertise.

Fortunately, C++ offers language features to address these problems. IMHO, studying what atomic has to offer, especially atomic_thread_fence, atomic_flag and the atomic types, is worthwhile!
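
As an illustration (a minimal standalone sketch, not part of the buffer code below), a release store paired with an acquire load is enough to make a plain write in one thread visible to another:

#include <atomic>

int payload = 0;
std::atomic<bool> ready{false};

void producer() {
    payload = 42;                                  // plain, non-atomic write
    ready.store(true, std::memory_order_release);  // publish payload
}

void consumer() {
    while (!ready.load(std::memory_order_acquire))
        ; // spin until the producer has published
    // The acquire load synchronizes with the release store above,
    // so payload is guaranteed to read 42 here.
    int value = payload;
    static_cast<void>(value); // silence unused-variable warnings
}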

Lock-free implementation

Not using a lock requires a trade-off where push() and shift() "fail a little more often than necessary" to ensure that values are not overwritten and no garbage is read. In general this is complex and very hard to explain. Luckily, thanks to the one-producer and one-consumer restrictions, and the fact that start and end only ever increase, the situation can be fixed fairly easily.

#include <atomic>  // std::atomic_size_t, std::atomic_thread_fence
#include <cstddef> // size_t

template<typename T, size_t S>
class SingleProducerSingleConsumerLockFreeQueue {
    static_assert(S > 0);

    std::atomic_size_t start = 0;
    std::atomic_size_t end = 0;
    T data[S];

    static size_t mask(size_t i) {
        return i % S; // the compiler optimizes % to & when S is a power of two
    }

public:
    size_t size() const noexcept { return end - start; }
    bool empty() const noexcept { return size() == 0; }
    bool full() const noexcept { return size() == S; }

    /**
     * Pushes \c value on the queue, which fails if the queue is full.
     * @param value The value to push.
     * @returns \c true if push was successful, \c false otherwise
     */
    bool push(const T &value) {
        // First load the end offset that we potentially change.
        size_t wr = end.load();
        // Now load the start offset that we will not change
        size_t rd = start.load();
        // Check if the buffer is full. If the other thread calls shift()
        // and increases start, the worst thing that happens is that we
        // return false here while we *could* have written a value, which
        // is better than overwriting a value that has not been read yet.
        if (wr - rd >= S) {
            return false;
        }
        data[mask(wr)] = value;
        // Now we update the end offset, where the fence makes sure
        // that information in the data array becomes visible for
        // the thread that uses shift after end.store().
        std::atomic_thread_fence(std::memory_order_release);
        end.store(wr + 1);
        return true;
    }

    /**
     * Shifts a value off the queue into \c value, which fails if the queue is empty.
     * @param value Contains the shifted value on success.
     * @returns \c true if shift was successful, \c false otherwise
     */
    bool shift(T &value) {
        // First load the offset that we potentially change.
        size_t rd = start.load();
        // Now load the offset that we will not change
        size_t wr = end.load();
        // Check if the buffer is empty. If the other thread calls
        // push() and increases end, the worst thing that happens is
        // that we return false here while we *could* have read
        // a value, which is better than reading a value not yet written.
        if (wr <= rd) {
            return false;
        }
        // Ensure that information that was written by push() before
        // fetching end.load(), is visible here.
        std::atomic_thread_fence(std::memory_order_acquire);
        value = data[mask(rd)];
        // Update the start offset.
        start.store(rd + 1);
        return true;
    }
};

This can be applied in main() as follows:

SingleProducerSingleConsumerLockFreeQueue<int, 2> ringBuffer;

while (ringBuffer.push(10)); // fill the buffer until push() reports full
int readValue;
while (ringBuffer.shift(readValue)) {
    std::cout << readValue << std::endl;
}

Using a spin-lock?

Depending on your definition of "lock-free", it may be acceptable to use a spinlock to guard the ring buffer state: a busy loop that sets a flag if and only if it was not set yet.
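
In its minimal form (a sketch assuming <atomic>, using acquire/release orders directly instead of the separate fences used in the class below), such a busy loop looks like this:

#include <atomic>

std::atomic_flag lock = ATOMIC_FLAG_INIT;

void acquire() {
    // Spin until test_and_set() reports the flag was previously clear,
    // i.e. until this thread is the one that set it.
    while (lock.test_and_set(std::memory_order_acquire))
        ;
}

void release() {
    lock.clear(std::memory_order_release);
}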

The advantage of using the spinlock is that the number of producer and consumer threads is unimportant: the implementation serves a more general use case. The drawback is busy waiting. While busy waiting is generally considered a bad pattern, the design and context of the consumer and producer can greatly affect whether that is a problem. Imagine a completely lock-free ring buffer and a consumer as follows:

while(true) {
  int read;
  while (!ringBuffer.shift(read)); // <!-- busy loop!
  do_something_with(read);
}

The busy-waiting was moved from the ring buffer itself to the consumer code, which still halts progress until something is available. In this case, using a ring buffer that does use a spin lock is probably okay.

The spinlock is used to ensure that only one thread can change the state of the ring buffer, consisting of {start, end}. It uses an atomic_flag for that. To ensure that the data written to elements of data is visible to other threads, an atomic_thread_fence is used. Here's the code:

// be sure to #include <atomic> and <cstddef>
class SingleProducerSingleConsumerSpinLockQueue {
  static constexpr size_t S = 2;

  size_t start = 0;
  size_t end = 0;
  mutable std::atomic_flag flag = ATOMIC_FLAG_INIT; // busy/spin-lock flag; mutable so const methods can lock it
  int data[S];

  class SpinlockAndFence {
    std::atomic_flag &flag_;

  public:
    SpinlockAndFence(std::atomic_flag &flag) : flag_(flag) {
      // loop until we can set the flag, so other threads keep looping!
      while (flag_.test_and_set(std::memory_order_relaxed))
        ;
      // Ensure that all data that was written in the scope of a
      // SpinlockAndFence in another thread is visible to the scope of this
      // SpinlockAndFence.
      std::atomic_thread_fence(std::memory_order_acquire);
    }
    ~SpinlockAndFence() {
      // Clear the flag to allow other threads to set it and ensure that all
      // data that was written in the scope of this SpinlockAndFence is visible
      // to the scope of a SpinlockAndFence in another thread.
      std::atomic_thread_fence(std::memory_order_release);
      flag_.clear(std::memory_order_relaxed);
    }
  };

  static size_t wrap(size_t i) { // static, as it does not use instance values.
    return i % S;  // Compiler optimizes % to & if S is a power of two.
  }

public:
  size_t size() const noexcept { 
    SpinlockAndFence fence(flag);
    return end - start; 
  }
  bool empty() const noexcept { return size() == 0; }
  bool full() const noexcept { return size() == S; }

  /**
   * Pushes \c value on the queue, which fails if the queue is full.
   * @param value The value to push.
   * @returns \c true if push was successful, \c false otherwise
   */
  bool push(int value) {
    SpinlockAndFence fence(flag);
    if (end - start >= S) {
      return false;
    } else {
      data[wrap(end++)] = value;
    }
    return true;
  }
  /**
   * Shifts a value off the queue into \c value, which fails if the queue is empty.
   * @param value Contains the shifted value on success.
   * @returns \c true if shift was successful, \c false otherwise
   */
  bool shift(int &value) {
    SpinlockAndFence fence(flag);
    if (end - start == 0) {
      return false;
    } else {
      value = data[wrap(start++)];
    }
    return true;
  }
};

Splitting the check in full() and size() from the actual push() and shift() can work, but it still requires a lock in both full() and push(), as the lock is also what makes the variables start and end visible in both threads. Another solution is to expose the SpinlockAndFence; however, that puts a lot of responsibility at the call site.
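
A hypothetical sketch of that last option (the names ExposedLockQueue, Guard, lock(), full_unlocked() and push_unlocked() are illustrative, not from the answer above), assuming C++17 for the guaranteed copy elision when returning the guard:

#include <atomic>
#include <cstddef>

class ExposedLockQueue {
  static constexpr size_t S = 2;

  size_t start = 0;
  size_t end = 0;
  mutable std::atomic_flag flag = ATOMIC_FLAG_INIT;
  int data[S];

  static size_t wrap(size_t i) { return i % S; }

public:
  class Guard {
    std::atomic_flag &flag_;

  public:
    explicit Guard(std::atomic_flag &flag) : flag_(flag) {
      while (flag_.test_and_set(std::memory_order_acquire))
        ; // spin until we own the flag
    }
    Guard(const Guard &) = delete;
    Guard &operator=(const Guard &) = delete;
    ~Guard() { flag_.clear(std::memory_order_release); }
  };

  // The caller keeps the returned Guard alive for the duration of the
  // critical section (C++17 guarantees copy elision here).
  Guard lock() const { return Guard(flag); }

  // Only call these while holding a Guard:
  bool full_unlocked() const { return end - start >= S; }
  void push_unlocked(int value) { data[wrap(end++)] = value; }
};

// Usage: the call site now owns the critical section, so the full()
// check and the push happen under the same lock.
//
// ExposedLockQueue queue;
// {
//   auto guard = queue.lock();
//   if (!queue.full_unlocked()) queue.push_unlocked(10);
// }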