There are two dataframes, `df` and `df1`.
Then, let's consider 3 cases:
- `df1` only has the same number of rows as `df`.
- `df1` has the same number of rows as `df` and the same number of partitions as `df`. Think `df.repartition(k)` and `df1.repartition(k)` were executed right before.
- `df1` has the same number of rows as `df`, the same number of partitions as `df`, and the same distribution of rows as `df`. Think `df1` is brought about by `df1 = df.select('col1', 'col2', ..)`.
Now, I do `df.repartition(n)`. Let's say it distributes `df` as follows:
- Partition 1 - {some rows 1}
- Partition 2 - {some rows 2}
- ...
- Partition n - {some rows n}
Then, I do `df1.repartition(n)`.
For all three configurations of `df1` mentioned above:
- Can it be said that each partition of `df1` would have the same number of rows as the partitions of `df` have after `df.repartition(n)`? Meaning, if executor 1 got 3 partitions for `df` with row counts r1, r2, r3, then it also got 3 partitions for `df1` with row counts r1, r2, r3.
- Can it be said that the partitions of `df1` got distributed identically to how `df`'s partitions were distributed after `df.repartition(n)`? Meaning, row i from both dataframes went to the same partition.
Let's try to understand this by looking at the source code.
When you call `df.repartition(someInteger)` in pyspark, the call is passed on to the JVM-side `repartition` function, which we can find in Dataset.scala. That adds a Repartition operation to our query plan.
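To see that Repartition operation for yourself, you can ask Spark to print the plans of a repartitioned dataframe. This is a minimal sketch, assuming you already have a DataFrame at hand; the helper name `show_repartition_plan` is made up for illustration, while `repartition` and `explain` are the regular DataFrame methods.

```python
def show_repartition_plan(df, n):
    """Print the logical and physical plans of df.repartition(n).

    `df` is assumed to be an existing pyspark.sql.DataFrame.
    """
    repartitioned = df.repartition(n)
    # Passing True (extended) prints the logical plans as well as the physical plan.
    repartitioned.explain(True)
    return repartitioned
```

Calling `show_repartition_plan(df, 4)` should show a Repartition node in the logical plans and an exchange using round-robin partitioning in the physical plan. The exact text of the output depends on your Spark version.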
In that operation, we see that in case `numPartitions > 1` the partitioning used is a `RoundRobinPartitioning(numPartitions)`. Let's have a look at this `RoundRobinPartitioning` in action in ShuffleExchangeExec's `prepareShuffleDependency` method. There are 2 interesting `val`s in there:
- One uses Java's `.hashCode()` and the modulo operator to determine the partitioning.
- The `rddWithPartitionIds` val, which comes with its own documentation in the source.

Conclusion
When we call `df.repartition(someInteger)`, we're using Java's `.hashCode` and a modulo operation to determine the partition in which a record will end up. This applies the `.hashCode` method to your Java object, which is apparently not necessarily deterministic from one Java application to another. I did try to find situations where I would not always get the same results (on a very small scale) and did not find any case where the partitioning was not identical. I checked this with a small test script.

Hope this helps!