How does pyspark repartition work without column name specified?


There are two dataframes df and df1

Then, let's consider 3 cases:

  1. df1 has only the same number of rows as df
  2. df1 has the same number of rows as df and the same number of partitions as df. Think df.repartition(k) and df1.repartition(k) were executed right before.
  3. df1 has the same number of rows as df, the same number of partitions as df, and the same distribution of rows as df. Think df1 is brought about by df1=df.select('col1','col2',..)

Now,

I do df.repartition(n); let's say it distributes df as:

  1. Partition 1 - {some rows 1}
  2. Partition 2 - {some rows 2}
  3. ...
  4. Partition n - {some rows n}

Then I do df1.repartition(n), for each of the three configurations of df1 mentioned above.

  1. Can it be said that each partition of df1 has the same number of rows as the corresponding partition of df after df.repartition(n)? Meaning, if executor 1 got 3 partitions of df with row counts r1, r2, r3, then it also got 3 partitions of df1 with row counts r1, r2, r3.
  2. Can it be said that the rows of df1 got distributed identically to how df's rows were distributed after df.repartition(n)? Meaning, row i from both dataframes went to the same partition.

1 Answer


Let's try to understand this by looking at the source code.

When you call df.repartition(someInteger) in pyspark, this line gets executed:

return DataFrame(self._jdf.repartition(numPartitions), self.sparkSession)

This brings us to the Java repartition function, which we can find in Dataset.scala:

  def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
    Repartition(numPartitions, shuffle = true, logicalPlan)
  }

So that adds a Repartition operation to our query plan:

case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
  extends RepartitionOperation {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  override def partitioning: Partitioning = {
    require(shuffle, "Partitioning can only be used in shuffle.")
    numPartitions match {
      case 1 => SinglePartition
      case _ => RoundRobinPartitioning(numPartitions)
    }
  }
  override protected def withNewChildInternal(newChild: LogicalPlan): Repartition =
    copy(child = newChild)
}

In there, we see that when numPartitions > 1 the partitioning used is RoundRobinPartitioning(numPartitions). Let's have a look at this RoundRobinPartitioning in action in ShuffleExchangeExec's prepareShuffleDependency method. There are two interesting vals in there:

  • The partitioner creates a HashPartitioner, which uses Java's .hashCode() and the modulo operator to determine the partitioning:
    val part: Partitioner = newPartitioning match {
      case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
      case HashPartitioning(_, n) =>
         ...
    }

  • The rddWithPartitionIds val adds the following documentation:
      // [SPARK-23207] Have to make sure the generated RoundRobinPartitioning is deterministic,
      // otherwise a retry task may output different rows and thus lead to data loss.
      //
      // Currently we following the most straight-forward way that perform a local sort before
      // partitioning.
      //
      // Note that we don't perform local sort if the new partitioning has only 1 partition, under
      // that case all output rows go to the same partition.

Conclusion

  • When we do df.repartition(someInteger), Java's .hashCode and a modulo operation are used to determine the partition a record ends up in. The .hashCode method is applied to your Java object, and its result is apparently not necessarily the same from one Java application to another. I tried to find situations where I would not always get the same results (on a very small scale) and did not find any case where the partitioning was not identical. For this, I used this testing code:
import spark.implicits._

// A small dataframe, shuffled into 3 partitions
val df = Seq(
  ("Alex", 4.0, 3.2, 3.0),
  ("Cathy", 2.0, 4.2, 1.2),
  ("Alice", 1.0, 5.0, 3.5),
  ("Mark", 3.0, 3.5, 0.5),
  ("Jenny", 3.0, 3.5, 0.5),
  ("Bob", 3.0, 3.5, 0.5),
  ("John", 3.0, 3.5, 0.5),
  ("Chloe", 3.0, 3.5, 0.5)
).toDF("Name", "Test A", "Test B", "Test C")
 .repartition(3)

// Tag each row with the index of the partition it landed in
val output = df
  .rdd
  .mapPartitionsWithIndex{
    (index, itr) => itr.toList.map(x => x + "#" + index).iterator
  }.collect()
  • To make sure this repartitioning operation is deterministic when a task is retried (so robust against tasks failing), a local sort has been added (which makes your repartitioning a tiny bit slower).
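
To see why the row-to-partition mappings of df and df1 need not line up, here is a plain-Python toy model of round-robin assignment (not Spark itself; it imitates the fact that in prepareShuffleDependency each task's starting position is derived pseudo-randomly from its input-partition id):

```python
import random

def round_robin_assign(rows, num_partitions, task_partition_id):
    """Toy model of Spark's RoundRobinPartitioning: a task starts at a
    pseudo-random position seeded by its input-partition id, then deals
    rows out one by one. (Simplified: real Spark also locally sorts the
    rows first so that retries of the same task are deterministic.)"""
    position = random.Random(task_partition_id).randrange(num_partitions)
    assignment = {}
    for row in rows:
        position += 1
        # An Int's hashCode in Java is the value itself, so the
        # HashPartitioner step reduces to a plain modulo here.
        assignment[row] = position % num_partitions
    return assignment

# The same rows, repartitioned to the same number of partitions, can land
# differently if they arrive from different upstream partitions:
a = round_robin_assign(["row1", "row2", "row3"], 3, task_partition_id=0)
b = round_robin_assign(["row1", "row2", "row3"], 3, task_partition_id=1)
```

So whether df.repartition(n) and df1.repartition(n) place row i in the same partition depends on the rows arriving from the same upstream partitions in the same order, which repartition(n) by itself does not guarantee.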

Hope this helps!