There are two dataframes, `df` and `df1`. Let's consider three cases:
- `df1` only has the same number of rows as `df`.
- `df1` has the same number of rows as `df` and the same number of partitions as `df`. Think of `df.repartition(k)` and `df1.repartition(k)` as having been executed right before.
- `df1` has the same number of rows as `df`, the same number of partitions as `df`, and the same distribution of rows as `df`. Think of `df1` as being produced by `df1 = df.select('col1', 'col2', ..)`.
Now, I do `df.repartition(n)`, and let's say it distributes `df` as follows:
- Partition 1 - {some rows 1}
- Partition 2 - {some rows 2}
- ...
- Partition n - {some rows n}
Then, I do `df1.repartition(n)`, for each of the three configurations of `df1` mentioned above. My questions:
- Can it be said that each partition of `df1` has the same number of rows as the corresponding partition of `df` after `df.repartition(n)`? Meaning, if executor 1 got 3 partitions of `df` with r1, r2, r3 rows, then it also got 3 partitions of `df1` with r1, r2, r3 rows.
- Can it be said that the rows of `df1` got distributed identically to the rows of `df` after `df.repartition(n)`? Meaning, row i of both dataframes went to the same partition.
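To pin down how the two questions differ, here is a small pure-Python sketch. The row IDs and partition assignments are made up for illustration: a Spark DataFrame has no built-in row ID, so in PySpark you would have to attach one yourself and record something like `spark_partition_id()` for each row. The sketch also simplifies question 1 to per-partition row counts rather than per-executor counts. Question 2 implies question 1, but not the other way round:

```python
from collections import Counter


def same_partition_sizes(a, b):
    """Question 1: does every partition ID hold the same number of rows in both?"""
    return Counter(a.values()) == Counter(b.values())


def same_row_placement(a, b):
    """Question 2: does every row ID land in the same partition in both?"""
    return a == b


# Hypothetical assignments: row ID -> partition ID after repartition(2)
df_parts = {0: 0, 1: 1, 2: 0, 3: 1}
df1_same = {0: 0, 1: 1, 2: 0, 3: 1}  # identical placement
df1_shuf = {0: 1, 1: 0, 2: 1, 3: 0}  # same partition sizes, different rows

print(same_partition_sizes(df_parts, df1_shuf), same_row_placement(df_parts, df1_shuf))
# True False
```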
Let's try to understand this by looking at the source code.
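When you call `df.repartition(someInteger)` in PySpark, the call is forwarded to the JVM through `self._jdf.repartition(numPartitions)`. This brings us to the Scala `repartition` function, which we can find in Dataset.scala. It wraps the current logical plan as `Repartition(numPartitions, shuffle = true, logicalPlan)`, so that adds a Repartition operation to our query plan.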
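As a rough illustration of what that plan node carries, here is a toy Python model of it. This is not Spark code: the class names mirror the Scala ones, and the behavior (a single output partition for `numPartitions == 1`, round-robin otherwise, rejecting non-positive counts) is assumed from the Spark source as I understand it, so treat it as a simplified sketch rather than the real implementation.

```python
from dataclasses import dataclass
from typing import Any, Union


@dataclass(frozen=True)
class SinglePartition:
    """Everything goes to one output partition."""


@dataclass(frozen=True)
class RoundRobinPartitioning:
    num_partitions: int


@dataclass(frozen=True)
class Repartition:
    """Toy stand-in for the logical plan node; `child` is whatever plan it wraps."""
    num_partitions: int
    shuffle: bool
    child: Any

    def __post_init__(self):
        if self.num_partitions <= 0:
            raise ValueError(f"Number of partitions ({self.num_partitions}) must be positive.")

    @property
    def partitioning(self) -> Union[SinglePartition, RoundRobinPartitioning]:
        if not self.shuffle:
            raise ValueError("Partitioning can only be used in shuffle.")
        if self.num_partitions == 1:
            return SinglePartition()
        return RoundRobinPartitioning(self.num_partitions)


# df.repartition(4) roughly corresponds to wrapping df's plan like this:
plan = Repartition(4, shuffle=True, child="<df's logical plan>")
print(plan.partitioning)  # RoundRobinPartitioning(num_partitions=4)
```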
In the Repartition node, we see that when `numPartitions > 1`, the partitioning used is a `RoundRobinPartitioning(numPartitions)`. Let's have a look at this `RoundRobinPartitioning` in action in `ShuffleExchangeExec`'s `prepareShuffleDependency` method. There are 2 interesting `val`s in there:
- The partitioner, which for a `RoundRobinPartitioning(numPartitions)` is a `HashPartitioner(numPartitions)`. It uses `.hashCode()` and the modulo operator to determine the partitioning.
- `rddWithPartitionIds`, which comes with the following documentation:

Conclusion
When we call `df.repartition(someInteger)`, we're using Java's `.hashCode` and a modulo operation to determine the partition in which a record will end up. This applies the `.hashCode` method to each record's partition key. According to the Java documentation, `.hashCode` is not required to stay consistent from one execution of a Java application to another. I did try to find situations where I would not always get the same results (on a very small scale) and did not find any case where the partitioning was not identical. For this, I used this testing code:

Hope this helps!
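The hashCode-and-modulo step can be modeled in a few lines of plain Python. This is a simplified sketch under stated assumptions, not Spark's code: it mirrors what I understand `HashPartitioner` and the round-robin key extraction in `ShuffleExchangeExec` to do. The `start` parameter is hypothetical (Spark derives the starting position from a random generator seeded with the input partition ID), and the sketch ignores any reordering of rows inside the input partition. Because the key being hashed is an integer counter, and Java's `Integer.hashCode()` is documented to return the value itself, placement here depends only on a row's position and the starting offset.

```python
def java_remainder(x: int, mod: int) -> int:
    """Java's `%` on ints: the result takes the sign of the dividend."""
    r = abs(x) % mod
    return -r if x < 0 else r


def non_negative_mod(x: int, mod: int) -> int:
    """Shift a Java-style remainder into [0, mod), as HashPartitioner needs."""
    raw = java_remainder(x, mod)
    return raw + mod if raw < 0 else raw


def hash_partition(key: int, num_partitions: int) -> int:
    """HashPartitioner-style lookup for an Integer key (Integer.hashCode() == value)."""
    return non_negative_mod(key, num_partitions)


def round_robin(rows, num_partitions: int, start: int):
    """Distribute the rows of ONE input partition over the output partitions.

    `start` is a hypothetical stand-in for Spark's seeded random starting position.
    """
    out = {p: [] for p in range(num_partitions)}
    position = start
    for row in rows:
        position += 1  # the key is a running counter, not the row's contents
        out[hash_partition(position, num_partitions)].append(row)
    return out


print(round_robin(["a", "b", "c", "d", "e"], 3, start=1))
# {0: ['b', 'e'], 1: ['c'], 2: ['a', 'd']}
```

Under this model, two input partitions that hold their rows in the same order and share the same starting offset produce the same placement.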