Broadcast left table in a join


This is my join:

df = df_small.join(df_big, 'id', 'leftanti')

It seems Spark will only broadcast the right-hand DataFrame, but for my logic to work (a left anti join), df_small must be on the left side.

How do I broadcast a DataFrame that is on the left?


Example:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()

df_small = spark.range(2)
df_big = spark.range(1, 5000000)

#    df_small     df_big
#    +---+        +-------+
#    | id|        |     id|
#    +---+        +-------+
#    |  0|        |      1|
#    |  1|        |      2|
#    +---+        |    ...|
#                 |4999999|
#                 +-------+

df_small = F.broadcast(df_small)  # hint is ignored: the plan below still uses SortMergeJoin
df = df_small.join(df_big, 'id', 'leftanti')
df.show()
df.explain()

#    +---+
#    | id|
#    +---+
#    |  0|
#    +---+
#
#    == Physical Plan ==
#    AdaptiveSparkPlan isFinalPlan=false
#    +- SortMergeJoin [id#197L], [id#199L], LeftAnti
#       :- Sort [id#197L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(id#197L, 200), ENSURE_REQUIREMENTS, [id=#1406]
#       :     +- Range (0, 2, step=1, splits=2)
#       +- Sort [id#199L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(id#199L, 200), ENSURE_REQUIREMENTS, [id=#1407]
#             +- Range (1, 5000000, step=1, splits=2)

There are 2 answers below.


Unfortunately, it's not possible: Spark can broadcast the left-side table only for a right outer join.

You can get the desired result by splitting the left anti join into two joins: an inner join, followed by a left join with a null filter.

from pyspark.sql.functions import broadcast, col
from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame([1, 2, 3, 4, 5], IntegerType())
df2 = spark.createDataFrame([(1, 'a'), (2, 'b')], ['value', 'col'])
inner = df1.join(broadcast(df2), 'value', 'inner')
out = df1.join(broadcast(inner), 'value', 'left').where(col('col').isNull()).drop('col')
out.show()
+-----+
|value|
+-----+
|    3|
|    4|
|    5|
+-----+

df1.join(df2, 'value', 'left_anti').show()
+-----+
|value|
+-----+
|    5|
|    3|
|    4|
+-----+

Based on this idea, I created a function.

Without the function:

from pyspark.sql.functions import broadcast

df_inner = df_big.join(broadcast(df_small), 'id', 'inner').select('id').distinct()
df_out = df_small.join(broadcast(df_inner), 'id', 'leftanti')

With the function:

def leftanti_on_small(df_small, df_big, on):
    # First shrink df_big to just the matching keys (a small result),
    # then anti-join df_small against that small result - both joins broadcast.
    df_inner = df_big.join(broadcast(df_small), on, 'inner').select(on).distinct()
    return df_small.join(broadcast(df_inner), on, 'leftanti')

df_out = leftanti_on_small(df_small, df_big, on='id')

Result:

df_out.show()
df_out.explain()

#  +---+
#  | id|
#  +---+
#  |  0|
#  +---+
#  
#  == Physical Plan ==
#  AdaptiveSparkPlan isFinalPlan=false
#  +- BroadcastHashJoin [id#209L], [id#211L], LeftAnti, BuildRight, false
#     :- Range (0, 2, step=1, splits=2)
#     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#1587]
#        +- HashAggregate(keys=[id#211L], functions=[])
#           +- HashAggregate(keys=[id#211L], functions=[])
#              +- Project [id#211L]
#                 +- BroadcastHashJoin [id#211L], [id#217L], Inner, BuildRight, false
#                    :- Range (1, 5000000, step=1, splits=2)
#                    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#1581]
#                       +- Range (0, 2, step=1, splits=2)