Java Queue with Consumer and Producer (ReentrantLock) Issue

43 Views Asked by At

There is a queue with priorities and four threads – two produce and enter priority values into the queue (the value and priority are the same) and two extract priority values from it.

And I need to separate Consumer Threads with this logic:

The first thread of the threads extracting priority values from the queue can only extract values from 1 to 5, the second - values from 6 to 10.

Program should use ReentrantLock and Condition for a synchronization.

I have this program code:

Queue class:

import java.util.concurrent.locks.*;

public class Queue implements PriorityQueue {
    private int[] heap; // массив кучи
    private int space; // количество элементов в очереди
    private ReentrantLock lock; // блокировка
    private Condition notFull; // объект управления "не полный"
    private Condition notEmpty; // объект управления "не пустой"

public Queue(){
        heap = new int[10];
        space = 0;
        lock = new ReentrantLock();
        notFull = lock.newCondition();
        notEmpty = lock.newCondition();
    }

public void insert(int val){
        lock.lock();
        try {
            while (full()) {
                notFull.await();
            }
            int cur = space;
            heap[space++] = val;

            while (cur > 0 && heap[cur] > heap[(cur - 1) / 2]) {
                change(cur, (cur - 1) / 2);
                cur = ((cur - 1) / 2);
            }

            notEmpty.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

public int deleteMax(){
        lock.lock();
        try {
            while (empty()) {
                notEmpty.await();
            }
            int res = heap[0];
            space--;
            heap[0] = heap[space];


            int current = 0;
            while (current * 2 + 1 < space) {
                int max;
                int left = current * 2 + 1;
                int right = current * 2 + 2;
                if (right < space && heap[right] > heap[left])
                    max = right;
                else max = left;
                if (heap[current] < heap[max]) {
                    change(current, max);
                    current = max;
                } else break;
            }

            notFull.signalAll();
            return res;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            lock.unlock();
        }
    }

public boolean full(){
        return space == 10;
    }

public boolean empty(){
        return space == 0;
    }

private void change(int one, int two){
        int hold = heap[one];
        heap[one] = heap[two];
        heap[two] = hold;
    }

    public int getMax(){
        return heap[0];
    }

    public void printQueue(){
        lock.lock();
        int l = heap.length;
        System.out.println("Queue: ");
        for (int i = 0; i < l; i++){
            System.out.printf("%3s", heap[i]);
        }
        System.out.println();
        lock.unlock();
    }
}

Producer class:

import java.util.Random;

public class Producer implements Runnable {
    private Queue queue;
    private Random random = new Random();

public Producer(Queue queue) {
        this.queue = queue;
    }

public void run() {
        while (true) {
            try {
                int k = random.nextInt(1, 3);
                for (int i = 0; i < k; i++) {
                    int val = random.nextInt(1, 10);
                    System.out.println(Thread.currentThread().getName() + " Producer add " + val);
                    queue.insert(val);
                    queue.printQueue();
                }
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                System.out.println("-------ERRORRRRRR");
                Thread.currentThread().interrupt();
            }
        }
    }

}

Consumer class:

import java.util.Random;

public class Consumer implements Runnable {
    private Queue queue;
    private int bound1; // либо 1, либо 6
    private int bound2; // либо 5, либо 10

public Consumer(Queue queue, int bound1, int bound2) {
        this.queue = queue;
        this.bound1 = bound1;
        this.bound2 = bound2;
    }

public void run(){
        while(true){
            try {
                int val = queue.getMax();
                if (bound1 <= val && val <= bound2){
                    System.out.println(Thread.currentThread().getName() + " Consumer take " + queue.deleteMax());
                    queue.printQueue();
                }else {
                    //System.out.println( Thread.currentThread().getName() + " ------------val isnt correct---------");
                }
                Thread.sleep(3000);
            }catch (InterruptedException e){
                System.out.println("-------ERRORRRRRR");
                Thread.currentThread().interrupt();
            }
        }
    }
}

Main class:


public class Main {
    public static void main(String[] args){
        Queue queue = new Queue();
        Thread p1 = new Thread(new Producer(queue));
        Thread p2 = new Thread(new Producer(queue));
        Thread c1 = new Thread(new Consumer(queue, 1, 5));
        Thread c2 = new Thread(new Consumer(queue, 6, 10));

        p1.start();
        p2.start();
        c1.start();
        c2.start();


    }
}

As you can see I separates Consumer Threads in Consumer class using bound1 and bound2.

But now I have to change this implementation and use the third (or more) Condition object and make separation inside Queue class.

I tried, but honestly I don't clearly understand how to implement separation using Condition. There's my attempt, but the Consumer threads are extracting all values without separation.

public class Queue implements PriorityQueue {
    private int[] heap; // массив кучи
    private int space; // количество элементов в очереди
    private ReentrantLock lock; // блокировка
    private Condition notFull; // объект управления "не полный"
    private Condition notEmpty_1_5; // объект управления "не пустой"
    private Condition notEmpty_6_10;

public Queue(){
        heap = new int[10];
        space = 0;
        lock = new ReentrantLock();
        notFull = lock.newCondition();
        notEmpty_1_5 = lock.newCondition();
        notEmpty_6_10 = lock.newCondition();
    }

public void insert(int val){
        lock.lock();
        try {
            while (full()) {
                notFull.await();
            }
            int cur = space;
            heap[space++] = val;

            while (cur > 0 && heap[cur] > heap[(cur - 1) / 2]) {
                change(cur, (cur - 1) / 2);
                cur = ((cur - 1) / 2);
            }

            System.out.println(Thread.currentThread().getName() + " Producer add " + val);
            printQueue();

            if (val >= 1 && val <= 5) notEmpty_1_5.signalAll();
            else notEmpty_6_10.signalAll();

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

public int deleteMax(){
        lock.lock();
        try {
            int res = heap[0];
            while (empty()) {
                if (res >= 1 && res <= 5) notEmpty_1_5.await();
                else notEmpty_6_10.await();
            }

            space--;
            heap[0] = heap[space];

            int current = 0;
            while (current * 2 + 1 < space) {
                int max;
                int left = current * 2 + 1;
                int right = current * 2 + 2;
                if (right < space && heap[right] > heap[left])
                    max = right;
                else max = left;
                if (heap[current] < heap[max]) {
                    change(current, max);
                    current = max;
                } else break;
            }

            System.out.println(Thread.currentThread().getName() + " Consumer take " + res);
            printQueue();

            notFull.signalAll();
            return res;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            lock.unlock();
        }
    }

Can you help me and explain the correct way to make this separation?

0

There are 0 best solutions below