I am implementing the producer-consumer problem. To implement this, we normally need std::condition_variable along with std::mutex so that threads can notify each other to wake up. Using these two primitives, the producer can notify the consumer and vice versa. This is generally required to avoid thread-starvation issues. But I am wondering: does this issue really persist on a modern multiprocessor system?
This question comes up because I am implementing this using a lock-free ring buffer, and I don't want to use std::mutex and std::condition_variable on the producer and consumer sides, since this queue cannot have a data-race issue when calling enqueue() and dequeue(). Below is the code.
/// Single-consumer message pump built on a lock-free bounded MPMC queue.
/// Producers call postMessage(); one worker thread drains the queue and
/// invokes the registered handler. No mutex/condvar: the consumer spins
/// (with a yield) instead of blocking.
template<typename MessageType>
class MessageProcessor
{
public:
    /// Request the worker to stop and join it (RAII shutdown).
    ~MessageProcessor()
    {
        stop();
        if (workerThread_.joinable())
            workerThread_.join();
    }

    /// Enqueue a message for asynchronous processing.
    /// @return false when the bounded queue is full — the message is NOT
    ///         accepted and the caller must retry or drop it.
    bool postMessage(MessageType const &msg)
    {
        return queue_.enqueue(msg);
    }

    /// Register the consumer callback and start the worker thread.
    /// Only the first call has any effect (guarded by std::call_once).
    /// @param handler  Callback invoked for every dequeued message.
    /// @param coreId   CPU core to pin the worker to; -1 or out-of-range
    ///                 means no pinning.
    /// @param name     Optional thread name (the kernel truncates it to
    ///                 15 characters).
    void registerHandler(std::function<void(MessageType)> handler, int32_t coreId=-1, std::string_view const &name="")
    {
        std::call_once(init_, [&](){
            // handler_ must be assigned before the thread starts so the
            // consumer never observes an empty std::function.
            handler_ = std::move(handler);
            workerThread_ = std::thread{&MessageProcessor::process, this};
            if (!setAffinity(coreId, workerThread_))
                LOG("Msg Processing thread couldn't be pinned to core: " << coreId);
            else
                LOG("Msg Processing thread pinned to core: " << coreId);
            if (!name.empty())
            {
                // A string_view is not guaranteed to be NUL-terminated;
                // copy into a std::string before handing it to the C API.
                std::string const threadName{name};
                pthread_setname_np(workerThread_.native_handle(), threadName.c_str());
            }
        });
    }

    /// Ask the consumer loop to exit; it finishes the in-flight message.
    void stop()
    {
        // release pairs with the acquire load in process().
        stop_.store(true, std::memory_order_release);
    }

private:
    /// Consumer loop — runs on workerThread_ until stop() is called.
    /// Note: messages still queued when stop() is observed are discarded.
    void process()
    {
        while (!stop_.load(std::memory_order_acquire))
        {
            MessageType msg;
            if (!queue_.dequeue(msg))
            {
                // Empty queue: yield instead of hard-spinning so this core
                // is not monopolized (avoids starving producers/other work).
                std::this_thread::yield();
                continue;
            }
            try
            {
                handler_(msg);
            }
            catch (std::exception const &ex)
            {
                LOG("Error while processing data: " << msg << ", Exception: " << ex.what());
            }
            catch (...)
            {
                LOG("UNKNOWN Error while processing data: " << msg);
            }
        }
    }

    /// Pin `thread` to `coreId`.
    /// @return false for an invalid core id or if the pthread call fails.
    bool setAffinity(int32_t const coreId, std::thread &thread)
    {
        // Use the portable POSIX API — the original __sysconf /
        // _GLIBCXX_USE_SC_NPROCESSORS_ONLN are glibc/libstdc++ internals.
        long const cpuCoreCount = sysconf(_SC_NPROCESSORS_ONLN);
        if (coreId < 0 || coreId >= cpuCoreCount)
            return false;
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(coreId, &cpuset);
        return pthread_setaffinity_np(thread.native_handle(), sizeof(cpu_set_t), &cpuset) == 0;
    }

    std::thread workerThread_;                       // consumer thread
    std::atomic<bool> stop_{false};                  // shutdown flag
    MPMC_Circular_Queue<MessageType, 1024> queue_;   // lock-free bounded queue
    std::function<void(MessageType)> handler_{};     // set once in registerHandler
    std::once_flag init_;                            // guards one-time start-up
};
/// Demo driver: two producer threads post 100000 interleaved integers to a
/// MessageProcessor pinned to core 2, while the main thread logs on its own.
int main()
{
    pthread_setname_np(pthread_self(), "MAIN");

    MessageProcessor<int> processor;
    processor.registerHandler([](int i){
        LOG("Received value: " << i);
    }, 2, "PROCESSOR");

    // Producer thread 1: odd values.
    std::thread t1([&]() {
        for (int i = 1; i <= 100000; i += 2)
        {
            LOG("Submitting value: " << i);
            // postMessage() returns false when the 1024-slot bounded queue
            // is full; retry instead of silently dropping the message.
            while (!processor.postMessage(i))
                std::this_thread::yield();
        }
    });
    pthread_setname_np(t1.native_handle(), "ODD ");

    // Producer thread 2: even values.
    std::thread t2([&]() {
        for (int i = 2; i <= 100000; i += 2)
        {
            LOG("Submitting value: " << i);
            while (!processor.postMessage(i))
                std::this_thread::yield();
        }
    });
    pthread_setname_np(t2.native_handle(), "EVEN");

    // Keep the main thread busy so its scheduling competes with the workers.
    for (int i = 1; i <= 100000; ++i)
    {
        LOG("Running main thread: " << i);
    }

    t1.join();
    t2.join();
    return 0;
}
Can this code cause a thread-starvation issue on a modern multiprocessor system? (MPMC_Circular_Queue is a lock-free bounded queue.)