I am implementing the disruptor pattern for inter-thread communication in C++ for multiple producers. In the implementation from LMAX, the next(n) method in MultiProducerSequencer.java uses signed integers (okay, it's Java), but the C++ port (disruptor--) uses signed integers as well. After a very (very) long time, overflow will result in undefined behavior.
Unsigned integers have multiple advantages:
- well-defined wraparound behavior on overflow
- no need for 64-bit integers
Here is my approach for claiming n slots (source is attached at the end): next_ is the index of the next free slot that can be claimed, tail_ is the last free slot that can be claimed (it is updated somewhere else), and n is smaller than the buffer size. My approach is to normalize the next and tail positions for intermediate calculations by subtracting tail from next. Adding n to the normalized next, norm, must yield a value smaller than the buffer size to successfully claim the slots between next_ and next_ + n. It is assumed that norm + n will not overflow.
1) Is it correct, or does next_ get past tail_ in some cases? Does it work with smaller integer types like uint32_t or uint16_t if buffer size and n are restricted, e.g. to 1/10 of the maximum value of these types?
2) If it is not correct, then I would like to know the concrete case.
3) Is something else wrong, or what can be improved? (I omitted the cacheline padding.)
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <thread>

class msg_ctrl
{
public:
    inline msg_ctrl();
    inline int claim(size_t n, uint64_t& seq);   // reserve n slots, start sequence returned in seq
    inline int publish(size_t n, uint64_t seq);  // make n slots starting at seq visible to consumers
    inline int tail(uint64_t t);                 // consumer marks slots up to t as free
public:
    std::atomic<uint64_t> next_;  // next free slot that can be claimed
    std::atomic<uint64_t> head_;  // last published slot
    std::atomic<uint64_t> tail_;  // last free slot (advanced by the consumer)
};
// Implementation -----------------------------------------
msg_ctrl::msg_ctrl() : next_(2), head_(1), tail_(0)  // at rest: head_ == next_ - 1
{}
int msg_ctrl::claim(size_t n, uint64_t& seq)
{
    uint64_t const size = msg_buffer::size();
    if (n > 1024) // please do not try to reserve too many slots
        return -1;
    uint64_t curr = 0;
    do
    {
        curr = next_.load();
        uint64_t tail = tail_.load();
        uint64_t norm = curr - tail;   // distance to tail; well-defined across wraparound
        uint64_t next = norm + n;
        if (next > size)               // not enough free slots yet
            std::this_thread::yield(); // todo: some wait strategy
        else if (next_.compare_exchange_weak(curr, curr + n))
            break;                     // slots [curr, curr + n) are ours
    } while (true);
    seq = curr;
    return 0;
}
int msg_ctrl::publish(size_t n, uint64_t seq)
{
    uint64_t tmp = seq - 1;
    uint64_t val = seq + n - 1;
    // Wait until all slots before seq are published, then advance head_
    // to the last slot of this claim.
    while (!head_.compare_exchange_weak(tmp, val))
    {
        tmp = seq - 1;  // compare_exchange_weak overwrote tmp on failure
        std::this_thread::yield();
    }
    return 0;
}
int msg_ctrl::tail(uint64_t t)
{
    tail_.store(t);  // consumer side: slots up to t may be claimed again
    return 0;
}
Publishing to the ring buffer will look like:
size_t n = 15;
uint64_t seq = 0;
msg_ctrl->claim(n, seq);
// fill items in buffer
buffer[seq + 0] = an item
buffer[seq + 1] = an item
...
buffer[seq + n - 1] = an item
msg_ctrl->publish(n, seq);