Lock-free multiple producer multiple consumer in C++


I have to program a multiple-producer, multiple-consumer system in C++, but I'm lost trying to put together each part of the model (the threads and their corresponding buffers). The basic functioning of the model is as follows: an initial thread executes a function. The returned results need to be put into an undetermined number of buffers, because each element that the function processes is different and needs to be handled by its own thread. Then, with the data stored in the buffers, another n threads need to read from these buffers, apply another function, and put the results into further buffers.

At the moment I have got this buffer structure created:

template <typename T>
class atomic_buffer {
public:
  atomic_buffer(int n);
  int bufSize() const noexcept;
  bool bufEmpty() const noexcept;
  bool full() const noexcept;
  ~atomic_buffer() = default;


  void put(const T & x, bool last) noexcept;
  std::pair<bool,T> get() noexcept;

private:
  int next_pos(int p) const noexcept;

private:
  struct item {
    bool last;
    T value;
  };
  const int size_;
  std::unique_ptr<item[]> buf_;
  alignas(64) std::atomic<int> nextRd_ {0};
  alignas(64) std::atomic<int> nextWrt_ {0};
};

I've also created a vector that stores a collection of buffers, to handle the undetermined number of threads.

std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;

for (int i = 0; i < n; i++) {
    v1.push_back(std::make_unique<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>(aux));
}

Edit:

[Image: flowchart of the model]

Best answer

Without knowing more context, this looks like an application for a standard thread pool. You have different tasks that are enqueued to a synchronized queue (like the buffer class you have there). Each worker thread of the thread pool polls this queue and processes one task at a time (by executing a run() method, for example). The workers write the results back into another synchronized queue.

Each worker thread has its own thread-local pair of input and output buffers. These don't need synchronization because they are only accessed from within the owning thread.


Edit: Actually, I think this can be simplified a lot: just use a thread pool and one synchronized queue. The worker threads can enqueue new tasks directly into the queue. Each of your threads in the drawing would correspond to one type of task and implement a common Task interface. You don't need multiple buffers. You can use polymorphism and put everything in one buffer.

Edit 2 - Explanation of thread pools:
A thread pool is just a concept. Forget about the pooling aspect and use a fixed number of threads. The main idea is: instead of having several threads that each perform a specific function, have N threads that can each process any kind of task, where N is the number of CPU cores.
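As a rough illustration of picking N, here is a minimal sketch. The names `worker_count` and `run_workers`, and the fallback value of 2, are made up for this example. The one standard fact it relies on is that `std::thread::hardware_concurrency()` is only a hint and may return 0.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: number of worker threads to start.
// hardware_concurrency() may return 0 when the value cannot be determined,
// so fall back to an arbitrary small number (2 is an assumption, not a rule).
unsigned worker_count() {
    const unsigned n = std::thread::hardware_concurrency();
    return n == 0 ? 2u : n;
}

// Starts N identical workers, waits for all of them, and returns N.
unsigned run_workers() {
    const unsigned n = worker_count();
    std::atomic<unsigned> started{0};
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < n; ++i)
        workers.emplace_back([&started] { started.fetch_add(1); });
    for (auto& w : workers)
        w.join();
    assert(started.load() == n);  // every worker ran exactly once
    return n;
}
```

In a real pool each worker would run the task loop instead of just incrementing a counter.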

You can transform this

[image]

into

[image]

The worker thread does something like the following. Note that this is simplified, but you should get the idea.

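void Thread::run(buffer<Task*>& queue) {
    while (true) {
        // Wait for work before trying to take a task.
        // waitUntilQueueHasElement() is a placeholder for whatever
        // blocking mechanism you use (e.g. a condition variable).
        while (queue.isEmpty())
            waitUntilQueueHasElement();
        // Another worker may have taken the element in the meantime,
        // so the result still has to be checked.
        Task* task = queue.get();
        if (task)
            task->execute();
    }
}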
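The waiting step in the loop above is usually done with a mutex and a condition variable rather than by polling. The following is a minimal sketch of such a queue. `BlockingQueue`, `close()` and `sum_through_queue` are hypothetical names, and this is a lock-based stand-in, not the lock-free buffer from the question.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical lock-based queue: get() sleeps until an element arrives
// or the queue has been closed and drained.
template <typename T>
class BlockingQueue {
public:
    void put(T value) {
        {
            std::lock_guard<std::mutex> lock(m_);
            assert(!closed_);  // putting after close() is a usage error here
            items_.push_back(std::move(value));
        }
        cv_.notify_one();
    }

    // Returns false only when the queue is closed and empty.
    bool get(T& out) {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty())
            return false;
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> items_;
    bool closed_ = false;
};

// Demo: a consumer thread sums the values 1..n produced by the caller.
int sum_through_queue(int n) {
    BlockingQueue<int> q;
    int sum = 0;
    std::thread consumer([&q, &sum] {
        int v;
        while (q.get(v))
            sum += v;
    });
    for (int i = 1; i <= n; ++i)
        q.put(i);
    q.close();
    consumer.join();  // after join, reading sum is safe
    return sum;
}
```

Because the emptiness check and the removal happen under the same lock, a worker that wakes up cannot find that another worker already took the element.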

And your tasks implement a common interface so you can put Task* pointers into a single queue:

struct Task {
    virtual ~Task() = default;  // tasks are used through base pointers
    virtual void execute() = 0;
};

struct Task1 : public Task {
    virtual void execute() override {
        A();
        B1();
        C();
    }
};

...
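To show how tasks of different types can share one queue and enqueue follow-up work, here is a small sketch under several assumptions: `execute` takes the queue as a parameter (unlike the interface above), the `TaskQueue`, `Stage1` and `Stage2` names are invented, and the workers stop once no task is queued or running.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

class TaskQueue;

struct Task {
    virtual ~Task() = default;
    virtual void execute(TaskQueue& queue) = 0;  // may enqueue follow-up tasks
};

// Hypothetical shared queue that also tracks unfinished tasks.
class TaskQueue {
public:
    void put(std::unique_ptr<Task> task) {
        {
            std::lock_guard<std::mutex> lock(m_);
            tasks_.push_back(std::move(task));
            ++pending_;
        }
        cv_.notify_one();
    }
    // Returns nullptr once nothing is queued and nothing is still running.
    std::unique_ptr<Task> get() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !tasks_.empty() || pending_ == 0; });
        if (tasks_.empty())
            return nullptr;
        auto task = std::move(tasks_.front());
        tasks_.pop_front();
        return task;
    }
    // Called by a worker after a task has finished executing.
    void finished() {
        bool all_done;
        {
            std::lock_guard<std::mutex> lock(m_);
            assert(pending_ > 0);
            all_done = (--pending_ == 0);
        }
        if (all_done)
            cv_.notify_all();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<std::unique_ptr<Task>> tasks_;
    int pending_ = 0;  // tasks queued or currently executing
};

struct Stage2 : Task {  // second step: accumulate the value
    Stage2(int v, std::atomic<int>& r) : value(v), result(r) {}
    void execute(TaskQueue&) override { result += value; }
    int value;
    std::atomic<int>& result;
};

struct Stage1 : Task {  // first step: double the value, then hand over to Stage2
    Stage1(int v, std::atomic<int>& r) : value(v), result(r) {}
    void execute(TaskQueue& queue) override {
        queue.put(std::unique_ptr<Task>(new Stage2(value * 2, result)));
    }
    int value;
    std::atomic<int>& result;
};

// Initial tasks are queued before the workers start, so pending_ is non-zero.
int run_pipeline(int items, unsigned workers) {
    TaskQueue queue;
    std::atomic<int> result{0};
    for (int i = 1; i <= items; ++i)
        queue.put(std::unique_ptr<Task>(new Stage1(i, result)));
    std::vector<std::thread> pool;
    for (unsigned w = 0; w < workers; ++w)
        pool.emplace_back([&queue] {
            while (auto task = queue.get()) {
                task->execute(queue);
                queue.finished();
            }
        });
    for (auto& t : pool)
        t.join();
    return result.load();
}
```

A Stage1 task enqueues its Stage2 task before the worker calls `finished()`, so the pending count cannot reach zero while follow-up work is still outstanding.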

Also, do yourself a favour and use typedefs ;)

std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;

becomes

typedef std::vector<std::vector<unsigned char>> vector2D_uchar;
typedef std::pair<int, vector2D_uchar> int_vec_pair;
typedef std::unique_ptr<locked_buffer<int_vec_pair>> locked_buffer_ptr;
std::vector<locked_buffer_ptr> v1;