PySpark broadcast join


I have a small PySpark dataframe with 12,000 rows. One attribute in the dataframe holds the hashed customer ID (named "hash_cus_id"). This attribute may contain duplicates.

I also have a big PySpark dataframe with 700,000,000 rows of unique, unhashed customer IDs (named "cus_id"). I want to create a "hash_cus_id" column in the big dataframe and join it with the small one.

I found out about broadcast joins and tried to use one during the merge, but I think it should work faster than it does.

from pyspark.sql import Window
from pyspark.sql.functions import col, ntile

small_df = spark.table('smalldataframe')
big_df   = spark.table('bigdataframe')

big_df = (big_df
          .filter(big_df.report_dt == '9999-12-31')  # partition filter
          .withColumn('hash_cus_id', hash_cus(col('cus_id')))  # hash_cus is my hash function
          .withColumn('part_num', ntile(100).over(Window.orderBy(col('cus_id')))))  # column for new partitions

big_table = big_df.select(big_df.cus_id, big_df.hash_cus_id, big_df.part_num).repartition(100, 'part_num')

result_df = big_table.join(small_df.hint('broadcast'),
                           on=[big_table.hash_cus_id == small_df.hash_cus_id],
                           how='inner')
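For comparison, here is a minimal sketch of what I understand to be the equivalent join written with the broadcast() function instead of the hint. sha2 here is only a stand-in for my hash_cus function, which isn't shown above:

from pyspark.sql.functions import broadcast, col, sha2

small_df = spark.table('smalldataframe')

# sha2 is only a placeholder for the real hash_cus function
big_hashed = (spark.table('bigdataframe')
              .filter(col('report_dt') == '9999-12-31')
              .withColumn('hash_cus_id', sha2(col('cus_id').cast('string'), 256)))

# broadcast() marks the small side explicitly, same effect as .hint('broadcast')
result_df = big_hashed.join(broadcast(small_df), on='hash_cus_id', how='inner')

result_df.explain() should then show a BroadcastHashJoin in the physical plan.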

If I should specify the table structures and give an example, let me know and I will correct my question.

