I'm trying to join two tables in PySpark, both stored in the Iceberg format. I'm bucketing both tables on the join key to improve performance and avoid a shuffle, but the bucketing appears to have no effect whatsoever. What might I be missing?
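For context, the snippets below run against a session configured roughly like the sketch that follows (with the Iceberg runtime jar on the classpath). The catalog type and warehouse path are placeholders rather than my exact values, and `pf` is my alias for `pyspark.sql.functions`, whose `bucket()` function is used as the partition transform later on:

from pyspark.sql import SparkSession
from pyspark.sql import functions as pf  # pf.bucket() is used below as the Iceberg partition transform

spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "hadoop")               # placeholder; could be hive, rest, ...
    .config("spark.sql.catalog.iceberg.warehouse", "/tmp/warehouse")  # placeholder path
    .config("spark.sql.defaultCatalog", "iceberg")  # so two-part names like ml_recommendations.bucket_table1 resolve to the iceberg catalog
    .getOrCreate()
)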
Code for non-bucketed join:
db = "ml_recommendations"
col = "id"
df1 = spark.range(1, 10 ** 6) # This will create a DF with a single col "id"
df1.writeTo(f"{db}.no_bucket_table1") \
.using('iceberg') \
.option("fanout-enabled", "true") \
.createOrReplace()
df2 = spark.range(1, 10 ** 5) # This will create a DF with a single col "id"
df2.writeTo(f"{db}.no_bucket_table2") \
.using('iceberg') \
.option("fanout-enabled", "true") \
.createOrReplace()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df = spark.sql(
    f"select * from {db}.no_bucket_table1 t1 "
    f"inner join {db}.no_bucket_table2 t2 "
    f"on t1.{col}=t2.{col}"
)
df.explain(extended=True)
Result of the non-bucketed join:
== Optimized Logical Plan ==
Join Inner, (id#8L = id#9L)
:- Filter isnotnull(id#8L)
: +- RelationV2[id#8L] Iceberg iceberg.ml_recommendations.no_bucket_table1
+- Filter isnotnull(id#9L)
+- RelationV2[id#9L] Iceberg iceberg.ml_recommendations.no_bucket_table2
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#8L], [id#9L], Inner
:- Sort [id#8L ASC NULLS FIRST], false, 0
: +- **Exchange hashpartitioning**(id#8L, 400), ENSURE_REQUIREMENTS, [plan_id=201]
: +- Filter isnotnull(id#8L)
: +- BatchScan[id#8L] iceberg.ml_recommendations.no_bucket_table1 (branch=null) [filters=id IS NOT NULL, groupedBy=] RuntimeFilters: []
+- Sort [id#9L ASC NULLS FIRST], false, 0
+- **Exchange hashpartitioning**(id#9L, 400), ENSURE_REQUIREMENTS, [plan_id=202]
+- Filter isnotnull(id#9L)
+- BatchScan[id#9L] iceberg.ml_recommendations.no_bucket_table2 (branch=null) [filters=id IS NOT NULL, groupedBy=] RuntimeFilters: []
Code for the bucketed join:
df1 = spark.range(1, 10 ** 6) # This will create a DF with a single col "id"
df1.writeTo(f"{db}.bucket_table1") \
.using('iceberg') \
.option("fanout-enabled", "true") \
.partitionedBy(pf.bucket(100, "id")) \
.createOrReplace()
df2 = spark.range(1, 10 ** 5) # This will create a DF with a single col "id"
df2.writeTo(f"{db}.bucket_table2") \
.using('iceberg') \
.option("fanout-enabled", "true") \
.partitionedBy(pf.bucket(100, "id")) \
.createOrReplace()
df = spark.sql(
    f"select * from {db}.bucket_table1 t1 "
    f"inner join {db}.bucket_table2 t2 "
    f"on t1.{col}=t2.{col}"
)
df.explain(extended=True)
Result of the bucketed join:
== Optimized Logical Plan ==
Join Inner, (id#43L = id#44L)
:- Filter isnotnull(id#43L)
: +- RelationV2[id#43L] Iceberg iceberg.ml_recommendations.bucket_table1
+- Filter isnotnull(id#44L)
+- RelationV2[id#44L] Iceberg iceberg.ml_recommendations.bucket_table2
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#43L], [id#44L], Inner
:- Sort [id#43L ASC NULLS FIRST], false, 0
: +- **Exchange hashpartitioning**(id#43L, 400), ENSURE_REQUIREMENTS, [plan_id=422]
: +- Filter isnotnull(id#43L)
: +- BatchScan[id#43L] iceberg.ml_recommendations.bucket_table1 (branch=null) [filters=id IS NOT NULL, groupedBy=] RuntimeFilters: []
+- Sort [id#44L ASC NULLS FIRST], false, 0
+- **Exchange hashpartitioning**(id#44L, 400), ENSURE_REQUIREMENTS, [plan_id=423]
+- Filter isnotnull(id#44L)
+- BatchScan[id#44L] iceberg.ml_recommendations.bucket_table2 (branch=null) [filters=id IS NOT NULL, groupedBy=] RuntimeFilters: []
So the question is: why is bucketing not removing the Exchange (shuffle) step from the physical plan, even though both tables are bucketed on the join key with the same number of buckets?
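For reference, as far as I can tell from the Spark and Iceberg documentation, eliminating the exchange for bucketed Iceberg tables relies on "storage-partitioned joins", and the settings below are the ones I understand to be involved. I'm not certain the list is complete or that every property applies to my Spark/Iceberg versions, so treat the names as my assumptions:

# Properties I understand to be related to storage-partitioned joins
# (names as I read them in the Spark 3.3+/Iceberg docs; possibly incomplete):
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")

Is a configuration along these lines what's missing, or is something else preventing the bucket layout from being used?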