Collecting results asynchronously from a GPars parallel executor

We've got some code in Java using ThreadPoolExecutor and CompletionService. Tasks are submitted in large batches to the pool; results go to the completion service where we collect completed tasks when available without waiting for the entire batch to complete:

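 // Fixed-size pool: MAX_NUMBER_OF_WORKERS threads, at most 20 queued tasks
 ThreadPoolExecutor _executorService =
            new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, MAX_NUMBER_OF_WORKERS,
                    0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(20));
 // Replace Object with the result type of the tasks
 CompletionService<Object> _completionService =
            new ExecutorCompletionService<>(_executorService);

//submit tasks
_completionService.submit( some task);

//get results
while(...){
   // poll returns null if no task completed within the timeout
   // (the time unit is an assumption; the original does not state it)
   Future<Object> result = _completionService.poll(timeout, TimeUnit.MILLISECONDS);
   if (result != null) {
      //process result
   }
}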

The pool has MAX_NUMBER_OF_WORKERS workers in total; tasks submitted while no worker is available are queued; up to 20 tasks may be queued, after which further tasks are rejected.
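For reference, the pattern described above can be written out as a small self-contained Java program. This is only a sketch under assumptions: the class name `CompletionServiceSketch`, the method `runBatch`, the squaring task and the 50 ms sleep are made up for illustration, and rejected tasks are simply skipped rather than retried.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original question.
class CompletionServiceSketch {

    // Submits taskCount tasks and returns the results in completion order.
    static List<Integer> runBatch(int workers, int queueCapacity, int taskCount) {
        // Fixed number of workers plus a bounded queue, as in the question.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        CompletionService<Integer> completion =
                new ExecutorCompletionService<>(executor);

        int accepted = 0;
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            try {
                completion.submit(() -> {
                    Thread.sleep(50);   // stand-in for real work
                    return id * id;
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                // All workers are busy and the queue is full; this sketch
                // skips the task.
            }
        }

        List<Integer> results = new ArrayList<>();
        try {
            // Collect each result as soon as it is available.
            while (results.size() < accepted) {
                Future<Integer> done = completion.poll(1, TimeUnit.SECONDS);
                if (done != null) {
                    results.add(done.get());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        // 4 workers + 20 queue slots can hold all 10 tasks, so none is rejected.
        System.out.println(runBatch(4, 20, 10));
    }
}
```

Counting the accepted submissions gives the collecting loop a natural end without waiting for the whole batch first; the order of the returned list depends on which tasks finish first.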

What is the GPars counterpart to this approach?

Reading the GPars documentation on parallelism, I found many potential options: collectManyParallel(), anyParallel(), fork/join, etc., and I'm not sure which ones are even worth testing. I was hoping to find some mention of "completion" or "completion service" as a comparison in the docs, but found nothing. I'm looking for some direction or pointers on where to start from those experienced with GPars.

There is 1 best solution below

Collecting results on the fly while throttling producers calls for a dataflow solution. Here is a runnable sample demo:

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

int MAX_NUMBER_OF_WORKERS = 10

ThreadPoolExecutor _executorService =
        new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, MAX_NUMBER_OF_WORKERS, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(200));

final group = new DefaultPGroup(new DefaultPool(_executorService))
final results = new DataflowQueue()

//submit tasks
30.times {value ->
    group.task(new Runnable() {
        @Override
        void run() {
            println 'Starting ' + Thread.currentThread()
            sleep 5000
            println 'Finished ' + Thread.currentThread()
            results.bind(value)
        }
    });
}
//get results: take exactly one result per submitted task (30 above);
//results.val blocks until the next value has been bound
30.times {
    def result = results.val
    println result
    //process result
}

group.shutdown()