Why doesn't the spin check while(condition){} prevent std::condition_variable::wait(lock, condition) from seeing the same condition?

I am trying to build a fair lock among threads. Each thread runs a for loop that does ++iCounter. On each turn, a thread does ++iCounter once and then calls cv.notify_one() to wake up another thread that is waiting for the lock in cv.wait(ul, lambda{check condition_A}).

I put a while(condition_A){} spin in front of cv.wait(ul, lambda{check condition_A}) so that a thread does not wake itself up and grab the lock again. condition_A is iThrID == atom_prev_iThrID && atom_activeThr_counter != 1, i.e. this thread was the previous lock holder and it is not the only active thread.

To double-check, the predicate of cv.wait(ul, lambda{check condition_A}) evaluates condition_A again, and if condition_A holds inside cv.wait(), it prints a message.

In theory, the while(condition_A){} spin should already filter out this case, so the check inside cv.wait(ul, lambda{check condition_A}) should never find condition_A true. However, it does, and the message is printed. The output is shown below.

Why doesn't the spin check while(condition_A){} prevent cv.wait(ul, lambda{check condition_A}) from finding condition_A true when it receives cv.notify_one()?

The example code is as follows:

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <mutex>
#include <memory>
#include <condition_variable>


std::mutex mtx_counter;
std::condition_variable cv;

const int numOfThr {1000};
const int numOfForLoop {100};

std::atomic<int> atom_prev_iThrID {-1};
std::atomic<int> atom_activeThr_counter {0};


std::atomic<int> atom_iCounter{0};
int iCounter {0};


void thrf_fairLock ( const int iThrID )
{
    ++atom_activeThr_counter;

    for ( int i = 0; i < numOfForLoop; ++i )
    {
        //Check condition_A:
        //if thread iThrID is the previous user and not the only active thread,
        //spin instead of locking mtx_counter
        //until another thread has locked it and assigned atom_prev_iThrID.

        while ( iThrID == atom_prev_iThrID          //iThrID thread is the previous user
                && atom_activeThr_counter != 1 )    //and iThrID is not the only active thread
        {} //spin

        std::unique_lock ul ( mtx_counter );
        cv.wait ( ul, [&]()
        {
            //Check condition_A again inside cv.wait() with this lambda.
            //condition_A is not expected to hold here,
            //because the preceding spin has already checked the same condition.

            bool b { ( iThrID == atom_prev_iThrID     //iThrID thread is the previous user
                       && atom_activeThr_counter != 1 )}; //and iThrID is not the only active thread

            if ( b )
            {
                //I am the previous user, so notify another thread to take the lock.
                cv.notify_one();

                //If condition_A holds, print a message,
                //although this is not expected to happen.
                std::cout << iThrID << " : "
                          << atom_prev_iThrID << " : "
                          << atom_activeThr_counter << " |\n";
            }

            return !b;
        } );

        //got critical zone
        ++iCounter;
        ++atom_iCounter;

        atom_prev_iThrID = iThrID;

        ul.unlock();

        //released critical zone
        cv.notify_one();

    }

    --atom_activeThr_counter;
}



int main()
{
    std::cout << "Start testing fair lock ..." << "\n\n";
    std::cout << "print out message in cv.wait(): \n";

    std::vector<std::jthread> thrf_fairLockVec ( numOfThr );

    for ( int iThrID = 0; iThrID < numOfThr; ++iThrID )
    {
        thrf_fairLockVec[iThrID] = std::jthread ( thrf_fairLock, iThrID );
    }

    for ( auto& thr : thrf_fairLockVec )
    {
        if ( thr.joinable() )
            thr.join();
    }

    std::cout << "\n\n";
    std::cout << "numOfThr * numOfForLoop: " << numOfThr* numOfForLoop << "\n";

    std::cout << "\n";
    std::cout << "iCounter: \t" << iCounter << "\n";
    std::cout << "atom_iCounter: \t" << atom_iCounter << "\n";

    std::cout << "\n\n";
    std::cout << "Test is done." << "\n";

    return 0;
}

The output is as follows:

Start testing fair lock ...

print out message in cv.wait():
343 : 343 : 13 |
422 : 422 : 14 |
538 : 538 : 11 |
699 : 699 : 13 |
843 : 843 : 11 |


numOfThr * numOfForLoop: 100000

iCounter:       100000
atom_iCounter:  100000


Test is done.

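I found the answer. condition_A is iThrID == atom_prev_iThrID && atom_activeThr_counter != 1, and the key part is atom_activeThr_counter != 1, not iThrID == atom_prev_iThrID. Only thread iThrID itself ever stores iThrID into atom_prev_iThrID, so once the spin has seen a different value, that half of condition_A cannot become true again before the lock is taken; the spin can only be fooled through the counter. The threads are not started at the same time; they start one after another, and the for loop runs only 100 times. That is so short that sometimes all the other active threads have already finished and exited, so atom_activeThr_counter == 1 at that moment. When the while spin sees atom_activeThr_counter == 1, it stops spinning. By the time the thread reaches cv.wait(), more threads have started, atom_activeThr_counter != 1 again, and the cv.wait() predicate sees condition_A. In other words, the spin reads the two atomics without holding mtx_counter, so its result can already be stale when the lock is acquired.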

To check this hypothesis, I added barrier_fairLock.arrive_and_wait(); at the start of each thread so that all threads start together, and the cv.wait() predicate no longer saw atom_activeThr_counter != 1 as part of condition_A.

By the way, let me share my core design idea. In my experiments, I noticed that cv.notify_one() very often wakes up the thread that just released the lock, so that thread's cv.wait() has to handle the wakeup, issue another cv.notify_one(), and go back to waiting. This round trip seems to take some time. To avoid it, I put a while spin in front of cv.wait() that checks condition_A. On my system, the cv.wait()-only version takes about 23 seconds, while the while spin + cv.wait() version takes 8 to 10 seconds.

Example code for the while spin + cv.wait() version:

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <mutex>
#include <memory>
#include <condition_variable>
#include <barrier>


std::mutex mtx_counter;
std::condition_variable cv;

const int numOfThr {10000};
const int numOfForLoop {100};

std::atomic<int> atom_prev_iThrID {-1};
std::atomic<int> atom_activeThr_counter {0};


std::atomic<int> atom_iCounter{0};
int iCounter {0};

std::barrier barrier_fairLock ( numOfThr, [] {std::cout << "Barrier open. Go go go!!!\n";} );

void thrf_fairLock ( const int iThrID )
{
    barrier_fairLock.arrive_and_wait();
    ++atom_activeThr_counter;

    for ( int i = 0; i < numOfForLoop; ++i )
    {
        //Check condition_A:
        //if thread iThrID is the previous user and not the only active thread,
        //spin instead of locking mtx_counter
        //until another thread has locked it and assigned atom_prev_iThrID.

        while ( iThrID == atom_prev_iThrID          //iThrID thread is the previous user
                && atom_activeThr_counter != 1 )    //and iThrID is not the only active thread
        {}//spin

        std::unique_lock ul ( mtx_counter );
        cv.wait ( ul, [&]()
        {
            //Check condition_A again inside cv.wait() with this lambda.
            //condition_A is not expected to hold here,
            //because the preceding spin has already checked the same condition.

            bool b { ( iThrID == atom_prev_iThrID     //iThrID thread is the previous user
                       && atom_activeThr_counter != 1 )}; //and iThrID is not the only active thread

            if ( b )
            {
                //I am the previous user, so notify another thread to take the lock.
                cv.notify_one();

                //If condition_A holds, print a message,
                //although this is not expected to happen.
//              std::cout << iThrID << " : "
//                        << atom_prev_iThrID << " : "
//                        << atom_activeThr_counter << " |\n";
            }

            return !b;
        } );

        //got critical zone
        ++iCounter;
        ++atom_iCounter;

        atom_prev_iThrID = iThrID;

        ul.unlock();

        //released critical zone
        cv.notify_one();

    }

    --atom_activeThr_counter;
}



int main()
{
    std::cout << "Start testing fair lock ..." << "\n\n";
    std::cout << "print out message in cv.wait(): \n";

    std::vector<std::jthread> thrf_fairLockVec ( numOfThr );

    for ( int iThrID = 0; iThrID < numOfThr; ++iThrID )
    {
        thrf_fairLockVec[iThrID] = std::jthread ( thrf_fairLock, iThrID );
    }

    for ( auto& thr : thrf_fairLockVec )
    {
        if ( thr.joinable() )
            thr.join();
    }

    std::cout << "\n\n";
    std::cout << "numOfThr * numOfForLoop: " << numOfThr* numOfForLoop << "\n";

    std::cout << "\n";
    std::cout << "iCounter: \t" << iCounter << "\n";
    std::cout << "atom_iCounter: \t" << atom_iCounter << "\n";

    std::cout << "\n\n";
    std::cout << "Test is done." << "\n";

    return 0;
}

Result of the while spin + cv.wait() version:

Start testing fair lock ...

print out message in cv.wait():
Barrier open. Go go go!!!


numOfThr * numOfForLoop: 1000000

iCounter:       1000000
atom_iCounter:  1000000


Test is done.

Process returned 0 (0x0)   execution time : 8.123 s

Example code for the cv.wait()-only version:

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <mutex>
#include <memory>
#include <condition_variable>
#include <barrier>


std::mutex mtx_counter;
std::condition_variable cv;

const int numOfThr {10000};
const int numOfForLoop {100};

std::atomic<int> atom_prev_iThrID {-1};
std::atomic<int> atom_activeThr_counter {0};


std::atomic<int> atom_iCounter{0};
int iCounter {0};

std::barrier barrier_fairLock ( numOfThr, [] {std::cout << "Barrier open. Go go go!!!\n";} );

void thrf_fairLock ( const int iThrID )
{
    barrier_fairLock.arrive_and_wait();
    ++atom_activeThr_counter;

    for ( int i = 0; i < numOfForLoop; ++i )
    {
        //Check condition_A (the spin is disabled in this version):
        //if thread iThrID is the previous user and not the only active thread,
        //spin instead of locking mtx_counter
        //until another thread has locked it and assigned atom_prev_iThrID.

//      while ( iThrID == atom_prev_iThrID          //iThrID thread is the previous user
//              && atom_activeThr_counter != 1 )    //and iThrID is not the only active thread
//      {
////          std::this_thread::yield();
//      }//spin

        std::unique_lock ul ( mtx_counter );
        cv.wait ( ul, [&]()
        {
            //Check condition_A inside cv.wait() with this lambda.
            //Without the spin in front, condition_A can hold here;
            //the thread then passes the wakeup on and keeps waiting.

            bool b { ( iThrID == atom_prev_iThrID     //iThrID thread is the previous user
                       && atom_activeThr_counter != 1 )}; //and iThrID is not the only active thread

            if ( b )
            {
                //I am the previous user, so notify another thread to take the lock.
                cv.notify_one();

                //If condition_A holds, print a message
                //(the print is commented out in this version).
//              std::cout << iThrID << " : "
//                        << atom_prev_iThrID << " : "
//                        << atom_activeThr_counter << " |\n";
            }

            return !b;
        } );

        //got critical zone
        ++iCounter;
        ++atom_iCounter;

        atom_prev_iThrID = iThrID;

        ul.unlock();

        //released critical zone
        cv.notify_one();

    }

    --atom_activeThr_counter;
}



int main()
{
    std::cout << "Start testing fair lock ..." << "\n\n";
    std::cout << "print out message in cv.wait(): \n";

    std::vector<std::jthread> thrf_fairLockVec ( numOfThr );

    for ( int iThrID = 0; iThrID < numOfThr; ++iThrID )
    {
        thrf_fairLockVec[iThrID] = std::jthread ( thrf_fairLock, iThrID );
    }

    for ( auto& thr : thrf_fairLockVec )
    {
        if ( thr.joinable() )
            thr.join();
    }

    std::cout << "\n\n";
    std::cout << "numOfThr * numOfForLoop: " << numOfThr* numOfForLoop << "\n";

    std::cout << "\n";
    std::cout << "iCounter: \t" << iCounter << "\n";
    std::cout << "atom_iCounter: \t" << atom_iCounter << "\n";

    std::cout << "\n\n";
    std::cout << "Test is done." << "\n";

    return 0;
}


Result of the cv.wait()-only version:

Start testing fair lock ...

print out message in cv.wait():
Barrier open. Go go go!!!


numOfThr * numOfForLoop: 1000000

iCounter:       1000000
atom_iCounter:  1000000


Test is done.

Process returned 0 (0x0)   execution time : 24.013 s

In addition, I noticed that a cv.notify_one() may sometimes be lost. My guess is that this happens when cv.notify_one() is called before the other thread has entered cv.wait(), so at that moment no thread is waiting. To avoid this situation, I replaced cv.wait() with cv.wait_for(100ms) in the while spin version, giving the while spin + cv.wait_for(100ms) version. As the following result shows, this reduces the execution time from 8 to 10 seconds (while spin + cv.wait()) to 2.5 to 3.5 seconds.

Example code for the while spin + cv.wait_for(100ms) version:

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <mutex>
#include <memory>
#include <condition_variable>
#include <barrier>
#include <numeric>


using namespace std::literals;

std::mutex mtx_counter;
std::condition_variable cv;

const int numOfThr {10000};
const int numOfForLoop {100};

std::atomic<int> atom_prev_iThrID {-1};
std::atomic<int> atom_activeThr_counter {0};


std::atomic<int> atom_iCounter{0};
int iCounter {0};

std::barrier barrier_fairLock ( numOfThr, [] {std::cout << "Barrier open. Go go go!!!\n";} );

void thrf_fairLock ( const int iThrID )
{
    barrier_fairLock.arrive_and_wait();
    ++atom_activeThr_counter;

    for ( int i = 0; i < numOfForLoop; ++i )
    {
        //Check condition_A:
        //if thread iThrID is the previous user and not the only active thread,
        //spin, yielding the CPU, instead of locking mtx_counter
        //until another thread has locked it and assigned atom_prev_iThrID.

        while ( iThrID == atom_prev_iThrID          //iThrID thread is the previous user
                && atom_activeThr_counter != 1 )    //and iThrID is not the only active thread
        {
            std::this_thread::yield();
        }//spin


        std::unique_lock ul ( mtx_counter );
        
        //Use cv.wait_for instead of cv.wait to guard against a cv.notify_one() issued
        //before this thread started waiting: the thread waits at most 100 ms.
        //Note: the bool returned by wait_for is ignored, so after a timeout the thread
        //enters the critical zone even if condition_A still holds. mtx_counter is
        //still held at that point, so the counters stay correct.
        cv.wait_for ( ul, 100ms, [&]()
//        cv.wait ( ul, [&]()
        {
            //Check condition_A again inside cv.wait_for() with this lambda.
            //condition_A is not expected to hold here,
            //because the preceding spin has already checked the same condition.

            bool b { ( iThrID == atom_prev_iThrID     //iThrID thread is the previous user
                       && atom_activeThr_counter != 1 )}; //and iThrID is not the only active thread

            if ( b )
            {
                //I am the previous user, so notify another thread to take the lock.
                cv.notify_one();

                //If condition_A holds, print a message,
                //although this is not expected to happen.

                std::cout << iThrID << " : "
                          << atom_prev_iThrID << " : "
                          << atom_activeThr_counter << " |\n";
            }

            return !b;
        } );

        //got critical zone
        atom_prev_iThrID = iThrID;

        ++iCounter;

        ++atom_iCounter;

        ul.unlock();

        //released critical zone
        cv.notify_one();

    }

    --atom_activeThr_counter;

}


int main()
{

    std::cout << "Start testing fair lock ..." << "\n\n";
    std::cout << "print out message in cv.wait(): \n";

    std::vector<std::jthread> thrf_fairLockVec ( numOfThr );

    for ( int iThrID = 0; iThrID < numOfThr; ++iThrID )
    {
        thrf_fairLockVec[iThrID] = std::jthread ( thrf_fairLock, iThrID );
    }

    for ( auto& thr : thrf_fairLockVec )
    {
        if ( thr.joinable() )
            thr.join();
    }

    std::cout << "\n\n";
    std::cout << "numOfThr * numOfForLoop: " << numOfThr* numOfForLoop << "\n";

    std::cout << "\n";
    std::cout << "iCounter: \t" << iCounter << "\n";
    std::cout << "atom_iCounter: \t" << atom_iCounter << "\n";

    std::cout << "\n\n";
    std::cout << "Test is done." << "\n";

    return 0;
}

Result of the while spin + cv.wait_for(100ms) version:

Start testing fair lock ...

print out message in cv.wait():
Barrier open. Go go go!!!


numOfThr * numOfForLoop: 1000000

iCounter:       1000000
atom_iCounter:  1000000


Test is done.

Process returned 0 (0x0)   execution time : 2.688 s