I'm trying to update a vector by using a thread pool, where each worker is on charge of updating certain portion of the vector. More concretely I split the vector into subsets with null intersection, where the number of sets is given by the number of cpus available to my computer (4).
Each thread worker will update:
std::array<int, arraySize> vec
If this array is declared private inside a class MultiThreadedUpdateOfVector, then the functionality doesn't work: each trade updates the corresponding portion of vec, but the changes are not picked by the next thread. So vec acts as if was a local variable to each thread.
This problem disappears if:
std::array<int, arraySize> vec
is declared before MultiThreadedUpdateOfVector.
Can you explain this unwanted behaviour?
Can you suggest a solution where std::array<int, arraySize> vec remains a member of MultiThreadedUpdateOfVector?
Thank you!
#include "stdafx.h"
#include <iostream>
#include <vector>
#include <chrono>
#include <thread>
#include <array>
#include "ThreadPool.h"
using namespace std;
const int block = 2;
const int arraySize = 8;
class MultiThreadedUpdateOfVector
{
public:
MultiThreadedUpdateOfVector()
{
}
bool setArray(const int& i, const int& j)
{
std::thread::id id = std::this_thread::get_id();
for (int kk = i; kk < j; ++kk)
{
vec[kk] = i * 10000 + j * 100 + kk;
}
return true;
}
void print()
{
for (unsigned kk = 0; kk < vec.size(); ++kk)
{
std::cout << kk << " " << vec[kk] << endl;
}
}
private:
std::array<int, arraySize> vec;
};
int main()
{
std::thread::id id = std::this_thread::get_id();
ThreadPool pool(4);
std::vector<std::future<bool> >results;
MultiThreadedUpdateOfVector h;
int begin = 0;
int end = block;
for (int i = 0; i < 4; ++i) {
results.push_back(pool.enqueue(&MultiThreadedUpdateOfVector::setArray, h, begin, end));
begin = end + 1;
end += block;
}
for (int i = 0; i < 4; ++i)
results[i].get();
h.print();
return 0;
}
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
->std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
int getTasksSize()
{
std::unique_lock<std::mutex> lock(this->queue_mutex_m);
std::thread::id id1 = std::this_thread::get_id();
return tasks_m.size();
}
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers_m;
// the task queue
std::queue< std::function<void()> > tasks_m;
// synchronization
std::mutex queue_mutex_m;
std::condition_variable condition_m;
bool stop_m;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop_m(false)
{
std::thread::id id = std::this_thread::get_id();
for (size_t i = 0; i < threads; ++i)
{
workers_m.emplace_back(
[this]
{
for (;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex_m);
std::thread::id id1 = std::this_thread::get_id();
this->condition_m.wait(lock, [this]{ return this->stop_m || !this->tasks_m.empty(); });
std::thread::id id = std::this_thread::get_id();
if (this->stop_m && this->tasks_m.empty())
return;
task = std::move(this->tasks_m.front());
this->tasks_m.pop();
}
task();
}
}
);
}
}
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
std::thread::id id = std::this_thread::get_id();
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_m);
// don't allow enqueueing after stopping the pool
if (stop_m)
throw std::runtime_error("enqueue on stopped ThreadPool");
std::thread::id id = std::this_thread::get_id();
tasks_m.emplace([task](){ (*task)(); });
}
condition_m.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex_m);
stop_m = true;
}
condition_m.notify_all();
for (std::thread &worker : workers_m)
worker.join();
}
Isn't your
enqueuestoring a copy the arguments to invoke them?If you were using
threaddirectly, that would be the problem, and you would use a reference wrapper — passstd::ref(h)instead ofh— to the thread constructor so as to passhby reference.I assume the same thing should be done with your
ThreadPool. (and if that doesn't work, thatThreadPoolneeds to be redesigned so that it does work)