ExecutorService and AtomicInteger : RejectedExecutionException


I want the program to terminate once atomicInteger reaches a value of 100.

 public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        do {
            executor.submit(() -> {
                System.out.println(atomicInteger.getAndAdd(10));
                if (atomicInteger.get() == 100) {
                    //executor.shutdownNow();
                }
            });
        } while (true);
    }

I get this error:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1d8d10a rejected from java.util.concurrent.ThreadPoolExecutor@9e54c2[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 10]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1374)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)

How should I implement this?


There are 2 best solutions below

Best answer

There is no need to use AtomicInteger here, since your Runnable lambda invocations are guaranteed to execute sequentially (by newSingleThreadExecutor). Also, if your Runnable lambda code were to take any time to execute (e.g. 2ms), your main loop would queue up far more than the 10 tasks needed to hit your limit. You can see this happen if you add a 2ms sleep inside your Runnable lambda, add a counter to your do/while loop, and print the value of the counter at the end to see how many Runnable instances you queued up.
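
To illustrate the first point, here is a minimal sketch (not the asker's code) in which a plain int holder replaces AtomicInteger. The class name SingleThreadCounter and the method countTo are made up for this example. It relies on two assumptions: only the single worker thread ever writes the counter, and the main thread reads it only after Future.get() on the last task has returned.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SingleThreadCounter {

    // Hypothetical helper: submits exactly limit/increment tasks to a
    // single-thread executor and returns the final counter value.
    static int countTo(int limit, int increment) {
        int[] counter = {0}; // plain holder, written only by the one worker thread
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> last = null;
        for (int i = 0; i < limit / increment; i++) {
            last = executor.submit(() -> {
                counter[0] += increment;
            });
        }
        executor.shutdown(); // already queued tasks still run
        try {
            if (last != null) {
                last.get(); // tasks run in order, so the last one finishing means all finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("final counter=" + countTo(100, 10));
    }
}
```

This only holds for a single worker thread. With more than one thread the unsynchronized counter[0] += increment would be a data race, which is where AtomicInteger becomes necessary.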

Assuming that you wish to test this code with concurrent threads, you would need to replace the call to newSingleThreadExecutor with newFixedThreadPool. The approach your code takes is problematic when concurrent threads are being used. In the following code, I've switched to newFixedThreadPool, added a counter so we can see how many tasks are queued, and added two short pauses in your Runnable lambda, just to represent a small amount of work. When I executed this program, atomicInteger became greater than 13000 and the program crashed with java.lang.OutOfMemoryError: GC overhead limit exceeded. That is because your Runnable always adds 10 to atomicInteger regardless of its current value, so with several threads the value can move past 100 before any == 100 check observes it. The code also queues up more tasks than it needs. Here's the code with these small changes that illustrate the problem.

public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    AtomicInteger atomicInteger = new AtomicInteger(0);
    int i=0;
    do {
        executor.submit(() -> {
            pause(2); // simulates some small amount of work.
            System.out.println("atomicInt="+atomicInteger.getAndAdd(10));
            pause(2); // simulates some small amount of work.
            if (atomicInteger.get() == 100) {
                System.out.println("executor.shutdownNow()");
                System.out.flush();
                executor.shutdownNow();
            }
        });
        i++; // count every task submitted so the total can be printed at the end
        if (atomicInteger.get() == 100) {
            break;
        }
    } while (true);
    System.out.println("final atomicInt="+atomicInteger.get());
    System.out.println("final tasks queued="+i);
}
public static void pause(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }
}
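
One possible way to address the "always adds 10 regardless of the current value" problem described above is to guard the increment with a compare-and-set loop, so that no task can push the value past the limit even when too many tasks are queued. This is only a sketch under that assumption; BoundedAdd, addUpTo and runDemo are hypothetical names, not part of the original code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAdd {

    // Adds increment only if the result would not exceed limit.
    // Returns true if the value was changed, false if the limit blocked it.
    static boolean addUpTo(AtomicInteger value, int increment, int limit) {
        while (true) {
            int current = value.get();
            if (current + increment > limit) {
                return false; // limit reached, leave the value alone
            }
            if (value.compareAndSet(current, current + increment)) {
                return true; // no other thread changed the value in between
            }
            // another thread won the race; re-read and try again
        }
    }

    // Submits more tasks than needed on purpose to show the guard holding.
    static int runDemo(int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        AtomicInteger value = new AtomicInteger(0);
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                addUpTo(value, 10, 100);
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return value.get();
    }

    public static void main(String[] args) {
        System.out.println("final value=" + runDemo(50));
    }
}
```

Here 50 tasks are queued but only the first 10 successful calls change the value, so it stops at 100. This does not fix the over-queueing itself; it only makes the extra tasks harmless.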

Here is a version that fixes the concurrency problems and moves the executor management out of the worker threads where it doesn't really belong:

private static final int LIMIT = 100;
private static final int INCREMENT = 10;

public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    AtomicInteger atomicInteger = new AtomicInteger(0);
    for (int i=0; i < LIMIT/INCREMENT; i++) {
        executor.submit(() -> {
            pause(2);
            System.out.println("atomicInt=" + atomicInteger.getAndAdd(INCREMENT));
            System.out.flush();
            pause(2);
        });
    }
    executor.shutdown();
    while (!executor.isTerminated()) {
        System.out.println("Executor not yet terminated");
        System.out.flush();
        pause(4);
    }
    System.out.println("final atomicInt=" + atomicInteger.get());
}

public static void pause(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }
}
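
As a variation on the version above, the polling loop on isTerminated() could be replaced with a single blocking call to awaitTermination. The sketch below assumes a 10-second timeout is generous enough for this workload; the class name AwaitTerminationDemo and the method run are invented for the example, and the pauses and printing are left out to keep it short.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitTerminationDemo {
    private static final int LIMIT = 100;
    private static final int INCREMENT = 10;

    static int run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        // Submit exactly as many tasks as are needed to reach LIMIT.
        for (int i = 0; i < LIMIT / INCREMENT; i++) {
            executor.submit(() -> {
                atomicInteger.getAndAdd(INCREMENT);
            });
        }
        executor.shutdown(); // no new tasks; queued ones still run
        try {
            // Block until all tasks finish or the (assumed) timeout expires.
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // give up and interrupt the workers
            }
        } catch (InterruptedException ex) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return atomicInteger.get();
    }

    public static void main(String[] args) {
        System.out.println("final atomicInt=" + run());
    }
}
```

The trade-off is that the main thread no longer prints progress messages while waiting, but it also does not wake up every few milliseconds just to check a flag.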

Second answer

You should just change your while loop to check for the condition you need, and shut down the executor after the loop finishes.
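
A minimal sketch of that idea, under one added assumption: the loop waits for each task with Future.get() before re-checking the condition, since otherwise it would queue many extra tasks while the first ones are still running. LoopUntilLimit and runUntilLimit are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LoopUntilLimit {
    static final int LIMIT = 100;
    static final int INCREMENT = 10;

    static int runUntilLimit() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        try {
            // The loop condition replaces while (true).
            while (atomicInteger.get() < LIMIT) {
                executor.submit(() -> {
                    System.out.println(atomicInteger.getAndAdd(INCREMENT));
                }).get(); // wait for this task so the next check sees its update
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Shut down from the main thread, after the loop, never from a task.
            executor.shutdown();
        }
        return atomicInteger.get();
    }

    public static void main(String[] args) {
        System.out.println("final atomicInt=" + runUntilLimit());
    }
}
```

Because the executor is shut down only after the last submit, no task is ever rejected, and the program exits once the counter reaches 100. Waiting on every task makes the work fully sequential, which is fine here but gives up any benefit of running tasks in parallel.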