I have a simple queue where one task reads from a file into the queue and several tasks unpack the content. It works for a while, but eventually crashes because the queue is empty, even though I check on the line before that the queue is not empty! (See the comment in the code.)
#pragma once
#include <optional>
#include <future>
#include <queue>

template<class T, uint32_t MAX_SIZE>
class spmc_fifo_queue
{
public:
    using optional_queue_pair = std::optional<T>;

    bool put(optional_queue_pair&& p)
    {
        if (my_queue.size() >= MAX_SIZE) {
            std::unique_lock my_lock(my_mutex);
            my_cv_remove.wait(my_lock, [this] { return my_queue.size() < MAX_SIZE; });
        }
        std::lock_guard<std::mutex> my_lock(my_mutex);
        my_queue.push(std::move(p));
        my_cv_add.notify_one();
        return true;
    }

    optional_queue_pair get()
    {
        if (my_queue.size() == 0) {
            std::unique_lock my_lock(my_mutex);
            my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
        }
        std::scoped_lock my_lock(my_mutex);
        optional_queue_pair ret = std::move(my_queue.front()); // sometimes my_queue.size() == 0 here, why????
        my_queue.pop();
        my_cv_remove.notify_one();
        return ret;
    }

private:
    std::queue<optional_queue_pair> my_queue;
    std::mutex my_mutex;
    std::condition_variable my_cv_add;
    std::condition_variable my_cv_remove;
};
I have tried to guard against this, but apparently I do not understand the details of mutex locking!
Condition variables may wake more than one thread even when you use notify_one (a spurious wakeup). You need to check again that the queue is not empty after you lock the mutex: two threads may have been woken and are waiting on the lock, only the first one will find something in the queue, and the other will find it empty. In your get(), the lock taken for the wait is released at the end of the if block, so another consumer can empty the queue before the second lock is acquired. The initial size() check is also made without holding the mutex at all.

You also need to put the whole thing in a while (true) loop, because the queue is not allowed to fail.

For production I would probably use something like the Boost lockfree queue, which is a multi-producer, multi-consumer lock-free queue and is extensively tested, instead of rolling your own (slower) implementation of a multi-consumer queue.
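As a rough illustration of the re-check described above, here is a minimal sketch that keeps the class and member names from the question but holds a single lock across both the wait and the pop. It relies on the predicate overload of std::condition_variable::wait, which behaves like a while loop around the wait and so re-tests the condition after every wakeup. The assert is only there to document the invariant and is not part of the original code.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

template <class T, std::uint32_t MAX_SIZE>
class spmc_fifo_queue
{
public:
    using optional_queue_pair = std::optional<T>;

    bool put(optional_queue_pair&& p)
    {
        std::unique_lock<std::mutex> my_lock(my_mutex);
        // Blocks until there is room; the predicate is re-evaluated
        // under the lock after every wakeup, spurious or not.
        my_cv_remove.wait(my_lock, [this] { return my_queue.size() < MAX_SIZE; });
        my_queue.push(std::move(p));
        my_lock.unlock();          // release before notifying the consumer
        my_cv_add.notify_one();
        return true;
    }

    optional_queue_pair get()
    {
        std::unique_lock<std::mutex> my_lock(my_mutex);
        // The same lock is held from the wait through the pop, so no other
        // consumer can take the element in between.
        my_cv_add.wait(my_lock, [this] { return !my_queue.empty(); });
        assert(!my_queue.empty());
        optional_queue_pair ret = std::move(my_queue.front());
        my_queue.pop();
        my_lock.unlock();          // release before notifying the producer
        my_cv_remove.notify_one();
        return ret;
    }

private:
    std::queue<optional_queue_pair> my_queue;
    std::mutex my_mutex;
    std::condition_variable my_cv_add;
    std::condition_variable my_cv_remove;
};
```

With this shape there is no unlocked size() check and no gap between waiting and popping, so a consumer that passes the wait is guaranteed to find an element. Whether unlocking before notify_one is worth it is a tuning choice; notifying while still holding the lock would also be correct.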