Use of partitioners in Spark


Hi, I have a question about partitioning in Spark. In the Learning Spark book (page 66), while discussing PageRank, the authors say that partitioning can be useful, and write:

since links is a static dataset, we partition it at the start with partitionBy(), so that it does not need to be shuffled across the network

This example is what prompted me, but my questions are general:

  1. Why doesn't a partitioned RDD need to be shuffled?
  2. partitionBy() is a wide transformation, so it will produce a shuffle anyway, right?
  3. Could someone give a concrete example of what happens on each node when partitionBy runs?

Thanks in advance

Accepted answer:

Why doesn't a partitioned RDD need to be shuffled?

When the author does:

val links = sc.objectFile[(String, Seq[String])]("links")
 .partitionBy(new HashPartitioner(100))
 .persist()

He is partitioning the dataset into 100 partitions, where each key (the pageId in this example) is hashed to a given partition. This means that all records with the same key end up in a single partition. Then, when he does the join:

val contributions = links.join(ranks)

all records with the same pageId are already located together, so the (large) links dataset never has to be shuffled across the cluster for the join; at most, the ranks data is moved to the partitions where the matching links already live.
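As a hedged sketch of this (assuming a SparkContext named `sc` and the book's data layout; the 100-partition count follows the book's example), you can observe that the partitioner is set on links and preserved through the join:

```scala
import org.apache.spark.HashPartitioner

// Sketch only: mirrors the book's setup, not runnable without the "links" data.
val links = sc.objectFile[(String, Seq[String])]("links")
  .partitionBy(new HashPartitioner(100))
  .persist()

// mapValues preserves the parent's partitioner, so ranks starts co-partitioned
// with links and the first join needs no shuffle of either side.
val ranks = links.mapValues(_ => 1.0)

val contributions = links.join(ranks)
println(links.partitioner)         // Some(<the HashPartitioner>)
println(contributions.partitioner) // the join keeps the partitioner
```

Because links is persisted, every PageRank iteration reuses the same partition layout instead of re-shuffling the link table.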

partitionBy() is a wide transformation, so it will produce a shuffle anyway, right?

Yes, partitionBy produces a ShuffledRDD[K, V, V], unless the RDD is already partitioned by an equal partitioner:

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
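Note the `if (self.partitioner == Some(partitioner)) self` branch: re-partitioning with an equal partitioner returns the same RDD and triggers no new shuffle. HashPartitioner compares equal when the partition counts match, so in this sketch (assuming a SparkContext named `sc`) the second call is a no-op:

```scala
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
val once  = pairs.partitionBy(new HashPartitioner(100)) // shuffles
val twice = once.partitionBy(new HashPartitioner(100))  // equal partitioner: no shuffle
// The same RDD object is returned by the second call.
println(twice eq once)
```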

Could someone give a concrete example of what happens on each node when partitionBy runs?

Basically, partitionBy does the following:

Hash Partitioning

It hashes each key modulo the number of partitions (100 in this case). Because the same key always produces the same hash code, all records for a given key (in our case, pageId) are packed into the same partition. When you later join on that key, all the matching data is already in that partition, avoiding the need for a shuffle.
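The placement computation itself is tiny. This is a sketch of the same logic (not the actual Spark source); Spark's HashPartitioner uses a non-negative modulo so that negative hash codes still map to a valid partition index, and sends null keys to partition 0:

```scala
// Sketch of HashPartitioner-style placement, for illustration only.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

def partitionFor(key: Any, numPartitions: Int): Int =
  if (key == null) 0
  else nonNegativeMod(key.hashCode, numPartitions)

// The same pageId always lands in the same partition:
val p1 = partitionFor("page-42", 100)
val p2 = partitionFor("page-42", 100)
// p1 == p2, always
```

During the shuffle, each map task computes this index for every record and writes the record to the output bucket for that partition; each of the 100 reduce-side partitions then fetches its bucket from every map task.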