I'm very new to PySpark and have been having a challenge with partitioning data.
I have 2 datasets:
- Ad data set (very big) with ad_id and some attribute columns
- Ad transactions data set (smaller), includes ad_id and transaction date
It appears to me that I can only partition by ad_id. My question is: how can I evenly distribute the data by ranges of ad_id for both data sets, so that when I need to compute a join between the two sets it will be faster?
Here is what I'm trying to do:
ads.write.partitionBy("ad_id").mode('overwrite').parquet(os.path.join(output_data, 'ads_table'))
Thanks!
Use Bucketing
If you are using Spark v2.3 or later, you can use bucketing to avoid the shuffle that would otherwise take place on the join after the write.
With bucketing you can put your data into buckets based on a column (usually the one you are joining on). Then, when Spark reads the data back from the buckets, it will not need to perform an exchange.
1. Sample Data
Transactions (Fact)
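As a rough sketch (the column names, sizes, and values here are just assumptions for the demo), the large fact table could be generated like this:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("bucketing-demo").getOrCreate()

# Large fact table: one synthetic transaction row per id
transactions = spark.range(0, 1_000_000).withColumn("value", F.rand())
transactions.show(3)
```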
Names (Dimension)
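And a small dimension table keyed on the same id column (again, purely illustrative):

```python
# Small dimension table: a name for each of the first 1,000 ids
names = spark.range(0, 1_000).withColumn(
    "name", F.concat(F.lit("name_"), F.col("id").cast("string"))
)
names.show(3)
```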
2. Disable Broadcast Join
Since one table is large and the other is small, you will need to disable broadcast joins; otherwise Spark will simply broadcast the small table and no shuffle will appear in the plan.
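One way to do that is to set the auto-broadcast threshold to -1:

```python
# -1 disables automatic broadcast joins, forcing a shuffle-based join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```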
3. Without Bucketing
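Writing both tables as plain (unbucketed) parquet, reading them back, and joining might look like this (the paths are placeholders):

```python
# Write both tables as plain, unbucketed parquet
transactions.write.mode("overwrite").parquet("/tmp/transactions_plain")
names.write.mode("overwrite").parquet("/tmp/names_plain")

t = spark.read.parquet("/tmp/transactions_plain")
n = spark.read.parquet("/tmp/names_plain")

# The physical plan will show an Exchange (shuffle) on both sides of the join
t.join(n, "id").explain()
```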
As you can see in the plan, an exchange takes place due to the unbucketed join.
4. With Bucketing
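A sketch of the bucketed version: both tables are written with bucketBy on the join key (bucketBy requires saveAsTable rather than a plain path), then read back and joined. The table names and the bucket count of 50 are just example values:

```python
# Bucket both tables on the join key with the same number of buckets
(transactions.write
    .bucketBy(50, "id")
    .sortBy("id")
    .mode("overwrite")
    .saveAsTable("transactions_bucketed"))

(names.write
    .bucketBy(50, "id")
    .sortBy("id")
    .mode("overwrite")
    .saveAsTable("names_bucketed"))

t = spark.table("transactions_bucketed")
n = spark.table("names_bucketed")

# With matching bucketing on the join key, no Exchange is needed in the plan
t.join(n, "id").explain()
```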
As can be seen from the plan above, no more exchanges take place. Thus, you will improve your performance by avoiding the exchange.