I've been trying to write a simple thread pool implementation using Active Objects.
Here is my main method:
public static void main(String[] args) {
    MyThreadPool tp = new MyThreadPool(100, 3);
    tp.execute(() -> {
        try { Thread.sleep(5 * 1000); } catch (InterruptedException e) {}
        System.out.println("42");
    });
    tp.shutDown();
}
Usually the shutDown call in main runs first, which unintentionally keeps the Active Objects "alive", but sometimes I get the expected outcome. Any idea why the result is nondeterministic?
Below you can see the rest of the classes:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class MyThreadPool {
    ArrayBlockingQueue<Runnable> q;
    ArrayBlockingQueue<ActiveObject> activeObjects;
    volatile boolean stop;
    AtomicInteger count;
    Thread t;
    Runnable stopTask;

    public MyThreadPool(int capacity, int maxThreads) {
        activeObjects = new ArrayBlockingQueue<>(maxThreads);
        q = new ArrayBlockingQueue<>(capacity);
        count = new AtomicInteger(0);
        stopTask = () -> stop = true;
        t = new Thread(() -> {
            //System.out.println("Thread-Pool Started");
            while (!stop) {
                //if queue is empty it is gonna be a blocking call
                try {
                    Runnable task = q.take();
                    if (task == stopTask) {
                        stopTask.run();
                    }
                    //count is an atomic integer
                    else if (count.get() < maxThreads) {
                        ActiveObject a = new ActiveObject(capacity);
                        activeObjects.put(a);
                        count.incrementAndGet();
                        a.execute(task);
                    }
                    //we will assign the next task to the least busy ActiveObject
                    else {
                        int minSize = Integer.MAX_VALUE;
                        ActiveObject choice = null;
                        for (ActiveObject a : activeObjects) {
                            if (a.size() < minSize) {
                                minSize = a.size();
                                choice = a;
                            }
                        }
                        choice.execute(task);
                    }
                } catch (InterruptedException e) { }
            }
            //System.out.println("Thread-Pool Ended");
        });
        t.start();
    }

    //execute returns right away - just puts into the queue
    public void execute(Runnable r) {
        // if capacity is full it is gonna be a blocking call
        if (!stop)
            try { q.put(r); } catch (InterruptedException e) { }
    }

    public void shutDownNow() {
        activeObjects.forEach(a -> a.shutDownNow());
        stop = true;
        t.interrupt();
    }

    public void shutDown() {
        activeObjects.forEach(a -> a.shutDown());
        execute(stopTask);
    }
}
import java.util.concurrent.ArrayBlockingQueue;

public class ActiveObject {
    ArrayBlockingQueue<Runnable> q;
    volatile boolean stop;
    Thread t;

    public ActiveObject(int capacity) {
        q = new ArrayBlockingQueue<>(capacity);
        t = new Thread(() -> {
            //System.out.println("Active Object Started");
            while (!stop) {
                //if queue is empty it is gonna be a blocking call
                try {
                    q.take().run();
                } catch (InterruptedException e) { }
            }
            //System.out.println("Active Object Ended");
        });
        t.start();
    }

    //execute returns right away - just puts into the queue
    public void execute(Runnable r) {
        // if capacity is full it is gonna be a blocking call
        if (!stop)
            try { q.put(r); } catch (InterruptedException e) { }
    }

    public void shutDownNow() {
        stop = true;
        t.interrupt();
    }

    public void shutDown() {
        execute(() -> stop = true);
    }

    public int size() {
        return q.size();
    }
}
In your main method you create a thread pool (which also creates and starts the tp.t thread), enqueue a task into tp.q, and then call tp.shutDown().

Imagine that tp.shutDown() in the main thread is executed before the MyThreadPool.t thread processes the enqueued task: at that point activeObjects is empty, you enqueue stopTask into tp.q, and the main thread finishes.

Now we only have the MyThreadPool.t thread, so let's see what it does. At this point q contains 2 tasks: a normal task and stopTask.

In the first loop iteration the normal task is taken from q and is given for processing to a newly created ActiveObject. new ActiveObject() also creates and starts its own internal ActiveObject.t thread.

The second loop iteration processes stopTask, which sets stop = true. As a result, the next check while (!stop) returns false and the MyThreadPool.t thread finishes.

Now we only have the ActiveObject.t thread, which hasn't been stopped: the thread will keep waiting on q.take() forever.
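
One possible way to avoid the race is to let the dispatcher thread itself shut the workers down once it reaches the stop marker, so that every worker that will ever exist has already been created by then. The following is only a minimal sketch under that assumption, not a drop-in replacement for your classes: FixedPool, Worker and awaitTermination are hypothetical names, the least-busy selection is simplified to round-robin, and tasks submitted after shutDown() are silently dropped.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical simplified pool; all names here are illustrative.
class FixedPool {
    // One queue plus one thread; the thread exits after running the stop task.
    static class Worker {
        final BlockingQueue<Runnable> q;
        volatile boolean stop;
        final Thread t;

        Worker(int capacity) {
            q = new ArrayBlockingQueue<>(capacity);
            t = new Thread(() -> {
                while (!stop) {
                    try { q.take().run(); }
                    catch (InterruptedException e) { stop = true; }
                }
            });
            t.start();
        }

        void execute(Runnable r) throws InterruptedException { q.put(r); }

        // Queued behind all earlier tasks, so they still run before the worker stops.
        void shutDown() throws InterruptedException { q.put(() -> stop = true); }
    }

    private final BlockingQueue<Runnable> q;
    private final List<Worker> workers = new CopyOnWriteArrayList<>();
    private final Runnable stopTask = () -> {};   // marker only, compared by identity
    private volatile boolean accepting = true;
    private final Thread dispatcher;

    FixedPool(int capacity, int maxThreads) {
        q = new ArrayBlockingQueue<>(capacity);
        dispatcher = new Thread(() -> {
            try {
                int next = 0;
                while (true) {
                    Runnable task = q.take();
                    if (task == stopTask) break;
                    if (workers.size() < maxThreads) workers.add(new Worker(capacity));
                    // Round-robin instead of least-busy, to keep the sketch short.
                    workers.get(next++ % workers.size()).execute(task);
                }
                // Every earlier task has been handed over, so no worker is created
                // after this point and none is left without a stop task.
                for (Worker w : workers) w.shutDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.start();
    }

    void execute(Runnable r) {
        if (!accepting) return;
        try { q.put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    void shutDown() {
        accepting = false;
        try { q.put(stopTask); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    // Joins the dispatcher and then each worker, waiting up to millis per thread.
    // Returns false on timeout or interrupt.
    boolean awaitTermination(long millis) {
        try {
            dispatcher.join(millis);
            if (dispatcher.isAlive()) return false;
            for (Worker w : workers) {
                w.t.join(millis);
                if (w.t.isAlive()) return false;
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With this ordering it no longer matters whether shutDown() is called before or after the dispatcher has picked up the first task: the stop marker is always processed after the tasks queued before it, and the workers receive their own stop task only after that.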