Bucketed joins in PySpark/Iceberg

I'm trying to join two tables in PySpark using the Iceberg table format. I'm using bucketing to improve performance and avoid a shuffle, but it appears to have no effect whatsoever. What might I be missing?

Code for non-bucketed join:

db = "ml_recommendations" 
col = "id" 

df1 = spark.range(1, 10 ** 6)  # This will create a DF with a single col "id"
df1.writeTo(f"{db}.no_bucket_table1") \
    .using('iceberg') \
    .option("fanout-enabled", "true") \
    .createOrReplace()

df2 = spark.range(1, 10 ** 5)  # This will create a DF with a single col "id"
df2.writeTo(f"{db}.no_bucket_table2") \
    .using('iceberg') \
    .option("fanout-enabled", "true") \
    .createOrReplace()

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # disable broadcast joins so the join is planned as a sort-merge join

df = spark.sql( 
    f"select * from {db}.no_bucket_table1 t1 " 
    f"inner join {db}.no_bucket_table2 t2 " 
    f"on t1.{col}=t2.{col}" 
) 
df.explain(extended=True)

Result of the non-bucketed join (note the Exchange hashpartitioning step on both sides):

== Optimized Logical Plan ==
Join Inner, (id#8L = id#9L)
:- Filter isnotnull(id#8L)
:  +- RelationV2[id#8L] Iceberg iceberg.ml_recommendations.no_bucket_table1
+- Filter isnotnull(id#9L)
   +- RelationV2[id#9L] Iceberg iceberg.ml_recommendations.no_bucket_table2
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#8L], [id#9L], Inner
   :- Sort [id#8L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#8L, 400), ENSURE_REQUIREMENTS, [plan_id=201]
   :     +- Filter isnotnull(id#8L)
   :        +- BatchScan[id#8L] iceberg.ml_recommendations.no_bucket_table1 (branch=null) [filters=id IS NOT NULL, groupedBy=] RuntimeFilters: []
   +- Sort [id#9L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#9L, 400), ENSURE_REQUIREMENTS, [plan_id=202]
         +- Filter isnotnull(id#9L)
            +- BatchScan[id#9L] iceberg.ml_recommendations.no_bucket_table2 (branch=null) [filters=id IS NOT NULL, groupedBy=] RuntimeFilters: []

Code for the bucketed join:

from pyspark.sql import functions as pf  # for the bucket() partition transform

df1 = spark.range(1, 10 ** 6)  # This will create a DF with a single col "id"
df1.writeTo(f"{db}.bucket_table1") \
    .using('iceberg') \
    .option("fanout-enabled", "true") \
    .partitionedBy(pf.bucket(100, "id")) \
    .createOrReplace()

df2 = spark.range(1, 10 ** 5)  # This will create a DF with a single col "id"
df2.writeTo(f"{db}.bucket_table2") \
    .using('iceberg') \
    .option("fanout-enabled", "true") \
    .partitionedBy(pf.bucket(100, "id")) \
    .createOrReplace()
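
# Side note (not part of the original repro): an illustrative sanity check that the bucketed
# layout was actually written, using Iceberg's standard "partitions" metadata table.
spark.sql(f"select partition, record_count from {db}.bucket_table1.partitions") \
    .show(5, truncate=False)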

df = spark.sql( 
    f"select * from {db}.bucket_table1 t1 " 
    f"inner join {db}.bucket_table2 t2 " 
    f"on t1.{col}=t2.{col}" 
) 
df.explain(extended=True)

Result of the bucketed join (the Exchange hashpartitioning steps are still there):

== Optimized Logical Plan ==
Join Inner, (id#43L = id#44L)
:- Filter isnotnull(id#43L)
:  +- RelationV2[id#43L] Iceberg iceberg.ml_recommendations.bucket_table1
+- Filter isnotnull(id#44L)
   +- RelationV2[id#44L] Iceberg iceberg.ml_recommendations.bucket_table2
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#43L], [id#44L], Inner
   :- Sort [id#43L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#43L, 400), ENSURE_REQUIREMENTS, [plan_id=422]
   :     +- Filter isnotnull(id#43L)
   :        +- BatchScan[id#43L] iceberg.ml_recommendations.bucket_table1 (branch=null) [filters=id IS NOT NULL, groupedBy=] RuntimeFilters: []
   +- Sort [id#44L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#44L, 400), ENSURE_REQUIREMENTS, [plan_id=423]
         +- Filter isnotnull(id#44L)
            +- BatchScan[id#44L] iceberg.ml_recommendations.bucket_table2 (branch=null) [filters=id IS NOT NULL, groupedBy=] RuntimeFilters: []

So the question is: both tables are bucketed on the join key with the same number of buckets (100), so why does bucketing not remove the exchange step from the plan?
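
For what it's worth, my current (unverified) guess is that the bucket layout is only used at planning time when Spark's storage-partitioned join support is switched on explicitly. The settings below are my best reading of the Spark and Iceberg docs, not something I've confirmed fixes the plan:

# Assumed prerequisites for a storage-partitioned join (Spark >= 3.3 with the Iceberg Spark runtime) -- unverified
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")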
