Let's say I have a list of 10000 elements and want to process them on 6 cores. I don't want to use the existing Stream API and want to do it myself from scratch (for the sake of learning). The Spliterator interface seems to fit that purpose well. However, it divides the collection in half whenever it is called. I can get a 5000-5000 split, then split once more to get 2500-2500-2500-2500, and then split two of those pieces to get 2500-2500-1250-1250-1250-1250, cutting my initial collection into 6 parts. This seems unbalanced, and I see no way to balance it evenly over 6 cores.
From the Javadoc:
API Note: An ideal trySplit method efficiently (without traversal) divides its elements exactly in half, allowing balanced parallel computation.
However, Stream's .parallel() seems to solve this problem somehow. I tried to read the source, but I am still unable to get the gist of it. Maybe someone can explain the high-level approach to me.
If you want to reimplement the functionality provided by parallel streams, then apart from dividing the task into subtasks, you also need to take care of executing these tasks and joining their results.
Under the hood, parallel streams make use of the Fork/Join framework.
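To get a feel for why halving need not leave 6 cores unbalanced, here is a minimal sketch. It assumes (this is not stated in the question) that the data is split into many more pieces than there are cores and that idle worker threads of the pool pick up whatever pieces remain. The factor of 4 pieces per core and the names LeafSizes and leafSizes are illustrative choices for this sketch, not a description of the Stream API's internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Spliterator;

class LeafSizes {

    // Halves the spliterator repeatedly until every piece holds at most `threshold`
    // elements (or cannot be split further) and returns the sizes of those pieces.
    static List<Long> leafSizes(Spliterator<Integer> root, long threshold) {
        List<Long> sizes = new ArrayList<>();
        Deque<Spliterator<Integer>> pending = new ArrayDeque<>();
        pending.push(root);
        while (!pending.isEmpty()) {
            Spliterator<Integer> current = pending.pop();
            Spliterator<Integer> prefix =
                    current.estimateSize() > threshold ? current.trySplit() : null;
            if (prefix == null) {
                sizes.add(current.estimateSize()); // small enough or unsplittable: a leaf
            } else {
                pending.push(current); // the part that stayed in `current`
                pending.push(prefix);  // the part that was split off
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 10_000; i++) {
            data.add(i);
        }
        int cores = 6;
        long threshold = data.size() / (cores * 4L); // assumption: aim for about 4 pieces per core
        List<Long> sizes = leafSizes(data.spliterator(), threshold);
        System.out.println("pieces: " + sizes.size()
                + ", smallest: " + Collections.min(sizes)
                + ", largest: " + Collections.max(sizes));
    }
}
```

With pieces this small and this numerous, it matters little that their count is not a multiple of 6: a thread that finishes early simply takes another piece, so the unevenness is bounded by roughly one small piece rather than by a 2500 versus 1250 difference.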
Spliterator is needed only to split the data into subtasks. The order in which worker threads are assigned tasks and the correct merging of intermediate results are handled by Fork/Join.

If you want to do it yourself, you can extend the abstract class
RecursiveTask and override its method compute(). That would be a "container" for your tasks (there's also the RecursiveAction class, which is meant to perform an action and doesn't produce a value, but since the question is about computations and we need to obtain a result, RecursiveTask is more suitable for this purpose).

To make it more flexible, you can add a property of type
Function or Predicate, which would be provided while instantiating it, but it would be no match for the power and flexibility of parallel streams.

While implementing
compute(), you need to provide the logic for splitting the task. You can use Spliterator for that, and if the source allows random access to its elements (like a list or an array), the splitting can be done manually.

If you choose to utilize
Spliterator for dividing the dataset, you can use the method trySplit(), which returns a Spliterator, or null if the data can't be split further. Hence, if trySplit() yields null, you need to process the remaining elements of the current spliterator. Otherwise, you need to create a new task based on the new spliterator returned by trySplit() and apply fork() on it, and then merge the result produced by processing the remaining elements in the current spliterator with the value returned by the join() method applied on the new task.

But note with
Spliterator that you'll face an issue when it comes to processing the data. Contrary to Iterator, this interface doesn't declare methods that return elements directly; that's not what it's meant for. Spliterator offers only a couple of methods that let you deal with its elements: forEachRemaining() and tryAdvance(). The first one is void, the second returns a boolean value, and both expect a Consumer as an argument. That means that you'll be forced to use stateful functions (which is not a good practice) in order to return a value from compute().

We can specify the required level of parallelism (the maximum number of threads that would be occupied simultaneously) by using one of the parameterized constructors of
ForkJoinPool. Alternatively, we can make use of the Java 8 factory method newWorkStealingPool() from the Executors class.

Parallel processing using Spliterator
RecursiveTask implementation:

main() - Let's generate a total of all numbers in the given collection, and separately add up all odd and all even elements.

Output:
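The listings for this example are not reproduced here. As a rough sketch of the approach described above (not the original code), a Spliterator-based task might look like the following. The class name SpliteratorSumTask, the Predicate property used as a filter, and the THRESHOLD constant that stops splitting before trySplit() returns null are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.Predicate;

class SpliteratorSumTask extends RecursiveTask<Long> {
    // Hypothetical cut-off: pieces of this size or smaller are processed directly.
    private static final long THRESHOLD = 500;

    private final Spliterator<Integer> spliterator;
    private final Predicate<Integer> filter;

    SpliteratorSumTask(Spliterator<Integer> spliterator, Predicate<Integer> filter) {
        this.spliterator = spliterator;
        this.filter = filter;
    }

    @Override
    protected Long compute() {
        Spliterator<Integer> prefix =
                spliterator.estimateSize() > THRESHOLD ? spliterator.trySplit() : null;
        if (prefix == null) {
            // No further split: sum the remaining elements. forEachRemaining() is void,
            // so a mutable holder (a stateful function) is needed to get the value out.
            long[] sum = {0};
            spliterator.forEachRemaining(n -> {
                if (filter.test(n)) {
                    sum[0] += n;
                }
            });
            return sum[0];
        }
        SpliteratorSumTask forked = new SpliteratorSumTask(prefix, filter);
        forked.fork(); // the split-off half runs asynchronously
        long rest = new SpliteratorSumTask(spliterator, filter).compute(); // other half, this thread
        return rest + forked.join();
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i <= 10_000; i++) {
            numbers.add(i);
        }
        ForkJoinPool pool = new ForkJoinPool(6); // parallelism level of 6
        System.out.println(pool.invoke(new SpliteratorSumTask(numbers.spliterator(), n -> true)));       // 50005000
        System.out.println(pool.invoke(new SpliteratorSumTask(numbers.spliterator(), n -> n % 2 != 0))); // 25000000
        System.out.println(pool.invoke(new SpliteratorSumTask(numbers.spliterator(), n -> n % 2 == 0))); // 25005000
        pool.shutdown();
    }
}
```

The single-element long[] array is the stateful part mentioned earlier: it exists only because forEachRemaining() cannot return a value.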
Spliterator + Iterator
We can improve the approach shown above by introducing
Iterator as an additional property.

That would allow making
Spliterator responsible only for splitting the tasks, while Iterator would be used for processing the data. It would also let us avoid stateful functions like the one in the previous example.

RecursiveTask implementation:

main() - the same sample data.

Output:
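The listings for this variant are not reproduced here either. One possible sketch of the idea (not the original code) is shown below. It assumes the Iterator is obtained from the leaf spliterator through Spliterators.iterator(); the original may instead have passed the Iterator in as a separate property. The class name IteratorSumTask and the THRESHOLD constant are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.Predicate;

class IteratorSumTask extends RecursiveTask<Long> {
    // Hypothetical cut-off: pieces of this size or smaller are processed directly.
    private static final long THRESHOLD = 500;

    private final Spliterator<Integer> spliterator;
    private final Predicate<Integer> filter;

    IteratorSumTask(Spliterator<Integer> spliterator, Predicate<Integer> filter) {
        this.spliterator = spliterator;
        this.filter = filter;
    }

    @Override
    protected Long compute() {
        Spliterator<Integer> prefix =
                spliterator.estimateSize() > THRESHOLD ? spliterator.trySplit() : null;
        if (prefix == null) {
            // The spliterator only did the splitting; an Iterator view does the processing,
            // so the sum can live in a plain local variable.
            Iterator<Integer> iterator = Spliterators.iterator(spliterator);
            long sum = 0;
            while (iterator.hasNext()) {
                int n = iterator.next();
                if (filter.test(n)) {
                    sum += n;
                }
            }
            return sum;
        }
        IteratorSumTask forked = new IteratorSumTask(prefix, filter);
        forked.fork(); // the split-off half runs asynchronously
        long rest = new IteratorSumTask(spliterator, filter).compute(); // other half, this thread
        return rest + forked.join();
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i <= 10_000; i++) {
            numbers.add(i);
        }
        ForkJoinPool pool = new ForkJoinPool(6); // parallelism level of 6
        System.out.println(pool.invoke(new IteratorSumTask(numbers.spliterator(), n -> true)));       // 50005000
        System.out.println(pool.invoke(new IteratorSumTask(numbers.spliterator(), n -> n % 2 != 0))); // 25000000
        System.out.println(pool.invoke(new IteratorSumTask(numbers.spliterator(), n -> n % 2 == 0))); // 25005000
        pool.shutdown();
    }
}
```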