How many condition variables should be used in the producer-consumer problem?


I am currently learning multi-threading in C++. I have a question about condition variables. Suppose I have code like this:

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::condition_variable cvS;
std::condition_variable cvR;
std::condition_variable cv;
std::mutex gMtx;
int countm = 0;
void SenderS()
{
    std::unique_lock<std::mutex> lck(gMtx);
    while(countm >= 5){
        std::cout << std::this_thread::get_id() <<"exceedin S" << std::endl;
        cv.wait(lck); //or cvS.wait(lck);
    }
    
    countm++;
    std::cout<< std::this_thread::get_id() << "S"<< countm << std::endl;
    lck.unlock();
    cv.notify_one();  //or cvR.notify_one();
}

void ReceiverS()
{
    std::unique_lock<std::mutex> lck(gMtx);
    while(countm <= 0){
        std::cout << std::this_thread::get_id() <<"exceedin R" << std::endl;
        cv.wait(lck); //or cvR.wait(lck);
    }
    countm--;
    std::cout << std::this_thread::get_id() <<"R" << countm << std::endl;
    lck.unlock();
    cv.notify_one();  //or cvS.notify_one();
}

For this case, is there any difference between using one or two condition variables? Generally, for the producer-consumer model, should I use one or two condition variables?

Also, will cvR.notify_one() only notify the thread that did cvR.wait()?

There are 3 best solutions below

0
On

Based on my own analysis: with a single condition variable, you should use notify_all() to wake all waiting threads, so that a notification is not consumed by the wrong thread (for example, a sender waking another sender). With two condition variables, using notify_one() to wake one thread on "the other side" should be fine. I don't know whether this is a correct rule in general.
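To make the rule above concrete, here is a minimal sketch of the single-condition-variable variant, written for a plain counter like countm in the question. The names SharedCounter and run_counter_demo are made up for illustration and are not from the question. Because senders and receivers wait on the same condition variable, notify_one() could hand the wakeup to a thread on the same side whose predicate is still false, so the sketch uses notify_all() and lets every waiter re-check its own predicate. As for the last part of the question: notify_one() only unblocks a thread that is waiting on that same condition_variable object, so cvR.notify_one() cannot wake a thread blocked in cvS.wait() (spurious wakeups aside).

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical bounded counter guarded by ONE condition variable.
class SharedCounter {
public:
    explicit SharedCounter(int limit) : limit_(limit) { assert(limit > 0); }

    // Sender side: blocks while the counter is at its limit.
    void send() {
        std::unique_lock<std::mutex> lck(mtx_);
        cv_.wait(lck, [this] { return count_ < limit_; });
        ++count_;
        lck.unlock();
        // Senders and receivers share cv_, so wake everyone and let each
        // waiter re-check its own predicate.
        cv_.notify_all();
    }

    // Receiver side: blocks while the counter is zero.
    void receive() {
        std::unique_lock<std::mutex> lck(mtx_);
        cv_.wait(lck, [this] { return count_ > 0; });
        --count_;
        lck.unlock();
        cv_.notify_all();
    }

    int value() {
        std::lock_guard<std::mutex> lck(mtx_);
        return count_;
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    int count_ = 0;
    const int limit_;
};

// Starts `workers` senders and `workers` receivers, each doing `rounds`
// operations, and returns the final counter value.
int run_counter_demo(int workers, int rounds) {
    SharedCounter counter(5);
    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i) {
        threads.emplace_back([&] { for (int r = 0; r < rounds; ++r) counter.send(); });
        threads.emplace_back([&] { for (int r = 0; r < rounds; ++r) counter.receive(); });
    }
    for (auto& t : threads) t.join();
    return counter.value();
}
```

Calling run_counter_demo(3, 1000) from main starts three senders and three receivers; since every send is matched by a receive, it should return 0 once all threads have finished.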

1
On

This is the answer I gave to another question, which I think also applies here. I think you need either two condition variables or a single atomic flag.

Ping-Pong with mutex and two condition variables

This is the canonical ping-pong using a mutex and condition variables. Note that 1) you need two condition variables to make ping-pong work and 2) you have to be careful about placing the output statements in a block where the lock is still held. Your code is close.

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class PingPong {
public:
    PingPong() {
        t0_ = std::thread(&PingPong::ping, this);
        t1_ = std::thread(&PingPong::pong, this);
    }

    ~PingPong() {
        if (t0_.joinable())
            t0_.join();
        if (t1_.joinable())
            t1_.join();
    }

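    void ping() {
        // Run a fixed number of rounds so that ping and pong always finish together.
        for (int round = 0; round <= 20; ++round) {
            {
                std::unique_lock<std::mutex> lck(mutex_);
                cv0_.wait(lck, [this]{ return ready_ == false; });
                ready_ = true;
                std::cout << "ping counter: " << counter << std::endl;
                ++counter;  // increment under the lock so pong always sees the new value
            }
            cv1_.notify_one();
        }
    }

    void pong() {
        // Same round count as ping(); testing counter here instead could leave ping() waiting forever.
        for (int round = 0; round <= 20; ++round) {
            {
                std::unique_lock<std::mutex> lck(mutex_);
                cv1_.wait(lck, [this]{ return ready_ == true; });
                ready_ = false;
                std::cout << "pong counter: " << counter << std::endl;
            }
            cv0_.notify_one();
        }
    }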

private:
    bool ready_{false};
    std::mutex mutex_;
    std::condition_variable cv0_, cv1_;
    std::atomic<int> counter{};
    std::thread t0_, t1_;
};

int main(){
    PingPong p{};
}

This should result in the following output.

ping counter: 0
pong counter: 1
ping counter: 1
pong counter: 2
ping counter: 2
pong counter: 3
ping counter: 3
pong counter: 4
ping counter: 4
pong counter: 5
ping counter: 5
pong counter: 6
ping counter: 6
pong counter: 7
ping counter: 7
pong counter: 8
ping counter: 8
pong counter: 9
ping counter: 9
...

Ping-Pong with single atomic flag

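Depending on your platform, it may be more performant (and a little simpler to grok) to use a single std::atomic_flag instead of condition variables. This relies on the wait() and notify_one() members of std::atomic_flag, which require C++20. It produces the same output as above.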

class PingPongAtomicFlag {
public:
    PingPongAtomicFlag() {
        t0_ = std::thread([this]() { ping(); });
        t1_ = std::thread([this]() { pong(); });
    }

    ~PingPongAtomicFlag() {
        if (t0_.joinable())
            t0_.join();
        if (t1_.joinable())
            t1_.join();
    }

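    void ping() {
        // Fixed round count, matching pong(), so neither thread is left waiting at the end.
        for (int round = 0; round <= 20; ++round) {
            potato_.wait(true);   // block while the flag is set (pong's turn)
            std::cout << "ping counter: " << counter_ << std::endl;
            ++counter_;           // update before handing over, so pong prints the new value
            potato_.test_and_set();
            potato_.notify_one();
        }
    }

    void pong() {
        for (int round = 0; round <= 20; ++round) {
            potato_.wait(false);  // block while the flag is clear (ping's turn)
            std::cout << "pong counter: " << counter_ << std::endl;
            potato_.clear();
            potato_.notify_one();
        }
    }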

private:
    std::atomic_flag potato_;
    std::atomic<int> counter_{};
    std::thread t0_, t1_;
};

0
On

The following example uses the Ada programming language to implement a producer-consumer pattern. Ada's primary unit of modularity is the package. An Ada package has a specification, which defines the API for the package, and a body, which provides the implementation of all behaviors defined in the package.

This example uses a generic package. The generic parameter, named Capacity, defines the size of the bounded buffer for an instance of the package.

The package specification is:

-----------------------------------------------------------------------
-- Producer-consumer with bounded buffer
-----------------------------------------------------------------------
generic
   Capacity : Positive;
package Bounded_PC is
   task type Producer is
      entry set_id(Id : in Positive);
      entry Stop;
   end Producer;

   task type Consumer is
      entry set_id(Id : in Positive);
      entry Stop;
   end Consumer;

end Bounded_PC;

Positive is a language-defined subtype of Integer with a minimum value of 1. Thus, the generic parameter Capacity must be an integer value not less than 1.

Task entries are synchronized communication methods for a task. Entries implement a Rendezvous method of synchronization. The set_id entry passes a parameter named Id, which is of the subtype Positive, to the task. The Stop entry has no parameter. The Stop entry causes the task to terminate, as will be seen in the package body. Tasks also have specifications and bodies. In this example the task specifications reside within the package specification and the task bodies reside within the package body.

This package specification defines two task types. Many instances of a task type may be created. Each instance will be a separate task. A task is often implemented as a thread.

The package body is:

with Ada.Text_IO;     use Ada.Text_IO;
with Ada.Dispatching; use Ada.Dispatching;

package body Bounded_PC is
   subtype Index_T is Positive range 1 .. Capacity;
   type Buf_Array is array (Index_T) of Integer;

   ------------
   -- Buffer --
   ------------

   protected Buffer is
      entry Write (Item : in Integer);
      entry Read (Item : out Integer);
   private
      Buf         : Buf_Array;
      Write_Index : Index_T := 1;
      Read_Index  : Index_T := 1;
      Count       : Natural := 0;
   end Buffer;

   protected body Buffer is

      entry Write (Item : in Integer) when Count < Capacity is
      begin
         Buf (Write_Index) := Item;
         Write_Index       := (Write_Index mod Capacity) + 1;
         Count             := Count + 1;
      end Write;

      entry Read (Item : out Integer) when Count > 0 is
      begin
         Item       := Buf (Read_Index);
         Read_Index := (Read_Index mod Capacity) + 1;
         Count      := Count - 1;
      end Read;

   end Buffer;

   --------------
   -- Producer --
   --------------

   task body Producer is
      Value : Integer := 0;
      Me    : Positive;
   begin
      accept set_id (Id : in Positive) do
         Me := Id;
      end set_id;

      loop
         select
            accept Stop;
            exit;
         else
            select
               Buffer.Write (Value);
               Put_Line ("Producer" & Me'Image & " wrote" & Value'Image);
               Value := Value + 1;
            or
               delay 0.001;
               Put_Line ("Producer" & Me'Image & " is waiting ....");
            end select;
         end select;
      end loop;
   end Producer;

   --------------
   -- Consumer --
   --------------

   task body Consumer is
      Value : Integer;
      Me    : Positive;
   begin
      accept set_id (Id : in Positive) do
         Me := Id;
      end set_id;

      loop
         select
            accept Stop;
            exit;
         else
            select
               Buffer.Read (Value);
               Put_Line ("Consumer" & Me'Image & " read" & Value'Image);
            or
               delay 0.001;
               Put_Line ("Consumer" & Me'Image & " is waiting ....");
            end select;
         end select;
      end loop;
   end Consumer;

end Bounded_PC;

Within the package body you find the declaration of a protected object named Buffer. The word protected has a different meaning in Ada than in C++. An Ada protected buffer is implicitly protected from race conditions. The protected object, like packages and tasks, has a specification and a body. The specification defines the API for the Buffer, as well as its private data members. The body defines the behavior of the Buffer object. Since the Buffer object is defined within the package body and not exposed in the package specification the Buffer object visibility is analogous to a private member of a C++ class.

The Capacity generic parameter is used in the package body to define an integer subtype of the predefined subtype Positive. The range of values for this subtype is the values 1 through the value passed to the generic parameter Capacity.

An array type named Buf_Array is declared. This array type is indexed by the subtype Index_T and contains elements of the type Integer.

The protected specification for Buffer declares two entries for the Buffer object. An entry has exclusive read-write access to the Buffer object subject to a condition specified in the protected body. This condition is analogous to a C++ condition variable. The Write entry passes an Integer value into the Buffer object. The Read entry passes an Integer value out of the Buffer object.

The private members of the Buffer object are an instance of Buf_Array named Buf, an instance of Index_T named Write_Index (initialized to 1), an instance of Index_T named Read_Index (initialized to 1) and an instance of the predefined subtype Natural (an integer with a minimum value of 0) named Count (initialized to 0).

The protected body of this example is most germane to this question. There are two entries in the Buffer object. Each entry has a condition (analogous to a C++ condition variable). The condition associated with the Write entry is defined as

when Count < Capacity

This condition specifies that the Write entry will execute when the condition is TRUE and will suspend the calling task when the condition is false. Thus, the calling task will suspend when attempting to write to a full buffer.

The condition associated with the Read entry is defined as

when Count > 0

Thus, the calling task will suspend when attempting to read from an empty buffer.

Tasks suspended on an entry call are placed in an implicit entry queue. The default ordering of the entry queue is FIFO. Thus, when a different task changes the condition that is suspending a task, the next task suspended in the entry queue is awakened and completes its entry call. There is no explicit "notify" call made by any task. The suspension and notification of tasks related to entry conditions is performed implicitly. The compiler writes the code for those behaviors for the programmer.
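For readers coming from the C++ side of the question, the protected Buffer described above can be approximated with one mutex and two condition variables, one per entry condition. This is only a sketch under my own assumptions: BoundedBuffer and run_buffer_demo are hypothetical names, indices are 0-based instead of 1-based, and unlike Ada the wakeups have to be written out explicitly and are not guaranteed to be FIFO.

```cpp
#include <array>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical C++ counterpart of the Ada protected object Buffer.
template <std::size_t Capacity>
class BoundedBuffer {
    static_assert(Capacity >= 1, "like Ada's Positive, the capacity must be at least 1");

public:
    // Counterpart of: entry Write (...) when Count < Capacity
    void write(int item) {
        std::unique_lock<std::mutex> lck(mtx_);
        not_full_.wait(lck, [this] { return count_ < Capacity; });
        buf_[write_index_] = item;
        write_index_ = (write_index_ + 1) % Capacity;  // 0-based wrap-around
        ++count_;
        assert(count_ <= Capacity);
        lck.unlock();
        not_empty_.notify_one();  // explicit here; implicit in Ada
    }

    // Counterpart of: entry Read (...) when Count > 0
    int read() {
        std::unique_lock<std::mutex> lck(mtx_);
        not_empty_.wait(lck, [this] { return count_ > 0; });
        const int item = buf_[read_index_];
        read_index_ = (read_index_ + 1) % Capacity;
        --count_;
        lck.unlock();
        not_full_.notify_one();
        return item;
    }

private:
    std::mutex mtx_;
    std::condition_variable not_full_, not_empty_;
    std::array<int, Capacity> buf_{};
    std::size_t write_index_ = 0, read_index_ = 0, count_ = 0;
};

// One producer writes 0 .. n-1 and one consumer sums everything it reads.
long long run_buffer_demo(int n) {
    BoundedBuffer<10> buffer;
    long long sum = 0;
    std::thread producer([&] { for (int i = 0; i < n; ++i) buffer.write(i); });
    std::thread consumer([&] { for (int i = 0; i < n; ++i) sum += buffer.read(); });
    producer.join();
    consumer.join();
    return sum;
}
```

With two condition variables, each side only ever notifies the other side, which is why notify_one() is enough in this sketch. Calling run_buffer_demo(1000) should return 499500, the sum of 0 through 999.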

The package body also implements the task bodies for the Producer task type and the Consumer task type.

The producer task body is implemented as

   task body Producer is
      Value : Integer := 0;
      Me    : Positive;
   begin
      accept set_id (Id : in Positive) do
         Me := Id;
      end set_id;

      loop
         select
            accept Stop;
            exit;
         else
            select
               Buffer.Write (Value);
               Put_Line ("Producer" & Me'Image & " wrote" & Value'Image);
               Value := Value + 1;
            or
               delay 0.001;
               Put_Line ("Producer" & Me'Image & " is waiting ....");
            end select;
         end select;
      end loop;
   end Producer;

Each instance of the Producer task type has two local variables named Value and Me. Value is an Integer initialized to 0. Me is the variable which will hold the Id assigned when the set_id task entry is called. The set_id task entry is handled by the accept call

      accept set_id (Id : in Positive) do
         Me := Id;
      end set_id;

The accept clause above invokes the Ada Rendezvous behavior. The task entry is called by some other task, in this example it will be the root task in which the main procedure executes. The Rendezvous behavior causes the entry to be completed only when the calling task calls the entry and the called task accepts the entry. If the calling task calls the entry before the called task accepts the entry then the calling task is suspended in a task entry queue until the called task accepts the entry. If the called task accepts the entry before a calling task calls the entry then the called task will suspend until its entry is called. This behavior synchronizes the called task and the calling task at the point of completing the entry. Upon completion of the entry the calling task and the called task continue executing asynchronously.
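C++ has no built-in Rendezvous, but the blocking behavior described above can be approximated with a mutex and a condition variable. The sketch below is my own illustration, not a translation of what an Ada compiler generates: Rendezvous, call, accept and run_rendezvous_demo are hypothetical names, it needs C++17 for std::optional, waiting callers are not served in FIFO order, and the caller is released as soon as the value has been taken.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>

// Hypothetical one-slot synchronous hand-off, loosely modeled on a task entry.
template <typename T>
class Rendezvous {
public:
    // Calling side: returns only after accept() has taken the value.
    void call(T value) {
        std::unique_lock<std::mutex> lck(mtx_);
        // Wait behind any earlier caller whose value has not been taken yet.
        cv_.wait(lck, [this] { return !slot_.has_value(); });
        slot_ = std::move(value);
        const unsigned long ticket = ++calls_;  // identifies this call
        cv_.notify_all();
        // Suspend until the accepting thread has taken this call's value.
        cv_.wait(lck, [this, ticket] { return accepts_ >= ticket; });
    }

    // Accepting side: blocks until some caller has offered a value.
    T accept() {
        std::unique_lock<std::mutex> lck(mtx_);
        cv_.wait(lck, [this] { return slot_.has_value(); });
        T value = std::move(*slot_);
        slot_.reset();
        ++accepts_;
        assert(accepts_ <= calls_);
        cv_.notify_all();
        return value;
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::optional<T> slot_;
    unsigned long calls_ = 0, accepts_ = 0;
};

// Mirrors the main task calling set_id: the worker thread accepts an id.
int run_rendezvous_demo(int id) {
    Rendezvous<int> set_id;
    int me = 0;
    std::thread worker([&] { me = set_id.accept(); });
    set_id.call(id);  // blocks until the worker has accepted the id
    worker.join();
    return me;
}
```

Whichever side arrives first waits for the other, which is the property the paragraph above describes. Calling run_rendezvous_demo(7) should return 7.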

The effect of the main task calling the producer instance's set_id entry is to assign a value to the Me variable of the called producer instance.

The producer then enters a loop which is only exited when the Stop entry is completed. As you can guess from the above, the Rendezvous behavior of task entries would normally cause the called task to wait until the main task calls its Stop entry, effectively preventing the producer from producing anything. Ada has a syntax solution for this issue. The "select" clause creates a conditional accept call on the Stop entry. If no task is calling the Stop entry, execution of the accept statement is abandoned and a nested select statement is used to attempt to write to the buffer. If the write to the buffer completes within 0.001 seconds the loop is repeated. If the write to the buffer does not complete within 0.001 seconds, the Buffer.Write call is abandoned and the loop iteration completes, allowing the Producer instance to respond to a Stop entry call even when the Buffer is full.
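The closest C++ counterpart I know of for this timed entry call is condition_variable::wait_for with a predicate, combined with a stop flag that the loop polls. The sketch below assumes a plain counter in place of the buffer; Slots, try_write_for, producer_loop and run_timed_demo are hypothetical names, and the atomic stop flag only stands in for the conditional "accept Stop".

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical shared state: a counter standing in for the bounded buffer.
struct Slots {
    std::mutex mtx;
    std::condition_variable not_full;
    int count = 0;
    int capacity = 2;
};

// Counterpart of "select Buffer.Write (...) or delay 0.001": try to add one
// item and give up after `timeout`. Returns true if the item was added.
bool try_write_for(Slots& s, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lck(s.mtx);
    // wait_for returns the predicate's value; false means the wait timed out.
    if (!s.not_full.wait_for(lck, timeout, [&s] { return s.count < s.capacity; }))
        return false;
    ++s.count;
    assert(s.count <= s.capacity);
    return true;
}

// Counterpart of the producer loop: check for a stop request first,
// otherwise attempt a timed write. Returns the number of items written.
int producer_loop(Slots& s, const std::atomic<bool>& stop) {
    int written = 0;
    while (!stop.load()) {
        if (try_write_for(s, std::chrono::milliseconds(1)))
            ++written;
    }
    return written;
}

// No consumer runs here, so the producer can write at most `capacity` items
// and then keeps timing out until it is asked to stop.
int run_timed_demo() {
    Slots s;
    std::atomic<bool> stop{false};
    int written = -1;
    std::thread producer([&] { written = producer_loop(s, stop); });
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    stop.store(true);
    producer.join();
    return written;
}
```

Because each write attempt gives up after one millisecond, the loop gets back to the stop check even when the buffer stays full, which is the same reason the Ada producer can still answer a Stop call.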

The Consumer task body behaves in a very similar manner to the Producer task body except that it reads a value from the Buffer object rather than writing a value to the Buffer object.

The main procedure for this program is:

with Bounded_PC;

procedure Main is
   package Int_Pck is new Bounded_Pc(10);
   use Int_Pck;

   P1 : Producer;
   P2 : Producer;
   C1 : Consumer;
   C2 : Consumer;
begin
   P1.Set_Id(1);
   P2.Set_Id(2);
   C1.Set_Id(1);
   C2.Set_Id(2);
   delay 0.02;
   P1.Stop;
   P2.Stop;
   delay 0.01;
   C1.Stop;
   C2.Stop;
end Main;

The Main procedure makes an instance of the generic package named Bounded_PC, passing the value 10 as the capacity of the Buffer.

Two instances of the Producer task type are created named P1 and P2. Two instances of the Consumer task type are created named C1 and C2. All four task instances begin executing immediately.

The Main procedure calls the Set_Id entries for all four task instances. The Main procedure delays (sleeps) for 0.02 seconds then calls the Stop entries for P1 and P2. The Main procedure delays another 0.01 seconds and calls the Stop entries for C1 and C2.

An example execution of this program gives the following output:

Producer 1 wrote 0
Consumer 2 read 0
Producer 1 wrote 1
Producer 1 wrote 2
Producer 1 wrote 3
Producer 1 wrote 4
Producer 1 wrote 5
Producer 1 wrote 6
Producer 1 wrote 7
Producer 1 wrote 8
Producer 1 wrote 9
Producer 1 wrote 10
Producer 2 wrote 0
Producer 1 wrote 11
Consumer 2 read 1
Consumer 2 read 2
Consumer 1 read 0
Producer 1 wrote 12
Producer 2 wrote 1
Producer 1 wrote 13
Consumer 2 read 3
Consumer 1 read 4
Consumer 2 read 5
Producer 2 wrote 2
Consumer 1 read 6
Consumer 1 read 8
Consumer 1 read 9
Consumer 1 read 10
Consumer 1 read 11
Consumer 1 read 1
Consumer 1 read 12
Consumer 1 read 13
Producer 1 wrote 14
Consumer 1 read 2
Producer 2 wrote 3
Producer 1 wrote 15
Consumer 2 read 7
Producer 1 wrote 16
Producer 1 wrote 17
Producer 1 wrote 18
Producer 1 wrote 19
Producer 1 wrote 20
Consumer 2 read 3
Consumer 2 read 15
Producer 1 wrote 21
Consumer 2 read 4
Consumer 2 read 16
Consumer 2 read 17
Consumer 2 read 18
Consumer 2 read 19
Consumer 2 read 20
Consumer 2 read 21
Producer 1 wrote 22
Producer 1 wrote 23
Producer 1 wrote 24
Producer 1 wrote 25
Consumer 1 read 14
Producer 1 wrote 26
Consumer 2 read 22
Consumer 2 read 24
Producer 2 wrote 4
Consumer 2 read 25
Consumer 2 read 26
Producer 2 wrote 5
Producer 1 wrote 27
Producer 1 wrote 28
Producer 1 wrote 29
Producer 1 wrote 30
Producer 2 wrote 6
Producer 2 wrote 7
Producer 2 wrote 8
Producer 2 wrote 9
Producer 2 wrote 10
Producer 1 wrote 31
Consumer 2 read 27
Consumer 1 read 23
Consumer 2 read 5
Producer 2 wrote 11
Consumer 1 read 28
Consumer 1 read 29
Consumer 1 read 30
Consumer 1 read 31
Consumer 1 read 7
Consumer 1 read 8
Producer 1 wrote 32
Consumer 1 read 9
Consumer 1 read 10
Consumer 1 read 11
Consumer 1 read 32
Consumer 1 read 12
Consumer 1 read 33
Producer 1 wrote 33
Consumer 2 read 6
Producer 1 wrote 34
Producer 2 wrote 12
Producer 1 wrote 35
Consumer 2 read 35
Consumer 2 read 13
Consumer 2 read 36
Producer 1 wrote 36
Producer 2 wrote 13
Producer 1 wrote 37
Producer 2 wrote 14
Producer 2 wrote 15
Consumer 2 read 37
Producer 1 wrote 38
Producer 1 wrote 39
Producer 1 wrote 40
Producer 1 wrote 41
Producer 1 wrote 42
Producer 1 wrote 43
Producer 1 wrote 44
Producer 1 wrote 45
Producer 2 wrote 16
Consumer 2 read 14
Consumer 2 read 15
Producer 1 wrote 46
Consumer 2 read 38
Producer 2 wrote 17
Consumer 2 read 16
Producer 1 wrote 47
Consumer 2 read 39
Producer 2 wrote 18
Consumer 2 read 40
Producer 1 wrote 48
Consumer 2 read 41
Producer 2 wrote 19
Consumer 2 read 42
Producer 1 wrote 49
Consumer 2 read 43
Producer 2 wrote 20
Consumer 2 read 44
Producer 1 wrote 50
Consumer 2 read 45
Consumer 1 read 34
Consumer 1 read 17
Consumer 1 read 47
Consumer 1 read 18
Consumer 1 read 48
Consumer 1 read 19
Consumer 2 read 46
Consumer 1 read 49
Consumer 1 read 50
Consumer 1 read 21
Consumer 1 read 51
Consumer 2 read 20
Producer 1 wrote 51
Producer 1 wrote 52
Producer 2 wrote 21
Producer 1 wrote 53
Consumer 2 read 53
Consumer 2 read 22
Consumer 2 read 54
Producer 1 wrote 54
Consumer 1 read 52
Consumer 2 read 55
Producer 1 wrote 55
Producer 1 wrote 56
Producer 2 wrote 22
Consumer 1 read 56
Producer 1 wrote 57
Producer 1 wrote 58
Producer 1 wrote 59
Producer 1 wrote 60
Producer 1 wrote 61
Producer 1 wrote 62
Consumer 2 read 57
Producer 1 wrote 63
Consumer 2 read 58
Consumer 2 read 59
Producer 1 wrote 64
Consumer 2 read 60
Consumer 2 read 61
Consumer 2 read 62
Consumer 2 read 63
Consumer 2 read 64
Consumer 2 read 65
Producer 1 wrote 65
Consumer 1 read 23
Producer 1 wrote 66
Producer 1 wrote 67
Consumer 2 read 66
Consumer 2 read 68
Producer 1 wrote 68
Producer 2 wrote 23
Consumer 1 read 67
Consumer 2 read 69
Producer 1 wrote 69
Consumer 1 read 24
Producer 2 wrote 24
Producer 1 wrote 70
Producer 2 wrote 25
Producer 2 wrote 26
Producer 2 wrote 27
Consumer 1 read 25
Consumer 1 read 71
Consumer 1 read 26
Consumer 1 read 27
Consumer 1 read 28
Producer 2 wrote 28
Producer 2 wrote 29
Producer 2 wrote 30
Producer 2 wrote 31
Producer 2 wrote 32
Producer 2 wrote 33
Producer 2 wrote 34
Producer 2 wrote 35
Producer 1 wrote 71
Producer 2 wrote 36
Producer 2 wrote 37
Producer 2 wrote 38
Producer 2 wrote 39
Consumer 1 read 29
Consumer 2 read 70
Consumer 1 read 30
Producer 2 wrote 40
Consumer 1 read 32
Consumer 1 read 33
Consumer 1 read 34
Consumer 1 read 35
Consumer 1 read 36
Consumer 2 read 31
Consumer 1 read 37
Consumer 2 read 38
Consumer 2 read 40
Consumer 1 read 39
Consumer 1 is waiting ....
Consumer 2 is waiting ....
Consumer 2 is waiting ....