I have a recursive task which splits asymmetrically. Instead of splitting in halves it rather bites a pice of work it can execute and forks the rest of work. Such scenario does not get paralleled as expected.

Consider the following code to understand what I mean:

public class Sandbox {

    public static void main(String[] args) {
        var pool = new ForkJoinPool(5);
        var task = new Task(20, 1);
        var start = Instant.now();
        pool.invoke(task);
        System.out.println("Elapsed Time: " + Duration.between(start, Instant.now()).toSeconds());
    }

    static class Task extends RecursiveAction {

        int work; 
        int taskNum;

        public Task(int work, int taskNum) {
            this.work = work;
            this.taskNum = taskNum;
        }

        protected void compute() {
            System.out.println(Thread.currentThread().getName() + " compute enter: " + this);
            if (work > 1) {
                // bite ONE from work counter
                var w = new Task(1, taskNum);
                // split the remaining work and fork
                var s = new Task(work - 1, taskNum + 1);
                s.fork();
                w.compute();
                s.join();
            } else {
                doWork();
            }
            System.out.println(Thread.currentThread().getName() + " compute exit: " + this);
        }

        void doWork() {
            try {
                System.out.println(Thread.currentThread().getName() + " working: " + this);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public String toString() {
            return "Task{" + "work=" + work + ", taskNum=" + taskNum + '}';
        }
    }
}

There is a work counter which defines the amount of work to do. Each task literally bites ONE from that counter and forks the remaining work, then joins on remaining work until it is completed.

I expect such task will be paralleled with parallelism defined in ForkJoinPool constructor (parallelism = 5). But it starts in parallel and then the only one thread seems to be working, the rest of the threads just waiting on join() call.

Please help me understand why the task do not get paralleled till the end. As I do call fork() before performing computation there is always one task in the working queue that other threads should steal and split further.

1

There are 1 best solutions below

0
markspace On

I tried your example with invokeAll and the result seems more balanced (also about 50% faster). I'm going to guess that what is happening is the original thread that is called, races ahead and basically queues up all the Tasks before any other thread in the pool gets a chance, so it ends up with all the w tasks on its call stack. So it just ends up doing 50% of the work itself.

class Sandbox {

   public static void main( String[] args ) {
      var pool = new ForkJoinPool( 5 );
      var task = new Task( 20, 1 );
      long start = System.nanoTime();
      pool.invoke( task );
      System.out.println( "time: " + (System.nanoTime() - start) * 1e-9 * 1e3 + "ms" );
   }

   static class Task extends RecursiveAction {

      int work;
      int taskNum;

      public Task( int work, int taskNum ) {
         this.work = work;
         this.taskNum = taskNum;
      }

      protected void compute() {
         System.out.println( Thread.currentThread().getName() + " compute enter: " + this );
         if( work > 1 ) {
            // bite ONE from work counter
            var w = new Task( 1, taskNum );
            // split the remaining work and fork
            var s = new Task( work - 1, taskNum + 1 );
            invokeAll( w, s );
//            s.fork();
//            w.compute();
//            s.join();
         } else {
            doWork();
         }
         System.out.println( Thread.currentThread().getName() + " compute exit: " + this );
      }

      void doWork() {
         try {
            System.out.println( Thread.currentThread().getName() + " working: " + this );
            Thread.sleep( 1000 );
         } catch( InterruptedException e ) {
            Thread.currentThread().interrupt();
         }
      }

      public String toString() {
         return "Task{" + "work=" + work + ", taskNum=" + taskNum + '}';
      }
   }
}