I have read a lot on this algorithem. When explaining this algorithem , the words : work stealing algorithm [closed]
Those forked subtasks can recursively create more subtasks themself and thus fill up the working queues of the parallely working threads. If one thread finished and has nothing more to do, he can "steal" the work from the queue of another thread.
I understand that this algorithem is new and does not exists in Executors.newCachedThreadPool / Executors.newFixedThreadPool
I would like to see that one thread handles only work from it's queue.
I created a small program that creates threads recursively. see below.
How can I see that it doesn't use the work stealing algorithem ?
public static void main(String[] args) throws Exception {
int[] myArray = IntStream.rangeClosed(1, 10).toArray();
ExecutorService executorService = Executors.newFixedThreadPool(5);
CustomCallable customCallable = new CustomCallable(myArray, executorService);
customCallable.call();
executorService.shutdownNow();
}
public class CustomCallable implements Callable<Integer> {
private static final int THRESHOLD = 2;
private int[] array;
private ExecutorService executorService;
public CustomCallable(int[] array, ExecutorService executorService) {
this.array = array;
this.executorService = executorService;
}
@Override
public Integer call() throws Exception {
int sum = 0;
log.debug(" start [{}] ", Arrays.toString(array) );
if (array.length > THRESHOLD) {
List<Callable<Integer>> dividedTasks = createSubtasks(array, executorService);
sum = executorService.invokeAll(dividedTasks).stream()
.mapToInt(feature -> {
try {
return feature.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return 0;
})
.sum();
} else {
sum = processing(array);
}
log.debug(" sum[{}]={} ", Arrays.toString(array) ,sum);
return sum;
}
private List<Callable<Integer>> createSubtasks(int[] array, ExecutorService executorService) {
int[] arr1 = Arrays.copyOfRange(array, 0, array.length / 2);
int[] arr2 = Arrays.copyOfRange(array, array.length / 2, array.length);
List<Callable<Integer>> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomCallable(arr1, executorService));
dividedTasks.add(new CustomCallable(arr2, executorService));
return dividedTasks;
}
private Integer processing(int[] array) {
int result = Arrays.stream(array)
.sum();
return result;
}
}
This is the output :
[main] DEBUG com.example.CustomCallable - start [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
[pool-2-thread-1] DEBUG com.example.CustomCallable - start [[1, 2, 3, 4, 5]]
**[pool-2-thread-3] DEBUG com.example.CustomCallable - start [[1, 2]]**
[pool-2-thread-2] DEBUG com.example.CustomCallable - start [[6, 7, 8, 9, 10]]
[pool-2-thread-4] DEBUG com.example.CustomCallable - start [[3, 4, 5]]
[pool-2-thread-5] DEBUG com.example.CustomCallable - start [[6, 7]]
**[pool-2-thread-3] DEBUG com.example.CustomCallable - sum[[1, 2]]=3**
[pool-2-thread-5] DEBUG com.example.CustomCallable - sum[[6, 7]]=13
**[pool-2-thread-3] DEBUG com.example.CustomCallable - start [[8, 9, 10]]**
[pool-2-thread-5] DEBUG com.example.CustomCallable - start [[3]]
[pool-2-thread-5] DEBUG com.example.CustomCallable - sum[[3]]=3
[pool-2-thread-5] DEBUG com.example.CustomCallable - start [[4, 5]]
[pool-2-thread-5] DEBUG com.example.CustomCallable - sum[[4, 5]]=9
[pool-2-thread-5] DEBUG com.example.CustomCallable - start [[8]]
[pool-2-thread-5] DEBUG com.example.CustomCallable - sum[[8]]=8
[pool-2-thread-5] DEBUG com.example.CustomCallable - start [[9, 10]]
[pool-2-thread-5] DEBUG com.example.CustomCallable - sum[[9, 10]]=19
**[pool-2-thread-3] DEBUG com.example.CustomCallable - sum[[8, 9, 10]]=27**
[pool-2-thread-4] DEBUG com.example.CustomCallable - sum[[3, 4, 5]]=12
[pool-2-thread-2] DEBUG com.example.CustomCallable - sum[[6, 7, 8, 9, 10]]=40
[pool-2-thread-1] DEBUG com.example.CustomCallable - sum[[1, 2, 3, 4, 5]]=15
[main] DEBUG com.example.CustomCallable - sum[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]=55
As you can see :
main triggered thread-2 and thread-1
thread-1 triggered thread-3(calculating [1,2]) and thread-4
thread-2 triggered thread-5 and thread-3(calculating [1,2] after finishing calculating[8,9,10])
As I understand thread-3 handles work from thread-1 and thread-2.
Looks like thread-3 work steal
If I change it to
ExecutorService executorService = Executors.newWorkStealingPool();
which supports workstealing algorithem, what do I see differently ?
[main] DEBUG com.example.CustomCallable - start [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
[ForkJoinPool-1-worker-1] DEBUG com.example.CustomCallable - start [[1, 2, 3, 4, 5]]
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable - start [[1, 2]]
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable - sum[[1, 2]]=3
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable - start [[3, 4, 5]]
[ForkJoinPool-1-worker-3] DEBUG com.example.CustomCallable - start [[6, 7, 8, 9, 10]]
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable - start [[6, 7]]
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable - sum[[6, 7]]=13
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable - start [[8, 9, 10]]
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable - start [[3]]**
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable - sum[[3]]=3**
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable - start [[4, 5]]
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable - sum[[4, 5]]=9
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable - start [[8]]**
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable - sum[[8]]=8**
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable - start [[9, 10]]
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable - sum[[9, 10]]=19
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable - sum[[3, 4, 5]]=12
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable - sum[[8, 9, 10]]=27
[ForkJoinPool-1-worker-1] DEBUG com.example.CustomCallable - sum[[1, 2, 3, 4, 5]]=15
[ForkJoinPool-1-worker-3] DEBUG com.example.CustomCallable - sum[[6, 7, 8, 9, 10]]=40
[main] DEBUG com.example.CustomCallable - sum[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]=55
As you can see we have 5 workers and worker-4 handles work triggered by worker-3 as well as worker-1. In what way this is different from the previous execution ?
You can download the code from github