Thread starvation in multiprocessors system

200 Views Asked by At

I am implementing the producer-consumer problem. To implement this, we need to have std::condition_variable along with std::mutex to notify threads to wake up. Using these 2 primitives, the producer can notify to consumer and vice-versa to wake up. This is generally required to avoid thread starvation issues. But I am thinking does this issue really persist in the case of the multiprocessors system?

This question comes because I am implementing this using lock-free ring buffer and I don't want to use std::mutex and std::condition_variable at the producer and consumer sides. Since this queue can't have a data-race issue calling enqueue() and dequeue(). Below is the code.

template<typename MessageType>
class MessageProcessor
{
public:
    ~MessageProcessor()
    {
        stop();

        if (workerThread_.joinable())
            workerThread_.join();
    }

    bool postMessage(MessageType const &msg)
    {
        return queue_.enqueue(msg);
    }

    void registerHandler(std::function<void(MessageType)> handler, int32_t coreId=-1, std::string_view const &name="")
    {
        std::call_once(init_, [&](){
            handler_ = std::move(handler);
            workerThread_ = std::thread{&MessageProcessor::process, this};

            if (!setAffinity(coreId, workerThread_))
                LOG("Msg Processing thread couldn't be pinned to core: " << coreId);
            else
                LOG("Msg Processing thread pinned to core: " << coreId);

            if (! name.empty())
                pthread_setname_np(workerThread_.native_handle(), name.data());
        });
    }

    void stop()
    {
        stop_ = true;
    }

private:
    void process() //This is a consumer, runs in a separate thread
    {
        while(!stop_.load(std::memory_order_acquire))
        {
            MessageType msg;

            if (! queue_.dequeue(msg))
                continue;

            try
            {
                handler_(msg);
            }
            catch(std::exception const &ex)
            {
                LOG("Error while processing data: " << msg << ", Exception: " << ex.what());
            }
            catch(...)
            {
                LOG("UNKOWN Error while processing data: " << msg);
            }
        }
    }

    bool setAffinity(int32_t const coreId, std::thread &thread)
    {
        int cpuCoreCount = __sysconf(_GLIBCXX_USE_SC_NPROCESSORS_ONLN);

        if (coreId < 0 || coreId >= cpuCoreCount)
            return false;

        cpu_set_t cpuset;

        CPU_ZERO(&cpuset);
        CPU_SET(coreId, &cpuset);

        pthread_t currentThread = thread.native_handle();

        return pthread_setaffinity_np(currentThread, sizeof(cpu_set_t), &cpuset) == 0;
    }

    std::thread workerThread_;
    std::atomic<bool> stop_{false};
    MPMC_Circular_Queue<MessageType, 1024> queue_;
    std::function<void(MessageType)> handler_{};
    std::once_flag init_;
};

int main()
{
    pthread_setname_np(pthread_self(), "MAIN");

    MessageProcessor<int> processor;

    processor.registerHandler([](int i){
        LOG("Received value: " << i);
    }, 2, "PROCESSOR");

    std::thread t1([&]() { //Producer thread1
        for (int i = 1; i <= 100000; i += 2)
        {
            LOG("Submitting value: " << i);
            processor.postMessage(i);
        }
    });

    pthread_setname_np(t1.native_handle(), "ODD ");

    std::thread t2([&]() { //Producer thread2
        for (int i = 2; i <= 100000; i += 2)
        {
            LOG("Submitting value: " << i);
            processor.postMessage(i);
        }
    });

    pthread_setname_np(t2.native_handle(), "EVEN");

    for (int i = 1; i <= 100000; ++i)
    {
        LOG("Runing main thread: " << i);
    }

    t1.join();
    t2.join();

    return 0;
}

Can this code raise thread starvation issue in modern multiprocessors system? MPMC_Circular_Queue is a lock free bounded queue.

0

There are 0 best solutions below