HashPartitioning dataframes to achieve co-partitioning during join in PySpark


I am trying to figure out the best way to achieve co-partitioning on my two datasets to eliminate join-related shuffles. I'm working with two dataframes, A and B, where A contains minimal user data, including a field for the event IDs they interacted with, and B contains detailed information about the events. I am trying to join on three fields: day, event_type, and event_id. A and B need to be read from disk, as they will be written to and read from by external clients on an ongoing basis.

The main goal of the project I'm working on is to enable the ability to quickly:

  1. Filter by event_type
  2. Join raw event details to user IDs

I understand that in order to achieve #1 I probably need to partition my parquet files on event_type so that the directory structure makes filtering easier. In order to achieve #2 I should try to minimize shuffles as much as possible by co-partitioning the two dataframes on the join keys.
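For reference, the kind of filtered read I want #1 to speed up looks roughly like this (a minimal sketch; the path and the event_type value 'X' are placeholders, and spark is an existing SparkSession). My understanding is that partitioning the parquet output on event_type lets this read scan only the matching directory via partition pruning:

from pyspark.sql.functions import col

# Only the event_type=X directory should need to be scanned
spark.read.parquet('/path/to/A')\
  .filter(col('event_type') == 'X')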

The data I'm working with consists of 3 days of event data (~12M rows per event type) and the goal is to get this working efficiently for 1-3 years of data.

In order to improve my join, I begin by filtering both dataframes on the event_type I am interested in to narrow down the data. I then do the actual join on day and event_id. This naturally results in shuffles since there is no co-partitioning, so I've tried to address that using hash partitioning.

I read that repartition implements hash partitioning on the specified columns. I save my dataframes to disk and also include a partitionBy('day', 'event_type') in order to achieve better performance on filtering/grouping operations.

A\
  .repartition('day', 'event_id')\
  .write\
  .partitionBy('day', 'event_type')\
  .mode('overwrite')\
  .parquet('/path/to/A')

B\
  .repartition('day', 'event_id')\
  .write\
  .partitionBy('day', 'event_type')\
  .mode('overwrite')\
  .parquet('/path/to/B')
...
...
from pyspark.sql.functions import col

A = spark.read.parquet('/path/to/A')
B = spark.read.parquet('/path/to/B')
A.filter(col('event_type') == 'X')\
  .join(B.filter(col('event_type') == 'X'), on=['day', 'event_id'], how='inner')\
  .show()

When I execute this I still see a shuffle exchange in the plan, as well as shuffle writes that take up around 5-10 GB each. I also see longer executor compute times of around 21-41 s, which might not seem like much on 3 days of data but could blow up for yearly data.
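For context, this is how I'm checking for the shuffle: the same filter-and-join as above, but with explain() instead of show(), where I see Exchange hashpartitioning nodes in the physical plan:

A.filter(col('event_type') == 'X')\
  .join(B.filter(col('event_type') == 'X'), on=['day', 'event_id'], how='inner')\
  .explain()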

I am wondering what a better way to go about this would be, or whether it is even possible to eliminate shuffles when working with dataframes at all. Answers to this question seem to suggest that it might be possible, but not a great idea.

I am not even sure that doing both a repartition and a partitionBy is the correct approach. Is the initial partitioning from repartition() preserved at all when I re-read the parquet files from disk? I have read that this might not be the case; overall, the information available seems either conflicting or without explicit sources attached.
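One alternative I've come across is bucketing, which, as I understand it, records the hash partitioning as table metadata so it can survive a re-read, at the cost of writing via saveAsTable to a metastore table instead of plain parquet paths. A rough sketch of what I think that would look like (the table names and bucket count are placeholders, and I'm not sure it fits my setup since external clients write to these paths directly):

A\
  .write\
  .format('parquet')\
  .bucketBy(200, 'day', 'event_id')\
  .sortBy('day', 'event_id')\
  .mode('overwrite')\
  .saveAsTable('events_a')

B\
  .write\
  .format('parquet')\
  .bucketBy(200, 'day', 'event_id')\
  .sortBy('day', 'event_id')\
  .mode('overwrite')\
  .saveAsTable('events_b')

A_t = spark.table('events_a')
B_t = spark.table('events_b')
A_t.join(B_t, on=['day', 'event_id'], how='inner')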

Thank you for taking the time to help.

