Fetching first-n elements from multiple sorted partitions

I want to read multiple files, count how often each line occurs, sort the lines by that count, and take the 10 most frequent lines.

lines = env.readTextFile("logs-dir")
tuples = lines.map(line -> Tuple2(line, 1))
aggregate = tuples.groupBy(0).sum(1)
sorted = aggregate.sortPartition(1, Order.DESCENDING)
sorted.first(10).writeAsText("domains")

The problem is that first-n is arbitrary: sortPartition only sorts within each partition, so first(10) returns 10 elements picked from the partitions in no particular global order.

Is there a way to select sorted first-n elements from all the partitions without reducing the parallelism to 1?

Best answer

I would solve this with a parallel MapPartitionFunction that returns the first 10 elements of each sorted partition, then send those results to a single partition, sort them, and take the first 10 again. This would look like this:

lines = env.readTextFile("logs-dir")
tuples = lines.map(line -> Tuple2(line, 1))
aggregate = tuples.groupBy(0).sum(1)

// sort partitions in parallel
sortPart = aggregate.sortPartition(1, Order.DESCENDING)
// take first 10 of each partition
firstPart = sortPart.mapPartition(new First(10))

// sort all in one partition
sortFull = firstPart.sortPartition(1, Order.DESCENDING).setParallelism(1)
// take first 10, also in a single partition so the sorted order is kept
first10 = sortFull.mapPartition(new First(10)).setParallelism(1)
first10.writeAsText("domains")
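
To see why the two stages give the right result: every element of the global top 10 is also in the top 10 of its own partition, so keeping 10 candidates per partition cannot lose a winner. Below is a minimal plain-Java simulation of that idea. It is not Flink code: the partitions are in-memory lists, Map.Entry<String, Integer> stands in for Tuple2<String, Integer>, and the names TwoStageTopN, sortAndTake and topN are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java simulation of the two-stage top-n (hypothetical names, no Flink involved).
class TwoStageTopN {

    // Sort records by count in descending order and keep at most the first n.
    static List<Map.Entry<String, Integer>> sortAndTake(
            List<Map.Entry<String, Integer>> records, int n) {
        return records.stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()))
                .limit(n)
                .collect(Collectors.toList());
    }

    static List<Map.Entry<String, Integer>> topN(
            List<List<Map.Entry<String, Integer>>> partitions, int n) {
        // Stage 1: each partition independently keeps its own top n (the parallel part).
        List<Map.Entry<String, Integer>> candidates = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            candidates.addAll(sortAndTake(partition, n));
        }
        // Stage 2: at most n * partitions.size() candidates are merged in one place.
        return sortAndTake(candidates, n);
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> partitions = List.of(
                List.of(Map.entry("a.com", 5), Map.entry("b.com", 9), Map.entry("c.com", 1)),
                List.of(Map.entry("d.com", 7), Map.entry("e.com", 3)));
        // The two most frequent entries are b.com (9) and d.com (7).
        System.out.println(topN(partitions, 2));
    }
}
```

The single-partition stage only ever sees at most 10 records per upstream partition, so the non-parallel part of the job stays small no matter how large the input is.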

The MapPartitionFunction First would be very simple. It just counts down how many records to forward and returns from the mapPartition() function once the counter is down to 0.
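
A minimal sketch of that countdown logic, written so it runs without Flink on the classpath: java.util.function.Consumer<T> is used here as a stand-in for Flink's Collector<T>. In an actual job, I would expect the class to implement org.apache.flink.api.common.functions.MapPartitionFunction<T, T>, with the second parameter being an org.apache.flink.util.Collector<T> and out.accept(...) becoming out.collect(...).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Countdown logic for "First". Consumer<T> is a stand-in for Flink's Collector<T>.
class First<T> {
    private final int n;

    First(int n) {
        this.n = n;
    }

    // Forwards at most n records of the partition and then returns,
    // without reading the rest of the input.
    void mapPartition(Iterable<T> values, Consumer<T> out) {
        Iterator<T> it = values.iterator();
        for (int remaining = n; remaining > 0 && it.hasNext(); remaining--) {
            out.accept(it.next());
        }
    }

    public static void main(String[] args) {
        List<String> forwarded = new ArrayList<>();
        new First<String>(2).mapPartition(List.of("x", "y", "z"), forwarded::add);
        System.out.println(forwarded); // prints [x, y]
    }
}
```

Because the partition was sorted in descending order beforehand, the first n records forwarded are that partition's n largest counts.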