I am trying to build a fair lock among threads. Each thread runs a for loop that does ++iCounter. A thread may do ++iCounter once, then it issues cv.notify_one() to wake up another thread that is waiting on cv.wait(ul, lambda{check condition_A}).
I use a while(condition_A){} spin in front of the cv.wait(ul, lambda{check condition_A}) to prevent a thread from waking itself up and taking the lock again. condition_A is iThrID == atom_prev_iThrID && atom_activeThr_counter != 1.
As a double check, I test condition_A again inside the cv.wait() predicate; if it holds there, a message is printed. In theory, the while(condition_A){} spin should filter out that case, so the check inside cv.wait(ul, lambda{check condition_A}) should never fire. However, it does fire and prints the message shown below.
Why does the spin check while(condition_A){} not stop cv.wait(ul, lambda{check condition_A}) from receiving the cv.notify_one() while condition_A still holds?
The example code is as follows:
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <mutex>
#include <memory>
#include <condition_variable>
std::mutex mtx_counter;
std::condition_variable cv;
const int numOfThr {1000};
const int numOfForLoop {100};
std::atomic<int> atom_prev_iThrID {-1};
std::atomic<int> atom_activeThr_counter {0};
std::atomic<int> atom_iCounter{0};
int iCounter {0};
void thrf_fairLock ( const int iThrID )
{
    ++atom_activeThr_counter;
    for ( int i = 0; i < numOfForLoop; ++i )
    {
        //Check condition_A.
        //If the iThrID thread is the previous user and not the last active thread,
        //it spins to avoid taking unique_lock(mtx)
        //until another thread takes unique_lock(mtx) and reassigns atom_prev_iThrID.
        while ( iThrID == atom_prev_iThrID       //iThrID thread is the previous user
                && atom_activeThr_counter != 1 ) //iThrID thread is not the last active thread
        {} //spin
        std::unique_lock ul ( mtx_counter );
        cv.wait ( ul, [&]()
        {
            //Check condition_A again inside the cv.wait() predicate.
            //condition_A should not hold here,
            //because the preceding spin has already checked the same condition.
            bool b { iThrID == atom_prev_iThrID        //iThrID thread is the previous user
                     && atom_activeThr_counter != 1 }; //iThrID thread is not the last active thread
            if ( b )
            {
                //I am the previous user, so I have to notify another thread to lock.
                cv.notify_one();
                //If condition_A holds, print out the message,
                //although it should not happen.
                std::cout << iThrID << " : "
                          << atom_prev_iThrID << " : "
                          << atom_activeThr_counter << " |\n";
            }
            return !b;
        } );
        //entered critical section
        ++iCounter;
        ++atom_iCounter;
        atom_prev_iThrID = iThrID;
        ul.unlock();
        //left critical section
        cv.notify_one();
    }
    --atom_activeThr_counter;
}
int main()
{
    std::cout << "Start testing fair lock ..." << "\n\n";
    std::cout << "print out message in cv.wait(): \n";
    std::vector<std::jthread> thrf_fairLockVec ( numOfThr );
    for ( int iThrID = 0; iThrID < numOfThr; ++iThrID )
    {
        thrf_fairLockVec[iThrID] = std::jthread ( thrf_fairLock, iThrID );
    }
    for ( auto& thr : thrf_fairLockVec )
    {
        if ( thr.joinable() )
            thr.join();
    }
    std::cout << "\n\n";
    std::cout << "numOfThr * numOfForLoop: " << numOfThr * numOfForLoop << "\n";
    std::cout << "\n";
    std::cout << "iCounter: \t" << iCounter << "\n";
    std::cout << "atom_iCounter: \t" << atom_iCounter << "\n";
    std::cout << "\n\n";
    std::cout << "Test is done." << "\n";
    return 0;
}
The output is as follows:
Start testing fair lock ...
print out message in cv.wait():
343 : 343 : 13 |
422 : 422 : 14 |
538 : 538 : 11 |
699 : 699 : 13 |
843 : 843 : 11 |
numOfThr * numOfForLoop: 100000
iCounter: 100000
atom_iCounter: 100000
Test is done.
I got the answer. condition_A is iThrID == atom_prev_iThrID && atom_activeThr_counter != 1, and the key point here is atom_activeThr_counter != 1, not iThrID == atom_prev_iThrID. The threads are not kicked off at the same time; they kick off sequentially. The for loop runs only 100 times, which is so short that sometimes every thread started so far has already finished the job and left, so atom_activeThr_counter == 1 at that moment. When the while spin saw atom_activeThr_counter == 1, it stopped spinning. By the time it moved on to the next cv.wait(), more threads had been loaded in and atom_activeThr_counter != 1 again, so cv.wait() hit atom_activeThr_counter != 1 once more.
To prove this assumption, I put barrier_fairLock.arrive_and_wait(); in front of each thread's work so that the threads kick off together, and cv.wait() stopped hitting atom_activeThr_counter != 1.
By the way, let me share my core design idea here. In my experimentation, I noticed that cv.notify_one() very easily wakes up the original thread again, so that thread's cv.wait() has to process the wake-up, issue another cv.notify_one(), and go back to waiting. This procedure seems to take some time. To avoid this situation, I put a while spin in front of the wait to inspect condition_A. On my system, the cv.wait()-only version runs in 23 seconds, while the while spin + cv.wait() version runs in 8~10 seconds.
while spin + cv.wait() version example code is as follows:
while spin + cv.wait() version result:
cv.wait() version example code is as follows:
cv.wait() version results:
In addition, I noticed that the cv.notify_one() may be dropped for some reason. I guess it is dropped because the cv.notify_one() arrives earlier than the cv.wait() in another thread; at that moment, no thread is in the cv.wait() state, so the notification is lost. To avoid this situation, I replaced cv.wait() with cv.wait_for(100ms), keeping the while spin, as a while spin + cv.wait_for(100ms) version. As the following result shows, this improves the execution time of the while spin + cv.wait() version from 8~10 seconds to 2.5~3.5 seconds.
while spin + cv.wait_for(100ms) version example code is as follows:
while spin + cv.wait_for(100ms) version result is as follows: