I have two DataFrames, target_df and reference_df. I need to remove the account_ids in target_df that are present in reference_df. target_df is created from a Hive table and has hundreds of partitions. It is partitioned by date (20220101 to 20221101).

I am doing a left anti-join and writing the data to an HDFS location.

 val numPartitions = 10
 val df_purge = spark.sql(s"SELECT /*+  BROADCASTJOIN(ref) */ target.* FROM  input_table target LEFT ANTI JOIN ${reference_table} ref ON target.${Customer_ID} = ref.${Customer_ID}")
 df_purge.coalesce(numPartitions).write.partitionBy("date").mode("overwrite").parquet("hdfs_path")

I need to apply the same numPartitions value to each date partition, but it is being applied to the entire DataFrame instead. For example, if there are 100 date partitions, I need 100 * 10 = 1000 part files. This code is not working as expected. I tried repartitioning by "date", but this causes a huge data shuffle.

Can anyone please provide an optimized solution? Thanks!

Best answer

I am afraid you cannot avoid a shuffle in this case. repartition, coalesce and partitionBy all work at the dataset level, and I don't think there is a way to split each partition into 10 without a shuffle.

You tried coalesce because it does not cause a shuffle, and that is true, but coalesce can only be used to decrease the number of partitions, so it is not going to help you.

You can try to achieve what you want by combining repartition and partitionBy. Here is a description of both functions (the same applies to Scala; source: https://sparkbyexamples.com):

PySpark repartition() is a DataFrame method that is used to increase or reduce the partitions in memory and, when written to disk, it creates all part files in a single directory.

PySpark partitionBy() is a method of DataFrameWriter class which is used to write the DataFrame to disk in partitions, one sub-directory for each unique value in partition columns.

If you first repartition your dataset into 1000 partitions, Spark is going to create 1000 partitions in memory. Later, when you call partitionBy, Spark is going to create a sub-directory for each value and write one part file for each in-memory partition that contains the given key.

So if, after the repartition, date X is present in 500 partitions out of 1000, you will find 500 files in the sub-directory for this date.
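To make the counting rule above concrete, here is a small plain-Python model (no Spark involved) of how in-memory partitions turn into part files. The function name files_per_directory and the sample rows are made up for illustration, and the model assumes that repartition(n) without columns spreads rows evenly in round-robin fashion. It is only a sketch of the rule, not of Spark's actual writer.

```python
from collections import defaultdict


def files_per_directory(rows, num_partitions, key):
    """Model repartition(num_partitions) followed by write.partitionBy(key).

    Returns a dict mapping each key value (one sub-directory) to the
    number of part files that would land in it under this model.
    """
    # Assumption: repartition(n) without columns spreads rows round-robin.
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)

    # On write, each in-memory partition emits one part file
    # per distinct key value that it contains.
    counts = defaultdict(int)
    for part in partitions:
        for value in {row[key] for row in part}:
            counts[value] += 1
    return dict(counts)


# Hypothetical sample: 3 dates with 100 rows each, repartitioned into 10.
rows = [
    {"date": d, "account_id": i}
    for d in (20220101, 20220102, 20220103)
    for i in range(100)
]
result = files_per_directory(rows, 10, "date")
print(result)
```

In this model every date ends up in all 10 partitions, so each date sub-directory gets 10 files. With real data the number per sub-directory depends on how the rows actually land in the partitions, so it is best read as "at most one file per in-memory partition".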

In the article I mentioned above you can find a simple example of this behaviour. Check chapter 1.3, partitionBy(colNames : String*) Example:

# Use repartition() and partitionBy() together
# (dfRepart is the DataFrame from the article's example)
dfRepart.repartition(2) \
        .write.option("header", True) \
        .partitionBy("state") \
        .mode("overwrite") \
        .csv("c:/tmp/zipcodes-state-more")