When investigating the performance of a Spark job, I noticed in the Spark UI's SQL DAG view that a SortMergeJoin was being performed instead of the expected BroadcastHashJoin.
The code looks something like this:

```scala
val df1 = ... // some DataFrame
val df2 = ... // some DataFrame

def df2Selected =
  df2
    .select(...)

df1
  .join(
    broadcast(df2Selected),
    // join clause
    "left_outer"
  )
```
However, if I modify df2Selected to add coalesce(1) and re-run, then I see the expected BroadcastHashJoin.

```scala
def df2Selected =
  df2
    .select(...)
    .coalesce(1)
```
Can anyone tell me what is going on here, and whether I can force the broadcast without coalescing?
My bet is that coalesce(1) somehow lets Spark reason about the size of the DataFrame returned from df2Selected, and that this is why Spark allows the broadcast.
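For anyone debugging this, it can help to look at the physical plan directly rather than the UI: the join node's name tells you which strategy the planner actually chose. A minimal sketch (the data and object name here are made up, assuming a local SparkSession):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Sketch: print the physical plan to see whether the broadcast hint
// survived planning (toy data, made-up names).
object ExplainBroadcastDemo {
  def joinPlan(): String = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explain-broadcast-demo")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq((1, "a"), (2, "b")).toDF("id", "v1")
    val df2 = Seq((1, "x")).toDF("id", "v2")

    // The automatic-broadcast size cutoff; explicit hints bypass it,
    // but it is worth checking when debugging join strategies.
    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

    // The join node in the plan (BroadcastHashJoin vs SortMergeJoin)
    // shows which strategy the planner picked.
    val joined = df1.join(broadcast(df2), Seq("id"), "left_outer")
    joined.explain() // prints the plan; also capture it as a string
    val plan = joined.queryExecution.executedPlan.toString

    spark.stop()
    plan
  }
}
```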
BroadcastHashJoin cannot broadcast the preserved (outer) side of an outer join: in a left outer join only the right side can be broadcast, and in a full outer join neither side can. There is a good reason for this. With, say, two executors, suppose a row in the broadcast table has no match in Executor 1's partition of the other table: should it appear in the results with nulls on the other side? Executor 1 cannot decide locally, because the matching row might live in Executor 2's partition.
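This restriction can be seen in the physical plan. A sketch (toy data, made-up names, assuming a local SparkSession) comparing the two sides of a left outer join:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Sketch: compare which side of a left outer join Spark will
// actually broadcast.
object BroadcastSideDemo {
  def plans(): (String, String) = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-side-demo")
      .getOrCreate()
    import spark.implicits._

    val big   = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "v1")
    val small = Seq((1, "x"), (2, "y")).toDF("id", "v2")

    // Broadcasting the right (non-preserved) side is supported, so the
    // physical plan should contain a BroadcastHashJoin node.
    val rightPlan = big.join(broadcast(small), Seq("id"), "left_outer")
      .queryExecution.executedPlan.toString

    // Broadcasting the left (preserved) side of a left outer join is
    // not supported, so the hint cannot yield a BroadcastHashJoin.
    val leftPlan = broadcast(big).join(small, Seq("id"), "left_outer")
      .queryExecution.executedPlan.toString

    spark.stop()
    (rightPlan, leftPlan)
  }
}
```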