What is wrong with this C++ multi-threaded program?

I wrote a multi-threaded C++ program using std::thread that looks like this:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;

const int threadNum = 4;
mutex mt[threadNum];
condition_variable cv[threadNum];
thread threadList[threadNum];
bool threadWork[threadNum];

void work(int id) {
    while (true) {
        unique_lock<mutex> lck(mt[id]);
        cv[id].wait(lck, [&]() { return threadWork[id]; }); // wait for incoming tasks
        
        // do something
        
        threadWork[id] = false;
        cv[id].notify_all();
    }
}
int main() {
    for (int i = 0; i < threadNum; i ++) {
        threadWork[i] = false;
        threadList[i] = thread(work, i);
    }

    while (true) {
        for (int i = 0; i < threadNum; i ++) {
            // allocate tasks for each threads
            threadWork[i] = true;
            cv[i].notify_all();
        }
        for (int i = 0; i < threadNum; i ++) {
            // wait until all tasks finish
            unique_lock<mutex> lck(mt[i]);
            cv[i].wait(lck, [&]() { return !threadWork[i]; });
            cout << "Finish thread " << i << endl;
        }

        // do something
    }

    for (int i = 0; i < threadNum; i ++)
        threadList[i].join();
    return 0;
}

In each iteration of the program, the main thread and the sub threads execute alternately. The main thread first allocates tasks to the sub threads. After all threads finish their tasks, the main thread aggregates the information and does some other computation.

However, the program randomly crashes or deadlocks after a few iterations. I have no idea why this happens. What is the problem with it?

My environment: Ubuntu 22.04, g++ 11.4

Answer by Michaël Roy:

Comments in the code below:

//...
mutex mt[threadNum];           // Mutexes are for synchronizing
                               // between threads. Having 1 mutex
                               // per thread is not the proper way
                               // to guard access to data.
                               // Try having 1 mutex per block of
                               // data that you want guarded instead.

condition_variable cv[threadNum];  // The same applies to the condition variables:
                                   // you want them to indicate that the DATA
                                   // is ready.
// ... 
// The rest of the code will need a bit of reordering.

For your experiment, try something a bit simpler to start with: say, only one block of guarded data. Here's an example below.

Note that in the code below, all the worker threads wait on a single condition variable, which tells them that something needs to be done.

I've also added a simple mechanism to exit gracefully from the program once the work is done. That's as important as the rest. Note the use of atomic<>: it makes it well-defined for one thread to write the flag while other threads read it, whereas doing that with a plain bool and no lock is a data race (undefined behavior).

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <atomic>

std::mutex my_data_mutex;
std::condition_variable my_data_cv;
std::vector<int> my_data;
constexpr size_t threadNum = 4;
std::atomic<bool> halt = false;


void work(int id) {
    for(;;)
    {
        std::unique_lock<std::mutex> _(my_data_mutex);
        if (!halt)                  // check under the lock first, so a halt that was
            my_data_cv.wait(_);     // signalled before we got here is not missed
                                    // (otherwise: wait for incoming tasks)

        if (halt)
            return;
        if (my_data.empty())        // wait() can wake spuriously, even before
            continue;               // main has pushed anything
        // do something
        my_data.push_back(my_data.back() + id);
    }
}

int main() {

    std::vector<std::thread> tasks;
    for (int i = 0; i < threadNum; i++) {
        tasks.emplace_back(work, i);
    }

    while(!halt)
    {
        std::unique_lock<std::mutex> _(my_data_mutex);

        my_data.push_back(0);             // do some silly job
                                          // (the empty() check in work()
                                          // protects a worker that wakes
                                          // up before this first push)

        if (my_data.size() > 1'000'000)   // exit when the job is done
            halt = true;                  // tell all that the job is done

        my_data_cv.notify_all();          // tell all threads to do their silly part
    }

    for (auto& t : tasks)
        t.join();

    return 0;
}