Imitation of Thread Pool doesn't work correctly

187 Views Asked by At

I've been trying to make a simple implementation of Thread-Pool using Active Objects.

Here is my Main:

public static void main(String[] args){
   MyThreadPool tp = new MyThreadPool(100,3);
        tp.execute(()->{
            try { Thread.sleep(5*1000); } catch (InterruptedException e) {}
            System.out.println("42");
        });
        tp.shutDown();
}

The shutDown method is usually called first through the Main and therefore keeps the Active Objects "alive" unwantedly, but sometimes I get the wanted outcome. Any idea why there is uncertainty about the result?

Below you can see the rest of the classes:

public class MyThreadPool {

    ArrayBlockingQueue<Runnable> q;
    ArrayBlockingQueue<ActiveObject> activeObjects;
    volatile boolean stop;
    AtomicInteger count;
    Thread t;
    Runnable stopTask;

    public MyThreadPool(int capacity, int maxThreads) {
        activeObjects = new ArrayBlockingQueue<>(maxThreads);
        q = new ArrayBlockingQueue<>(capacity);
        count = new AtomicInteger(0);
        stopTask = ()->stop = true;

        t=new Thread(()->{
            //System.out.println("Thread-Pool Started");
            while(!stop){
                //if queue is empty it is gonna be a blocking call
                try {
                    Runnable task = q.take();
                    if(task==stopTask)
                        stopTask.run();
                    else
                        //size() is atomic integer
                        if (count.get() < maxThreads) {
                            ActiveObject a = new ActiveObject(capacity);
                            activeObjects.put(a);
                            count.incrementAndGet();
                            a.execute(task);
                        }
                        //we will assign the next task to the least busy ActiveObject
                        else {
                            int minSize = Integer.MAX_VALUE;
                            ActiveObject choice = null;
                            for (ActiveObject a : activeObjects) {
                                if (a.size() < minSize) {
                                    minSize = a.size();
                                    choice = a;
                                }
                            }
                            choice.execute(task);
                        }

                } catch (InterruptedException e) { }
            }
            //System.out.println("Thread-Pool Ended");
        });
       t.start();
    }

    //execute returns right away - just puts into the queue
    public void execute(Runnable r ){
        // if capacity is full it is gonna be a blocking call
        if(!stop)
            try { q.put(r); } catch (InterruptedException e) { }
    }

    public void shutDownNow(){
        activeObjects.forEach(a->a.shutDownNow());
        stop = true;
        t.interrupt();
    }
    public void shutDown(){
        activeObjects.forEach(a->a.shutDown());
        execute(stopTask);
    }
public class ActiveObject {

    ArrayBlockingQueue<Runnable> q;
    volatile boolean stop;
    Thread t;

    public ActiveObject(int capacity) {
        q = new ArrayBlockingQueue<>(capacity);
        t=new Thread(()->{
            //System.out.println("Active Object Started");
            while(!stop){
                //if queue is empty it is gonna be a blocking call
                try {
                    q.take().run();
                } catch (InterruptedException e) { }
            }
            //System.out.println("Active Object Ended");
        });

       t.start();
    }

    //execute returns right away - just puts into the queue
    public void execute(Runnable r ){
        // if capacity is full it is gonna be a blocking call
        if(!stop)
            try { q.put(r); } catch (InterruptedException e) { }
    }

    public void shutDownNow(){
        stop = true;
        t.interrupt();
    }
    public void shutDown(){
        execute(()->stop=true);
    }

    public int size(){
        return q.size();
    }
}
1

There are 1 best solutions below

0
On BEST ANSWER

In your main method you create a thread pool (which also creates and starts tp.t thread), enqueue a task into tp.q, and then call tp.shutDown():

MyThreadPool tp = new MyThreadPool(100, 3);
tp.execute(() -> {...});
tp.shutDown();

Imagine that tp.shutDown() in the main thread is executed before the MyThreadPool.t thread processes the enqueued task:

activeObjects.forEach(a -> a.shutDown());
execute(stopTask);

here activeObjects is empty, you enqueue stopTask into tp.q, and main thread finishes.

Now we only have MyThreadPool.t thread, let's see what it does:

while (!stop) {
  try {
    Runnable task = q.take();
    if (task == stopTask)
      stopTask.run();
    else
    if (count.get() < maxThreads) {
      ActiveObject a = new ActiveObject(capacity);
      activeObjects.put(a);
      count.incrementAndGet();
      a.execute(task);
    }
    else {
      ...
    }
  } catch (InterruptedException e) {
  }
}

At this point q contains 2 tasks: a normal task and stopTask.

In the first loop iteration the normal task is taken from q, and is given for processing to a newly created ActiveObject:

ActiveObject a = new ActiveObject(capacity);
activeObjects.put(a);
count.incrementAndGet();
a.execute(task);

new ActiveObject() also creates and starts its own internal ActiveObject.t thread.

The second loop iteration processes stopTask:

if (task == stopTask)
  stopTask.run();

which sets stop = true.
As a result, the next check while (!stop) returns false and MyThreadPool.t thread finishes.

Now we only have ActiveObject.t thread, which hasn't been stopped:

while (!stop) {
  try {
    q.take().run();
  } catch (InterruptedException e) {
  }
} 

here the thread will keep waiting on q.take() forever.