virtual thread, executors and executor hook methods

305 Views Asked by At

I am trying to figure out how to work with virtual threads yet still use ThreadPoolExecutor#beforeExecute.

My older code simply extends ThreadPoolExecutor, but its constructor takes core/max pool size, a queue, which are stuff I assume you should not use or need in virtual threads.

Is there a way to enjoy the beforeExecute with virtual threads?

2

There are 2 best solutions below

0
Holger On

You can abuse a ThreadPoolExecutor to create a new virtual thread per task without an actual pool, like

public class PseudoThreadPoolExecutor extends ThreadPoolExecutor {

    public PseudoThreadPoolExecutor() {
      super(0, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS,
            new SynchronousQueue<>(), Thread.ofVirtual().factory());
    }
  
    public PseudoThreadPoolExecutor(RejectedExecutionHandler handler) {
      super(0, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS,
            new SynchronousQueue<>(), Thread.ofVirtual().factory(), handler);
    }
  
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
      System.out.println("PseudoThreadPoolExecutor.beforeExecute() "
                       + r + " on " + t);
    }
}

however, there is no need for such an approach when all you want, is to have a hook to be executed before each task:

public static ExecutorService newVirtualThreadPerTaskExecutor(
    BiConsumer<Thread, Runnable> before) {

    ThreadFactory base = Thread.ofVirtual().factory();
    ThreadFactory withBefore = r -> base.newThread(() -> {
        before.accept(Thread.currentThread(), r);
        r.run();
    });
    return Executors.newThreadPerTaskExecutor(withBefore);
}
ExecutorService es = newVirtualThreadPerTaskExecutor((t,r) ->
    System.out.println("VirtualThreadPerTaskExecutor.beforeExecute() "
                     + r + " on " + t));

0
igor.zh On

ThreadPoolExecutor is not suitable base class for virtual threads as their pooling is not recommended. You, however, could subclass from AbstractExecutorService , which has nothing to do with pooling, provides nice support for Future, Callable, and the kind, and is a base class for ThreadPoolExecutor you liked.

An implementation might look like this:

public class VirtualThreadExecutorService extends AbstractExecutorService {

    private class Worker implements Runnable {

        private final Runnable task;
        private final Thread thread;

        Worker(Runnable task) {
            this.task = task;
            this.thread = threadFactory.newThread(this);
        }

        public void run() {
            runWorker(this);
        }

    }

    private final ThreadFactory threadFactory = Thread.ofVirtual().factory();

    private void runWorker(Worker worker) {
        final Runnable task = worker.task;
        beforeExecute(worker.thread, task);
        try {
            task.run();
            afterExecute(task, null);
        } catch (Throwable ex) {
            afterExecute(task, ex);
            throw ex;
        }
    }
    
    @Override
    public void execute(Runnable command) {
        new Worker(command).thread.start();
    }

    protected void beforeExecute(Thread t, Runnable r) {
    }

    protected void afterExecute(Runnable r, Throwable t) {
    }
    
    // other methods

    @Override
    public void shutdown() {
        // TODO Auto-generated method stub

    }

    @Override
    public List<Runnable> shutdownNow() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public boolean isShutdown() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean isTerminated() {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }

}

As you might notice, the semantic of beforeExecute and afterExecute is the same as of ThreadPoolExecutor (for example, they are executed on the Executor's thread), and even some class design ideas (Worker) have been stolen right from there.

Another good question is the unimplemented ("other") methods, they are mainly about shutting down and termination. As virtual threads are supposed to be short-lived, fast and lightweight, all these methods could be left unimplemented. But if, for some reason, you want to implement them, other subclasses of AbstractExecutorService, like org.awaitility.core.SameThreadExecutorService or org.apache.tomcat.util.threads.InlineExecutorService can serve as a model.