I want to read multiple files, count the repeated lines, sort the lines by number of repetitions, and take the top 10 repeating lines.
DataSet<String> lines = env.readTextFile("logs-dir");
DataSet<Tuple2<String, Integer>> tuples = lines.map(line -> new Tuple2<>(line, 1));
DataSet<Tuple2<String, Integer>> aggregate = tuples.groupBy(0).sum(1);
DataSet<Tuple2<String, Integer>> sorted = aggregate.sortPartition(1, Order.DESCENDING);
sorted.first(10).writeAsText("domains");
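For comparison, here is what this pipeline is meant to compute, sketched in dependency-free Java (no Flink; the class and method names are illustrative):

```java
import java.util.*;
import java.util.stream.*;

public class TopLines {
    // Count repeated lines and return the n most frequent, most frequent first.
    static List<Map.Entry<String, Long>> topLines(List<String> lines, int n) {
        Map<String, Long> counts = lines.stream()
                .collect(Collectors.groupingBy(l -> l, Collectors.counting()));
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }
}
```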
The problem is that first(10) is arbitrary: it returns some 10 elements taken from across the partitions, not the globally sorted top 10.
Is there a way to select the sorted first n elements from all the partitions without reducing the parallelism to 1?
I would solve this with a parallel MapPartitionFunction that returns the first 10 elements of each partition, then send those results to a single partition, sort them, and take the first 10 again. The MapPartitionFunction, First, would be very simple: it just counts down how many records to forward and returns from the mapPartition() function once the counter is down to 0.
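Flink aside, the two-phase logic (per-partition top 10 in parallel, then a global top 10 over the survivors on one node) can be sketched in dependency-free Java; the class and method names here are illustrative, not Flink API:

```java
import java.util.*;
import java.util.stream.*;

public class TwoPhaseTopN {
    // Phase 1 (runs per partition, in parallel in Flink): keep only this
    // partition's top n entries by count, largest count first. This mirrors
    // sortPartition + the First function forwarding n records.
    static List<Map.Entry<String, Integer>> partitionTop(
            List<Map.Entry<String, Integer>> partition, int n) {
        return partition.stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    // Phase 2 (single partition): merge the per-partition survivors,
    // sort them once more, and take the global top n. At most
    // n * numPartitions records reach this step, so it stays cheap.
    static List<Map.Entry<String, Integer>> globalTop(
            List<List<Map.Entry<String, Integer>>> partitions, int n) {
        return partitions.stream()
                .map(p -> partitionTop(p, n))
                .flatMap(List::stream)
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }
}
```

This is safe because any element of the global top n must already be in its own partition's top n, so discarding everything below each partition's cutoff loses nothing.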