disruptor: claiming slots in the face of integer overflow

I am implementing the disruptor pattern for inter-thread communication in C++ with multiple producers. In the LMAX implementation, the next(n) method in MultiProducerSequencer.java uses signed integers (fair enough, it is Java), but the C++ port (disruptor--) uses signed integers as well. After a very (very) long time, overflow of the signed sequence results in undefined behavior.

Unsigned integers have multiple advantages:

  • well-defined behavior on overflow (unsigned arithmetic wraps around; see the small sketch after this list)
  • no need for 64-bit integers
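
For reference, a minimal illustration of the first point; the values are arbitrary and not taken from the disruptor code:

#include <cstdint>
#include <iostream>

int main()
{
    uint64_t a = UINT64_MAX;
    uint64_t b = a + 1;   // well-defined: wraps to 0 (arithmetic modulo 2^64)
    uint64_t d = 3 - a;   // also well-defined: 3 - (2^64 - 1) wraps to 4
    std::cout << b << ' ' << d << '\n'; // prints "0 4"

    // int64_t s = INT64_MAX; ++s;  // this, in contrast, would be undefined behavior
}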

Here is my approach for claiming n slots (the source is attached at the end): next_ is the index of the next free slot that can be claimed, tail_ is the last free slot that can be claimed (it is updated somewhere else), and n is smaller than the buffer size. The idea is to normalize the next and tail positions for the intermediate calculation by subtracting tail from next. The normalized position norm plus n must not exceed the buffer size for the slots between next_ and next_ + n to be claimed successfully. It is assumed that norm + n itself does not overflow.
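
To convince myself that the modular subtraction behaves as intended across a wrap-around, here is a small self-contained sketch of the same check, using uint16_t to make the wrap easy to reach. claim_would_succeed, BUFFER_SIZE and the sample values are mine for illustration; they are not part of the actual implementation, and the sketch only exercises the arithmetic, not the concurrent claiming.

#include <cstdint>
#include <iostream>

constexpr uint16_t BUFFER_SIZE = 1024; // hypothetical ring size

// Mirrors the check in claim(): norm is the distance between tail and next,
// computed with unsigned (modular) arithmetic, so it stays correct even when
// next has wrapped past zero while tail has not.
bool claim_would_succeed(uint16_t next, uint16_t tail, uint16_t n)
{
    uint16_t norm = next - tail;    // well-defined even if next < tail numerically
    return norm + n <= BUFFER_SIZE; // relies on norm + n not overflowing
}

int main()
{
    // Plain case: next ahead of tail, no wrap-around (norm = 100).
    std::cout << claim_would_succeed(200, 100, 50) << '\n';   // 1

    // Wrap-around: next already wrapped past 0, tail has not (norm = 46).
    std::cout << claim_would_succeed(10, 65500, 50) << '\n';  // 1

    // Ring nearly full: norm = 900 and 900 + 200 > 1024, so the claim must wait.
    std::cout << claim_would_succeed(1000, 100, 200) << '\n'; // 0
}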

1) Is it correct, or can next_ get past tail_ in some cases? Does it also work with smaller integer types like uint32_t or uint16_t if the buffer size and n are restricted, e.g. to 1/10 of the maximum value of those types?
2) If it is not correct, I would like to know a concrete failing case.
3) Is anything else wrong, and what can be improved? (I omitted the cache-line padding.)

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <thread>

class msg_ctrl
{
public:
    inline msg_ctrl();

    inline int claim(size_t n, uint64_t& seq);
    inline int publish(size_t n, uint64_t seq);
    inline int tail(uint64_t t);

public:
    std::atomic<uint64_t> next_;
    std::atomic<uint64_t> head_;
    std::atomic<uint64_t> tail_;
};

// Implementation -----------------------------------------

msg_ctrl::msg_ctrl() : next_(2), head_(1), tail_(0)
{}

int msg_ctrl::claim(size_t n, uint64_t& seq)
{
    uint64_t const size = msg_buffer::size();
    if (n > 1024) // please do not try to reserve too many slots
        return -1;

    uint64_t curr = 0;
    do
    {
        curr = next_.load();

        uint64_t tail = tail_.load();
        uint64_t norm = curr - tail; // slots currently in flight (modular difference, well-defined across wrap-around)
        uint64_t next = norm + n;    // occupancy after claiming n more slots

        if (next > size)
            std::this_thread::yield(); // todo: some wait strategy
        else if (next_.compare_exchange_weak(curr, curr + n))
            break;

    } while (true);

    seq = curr;
    return 0;
}

int msg_ctrl::publish(size_t n, uint64_t seq)
{
    uint64_t tmp = seq-1;
    uint64_t val = seq+n-1;
    // Wait until all claims preceding seq have been published (head_ == seq-1),
    // then move head_ over the n slots published here.
    while (!head_.compare_exchange_weak(tmp, val))
    {
        tmp = seq-1; // compare_exchange_weak overwrites tmp on failure, so reset the expected value
        std::this_thread::yield();
    }
    return 0;
}

int msg_ctrl::tail(uint64_t t)
{
    tail_.store(t);
    return 0;
}

Publishing to the ring buffer will look like:

size_t n = 15;
uint64_t seq = 0;
msg_ctrl->claim(n, seq);

//fill items in buffer
buffer[seq + 0] = an item
buffer[seq + 1] = an item
...
buffer[seq + n-1] = an item

msg_ctrl->publish(n, seq);
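
One detail the snippet above glosses over: seq keeps increasing (and eventually wraps), so buffer[seq + i] only works if msg_buffer maps the sequence onto a ring index internally. A minimal sketch of such a mapping, assuming a power-of-two size (my assumption, the question does not show msg_buffer):

#include <cstddef>
#include <cstdint>

// Hypothetical ring storage: maps an ever-increasing sequence number to a slot.
// With a power-of-two size, masking also behaves correctly across the uint64_t
// wrap-around, because 2^64 is a multiple of N.
template <typename T, size_t N>
struct ring
{
    static_assert((N & (N - 1)) == 0, "N must be a power of two");

    T&       operator[](uint64_t seq)       { return items[seq & (N - 1)]; }
    T const& operator[](uint64_t seq) const { return items[seq & (N - 1)]; }

    T items[N];
};

With such a mapping in place, buffer[seq + 0] through buffer[seq + n-1] address the n claimed slots regardless of how large seq has grown.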