Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct?


I have an RDD that is too large to consistently perform a distinct statement without spurious errors (e.g. SparkException stage failed 4 times, ExecutorLostFailure, HDFS Filesystem closed, Max number of executor failures reached, Stage cancelled because SparkContext was shut down, etc.)

I am trying to count distinct IDs in a particular column, for example:

print(myRDD.map(a => a._2._1._2).distinct.count())

Is there an easy, consistent, less shuffle-intensive way to do the command above, possibly using mapPartitions, reduceByKey, flatMap, or other commands that use fewer shuffles than distinct?

See also What are the Spark transformations that causes a Shuffle?


There are 2 answers below.


It might be better to figure out if there is another underlying issue, but the below will do what you want. It is a rather roundabout way to do it, but it sounds like it will fit the bill:

myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(Set[YourType]())((agg, value) => agg + value, (agg1, agg2) => agg1 ++ agg2) 
  .keys
  .count

Even this seems to work, although the functions aren't associative and commutative. It works due to how the internals of Spark work...but I might be missing a case...so while it is simpler, I'm not sure I trust it:

myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(YourTypeDefault)((x,y)=>y, (x,y)=>x)
  .keys.count
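
As a side note that is not part of the original answer, the same de-duplication can also be sketched with reduceByKey, which keeps a single value per key instead of building a Set; treat this as a rough alternative sketch rather than the answer's method:

// Keep one arbitrary value per ID; the surviving keys are the distinct IDs
myRDD.map(a => (a._2._1._2, 1))
  .reduceByKey((a, b) => a)
  .count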

As I see it, there are 2 possible solutions for this matter:

  1. With a reduceByKey
  2. With a mapPartitions

Let's see both of them with an example.

I have a dataset of 100,000 movie ratings with the format (idUser, (idMovie, rating)). Let's say we would like to know how many different users have rated a movie.
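
Note that rddSplitted is not shown in the answer; as a minimal sketch under assumptions (a SparkContext named sc, and the MovieLens u.data file being tab-separated userId, movieId, rating, timestamp), it could be built along these lines:

// Hypothetical construction of rddSplitted as (idUser, (idMovie, rating))
val rddSplitted = sc.textFile("C:/spark/ricardoExercises/ml-100k/u.data")
  .map { line =>
    val fields = line.split("\t")
    (fields(0).toInt, (fields(1).toInt, fields(2).toDouble))
  }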

Let's first take a look using distinct:

val numUsers = rddSplitted.keys.distinct()
println(s"numUsers is ${numUsers.count()}")
println("*******toDebugString of rddSplitted.keys.distinct*******")
println(numUsers.toDebugString)

We will get the following results:

numUsers is 943

*******toDebugString of rddSplitted.keys.distinct*******
(2) MapPartitionsRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:98 []
 |  ShuffledRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:98 []
 +-(2) MapPartitionsRDD[4] at distinct at MovieSimilaritiesRicImproved.scala:98 []
    |  MapPartitionsRDD[3] at keys at MovieSimilaritiesRicImproved.scala:98 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

With the toDebugString function, we can better analyze what is happening with our RDDs.

Now, let's use reduceByKey, counting, for instance, how many times each user has voted while at the same time obtaining the number of different users:

val numUsers2 = rddSplitted.map(x => (x._1, 1)).reduceByKey(_ + _)
println(s"numUsers is ${numUsers2.count()}")
println("*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******")
println(numUsers2.toDebugString)

We will now get these results:

numUsers is 943

*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******
(2) ShuffledRDD[4] at reduceByKey at MovieSimilaritiesRicImproved.scala:104 []
 +-(2) MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:104 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

Analyzing the RDDs produced, we can see that reduceByKey performs the same action more efficiently than the distinct above.

Finally, let's use mapPartitions. The main goal is to first de-duplicate the users within each partition of our dataset, and then obtain the final distinct users.

val a1 = rddSplitted.map(x => (x._1))
println(s"Number of elements in a1: ${a1.count}")
// De-duplicate within each partition: no shuffle is performed here
val a2 = a1.mapPartitions(x => x.toList.distinct.toIterator)
println(s"Number of elements in a2: ${a2.count}")
// Final distinct over the already-reduced data
val a3 = a2.distinct()
println("There are " + a3.count() + " different users")
println("*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******")
println(a3.toDebugString)

We will get the following:

Number of elements in a1: 100000
Number of elements in a2: 1709
There are 943 different users

*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******
(2) MapPartitionsRDD[7] at distinct at MovieSimilaritiesRicImproved.scala:124 []
 |  ShuffledRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:124 []
 +-(2) MapPartitionsRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:124 []
    |  MapPartitionsRDD[4] at mapPartitions at MovieSimilaritiesRicImproved.scala:122 []
    |  MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:120 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

We can now see that mapPartitions first gets the distinct users within each partition of the dataset, shrinking the number of records from 100,000 to 1,709 without performing any shuffle. Then, with this much smaller amount of data, a distinct over the whole RDD can be carried out without worrying about the shuffle, and the result is obtained much faster.

I would recommend using this last approach with mapPartitions rather than reduceByKey, as it shuffles a smaller amount of data. Another solution could be to combine both functions: first mapPartitions as described above, and then, instead of distinct, reduceByKey in the same way as also mentioned before, as sketched below.
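
A minimal sketch of that combined approach, under the same assumptions about rddSplitted as above: mapPartitions removes local duplicates without any shuffle, and reduceByKey then removes the remaining duplicates across partitions in a single shuffle over the already-reduced data.

// Local de-duplication within each partition: no shuffle
val partitionDistinctUsers = rddSplitted.keys.mapPartitions(x => x.toList.distinct.toIterator)
// One shuffle over the reduced data; the number of keys is the number of distinct users
val numDistinctUsers = partitionDistinctUsers
  .map(user => (user, 1))
  .reduceByKey(_ + _)
  .count()
println(s"There are $numDistinctUsers different users")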