I am trying to join two DataFrames that are read from S3 as Parquet files. One of the DataFrames is huge, about 10 GB (deserialized size), and the other is about 1 GB (deserialized size). I am just doing a left join on two columns, but the join takes forever to complete. Below is the code snippet.
from pyspark.sql import functions as F

# ID1, ID2, COLLECTION_DF and COLLECTION_DF2 are constants defined elsewhere in my code.

# First left join: attach the first collection as a struct column.
pk_populated_df = left_df.join(
    right_df.select(
        ID1,
        ID2,
        F.struct('*').alias(COLLECTION_DF)
    ),
    on=[ID1, ID2],
    how='left'
).persist()

# Second left join: attach the second collection as a struct column.
pk_populated_df = pk_populated_df.join(
    second_right_df.select(
        ID1,
        ID2,
        F.struct('*').alias(COLLECTION_DF2)
    ),
    on=[ID1, ID2],
    how='left'
)

pk_populated_df.write.parquet("s3://")
The first join doesn't take much time, since left_df holds only about 10 MB of data, but the result is 12 million rows.
A few things I have noticed:
- The second join is the problem here; it might produce billions of rows (see the row-count sketch below this list).
- Looking at the task metrics, two tasks reading roughly the same amount of input take very different times to complete, e.g. one task processing 7 MB of data takes 2 minutes, whereas another task processing almost the same amount of data takes 8 minutes.
- I thought it might be due to data skew, so I tried salting (see the salting sketch below this list), but it did not help.
- Each task reads only a few MB of data from the shuffle, and all tasks run with locality level PROCESS_LOCAL.
- I have tried increasing the number of shuffle partitions up to 1000 (see the configuration sketch below this list), but each task still runs for at least 2 minutes and some tasks take up to an hour.
- I have also noticed that CPU utilization is constantly hitting 100%, which might be slowing the tasks down, though I am not sure.
- I have also tried repartitioning the data on the join columns, but it did not help either.
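To make the first point concrete, the matched output of the second join is driven by how many rows share each (ID1, ID2) pair on both sides. This is a rough way to estimate it; it is only a sketch, not part of my actual job, and it reuses F from the snippet above:

left_counts = pk_populated_df.groupBy(ID1, ID2).count().withColumnRenamed("count", "left_cnt")
right_counts = second_right_df.groupBy(ID1, ID2).count().withColumnRenamed("count", "right_cnt")

# Sum of per-key products = number of matched rows the second join would emit.
matched_rows = (
    left_counts.join(right_counts, on=[ID1, ID2])
    .select(F.sum(F.col("left_cnt") * F.col("right_cnt")).alias("matched"))
    .collect()[0]["matched"]
)
print(matched_rows)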
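For reference, this is roughly how I attempted the salting on the second join; NUM_SALTS and the "salt" column name are illustrative rather than my exact code, and spark is the active SparkSession:

NUM_SALTS = 32  # illustrative number of salt buckets

# Add a random salt to the (skewed) left side, and replicate the right side
# across all salt values so every salted key can still find its match.
salted_left = pk_populated_df.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))
salt_values = F.broadcast(spark.range(NUM_SALTS).withColumnRenamed("id", "salt"))
salted_right = second_right_df.select(
    ID1,
    ID2,
    F.struct('*').alias(COLLECTION_DF2)
).crossJoin(salt_values)

pk_populated_df = salted_left.join(
    salted_right,
    on=[ID1, ID2, "salt"],
    how='left'
).drop("salt")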
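And this is roughly how I increased the shuffle partitions and repartitioned on the join keys (1000 is just the largest value I tried):

# Raise shuffle parallelism for the joins (tried values up to 1000).
spark.conf.set("spark.sql.shuffle.partitions", 1000)

# Repartition both sides on the join keys before joining.
pk_populated_df = pk_populated_df.repartition(1000, ID1, ID2)
second_right_df = second_right_df.repartition(1000, ID1, ID2)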
Below is a sample of the time taken by the tasks:
Can someone guide me on how to tune this job? Thanks for your help.