Anti join followed by union in Spark SQL


I am running a PySpark script that does an anti join and a union of two dataframes, and I want to do the same thing in Spark SQL.

df_src:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    100|     QC|
|    105|     XY|
|    110|     NM|
|    115|     AB|
+-------+-------+

df_lkp:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    100|     QC|
|    105|     XY|
|    106|     XZ|
+-------+-------+

We have two dataframes: df_src & df_lkp. I am extracting unmatched records from df_src:

df_unmatched = df_src.join(df_lkp, on=column_nm, how='left_anti')

It is giving this result:

df_unmatched

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    110|     NM|
|    115|     AB|
+-------+-------+

I want to do this part using Spark SQL instead. I have created the temporary views vw_df_src and vw_df_lkp and am trying to run the following query, but I am not getting that result.

unmatched_sql = "SELECT * from vw_df_src where {0} in (select {0} from vw_df_src minus select {0} from vw_df_lkp)".format('call_id')
df_unmatched = sqlContext.sql(unmatched_sql)

I am also taking the union of both dataframes and dropping duplicates, using the code below:

df_src1 = df_lkp.union(df_src)
df_src1.show(10)
df_src2 = df_src1.dropDuplicates(['call_id'])

df_src2:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    110|     NM|
|    100|     QC|
|    115|     AB|
|    106|     XZ|
|    105|     XY|
+-------+-------+

I want this to be done in Spark SQL too.

I am using the following code to create temp views:

df_src = sqlContext.read.format('com.databricks.spark.csv').option("delimiter", '\001').options(header='true',inferSchema='false').load(src_file_nm)
df_src.createOrReplaceTempView('vw_df_src')
df_lkp = sqlContext.read.format('com.databricks.spark.csv').option("delimiter", '\001').options(header='true',inferSchema='false').load(lkp_file)
df_lkp.createOrReplaceTempView('vw_df_lkp')

Best answer

ANTI JOIN

spark.sql(
    """
    select *
    from vw_df_src
    left anti join vw_df_lkp
      on vw_df_src.call_nm = vw_df_lkp.call_nm
    """
).show()


+-------+-------+
|call_id|call_nm|
+-------+-------+
|    115|     AB|
|    110|     NM|
+-------+-------+

If you are running this in a notebook cell whose language is not SQL, start the cell with the %sql magic:

%sql
select *
from vw_df_src
left anti join vw_df_lkp
  on vw_df_src.call_nm = vw_df_lkp.call_nm
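
The query above matches on call_nm, while the attempted SQL in the question is keyed on call_id. A minimal sketch of a version with the key passed in as a parameter follows. It uses NOT EXISTS, which is standard SQL that Spark SQL also accepts as a way to express an anti join. The helper name anti_join_sql is hypothetical, and an in-memory SQLite database stands in for Spark purely so the sketch can be run without a Spark session.

```python
import sqlite3


def anti_join_sql(src_view, lkp_view, key):
    """Build a query returning rows of src_view whose key has no match in lkp_view.

    Hypothetical helper for illustration; the names are interpolated as-is,
    so only pass trusted view and column names.
    """
    return (
        "SELECT s.* FROM {src} s "
        "WHERE NOT EXISTS (SELECT 1 FROM {lkp} l WHERE l.{key} = s.{key})"
    ).format(src=src_view, lkp=lkp_view, key=key)


# Assumption: SQLite tables stand in for the Spark temp views from the question.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE vw_df_src (call_id INTEGER, call_nm TEXT)")
conn.execute("CREATE TABLE vw_df_lkp (call_id INTEGER, call_nm TEXT)")
conn.executemany(
    "INSERT INTO vw_df_src VALUES (?, ?)",
    [(100, "QC"), (105, "XY"), (110, "NM"), (115, "AB")],
)
conn.executemany(
    "INSERT INTO vw_df_lkp VALUES (?, ?)",
    [(100, "QC"), (105, "XY"), (106, "XZ")],
)

unmatched_sql = anti_join_sql("vw_df_src", "vw_df_lkp", "call_id")
unmatched = sorted(conn.execute(unmatched_sql).fetchall())
print(unmatched)  # [(110, 'NM'), (115, 'AB')]
```

With a Spark session, the same string would be passed to spark.sql(unmatched_sql) instead of conn.execute.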

UNION

In PySpark, union() keeps duplicates, so you have to call drop_duplicates() or distinct() afterwards. In SQL, UNION removes rows that are duplicated across all columns (UNION ALL keeps them), so the following query is enough. The DataFrame method unionAll(), deprecated in Spark 2.0.0 in favour of union(), also keeps duplicates.

spark.sql(
    """
    select * from vw_df_src
    union
    select * from vw_df_lkp
    """
).show()
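
One difference is worth noting: SQL UNION compares whole rows, whereas dropDuplicates(['call_id']) in the question keeps one row per call_id. On the sample data the two agree, but they diverge as soon as the same call_id appears with different call_nm values. The sketch below shows a per-key version using ROW_NUMBER(). It rests on several assumptions: SQLite (3.25 or later, for window functions) again stands in for Spark, the row (105, 'ZZ') is made-up data added to show the difference, and the ORDER BY picks which row survives per key, whereas dropDuplicates gives no such guarantee.

```python
import sqlite3

# Assumption: SQLite tables stand in for the Spark temp views from the question.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE vw_df_src (call_id INTEGER, call_nm TEXT)")
conn.execute("CREATE TABLE vw_df_lkp (call_id INTEGER, call_nm TEXT)")
conn.executemany(
    "INSERT INTO vw_df_src VALUES (?, ?)",
    [(100, "QC"), (105, "XY"), (110, "NM"), (115, "AB")],
)
# (105, 'ZZ') is hypothetical: same call_id as in vw_df_src, different call_nm.
conn.executemany(
    "INSERT INTO vw_df_lkp VALUES (?, ?)",
    [(100, "QC"), (105, "ZZ"), (106, "XZ")],
)

# Plain UNION: duplicates are removed only when every column matches.
UNION_SQL = "SELECT * FROM vw_df_src UNION SELECT * FROM vw_df_lkp"

# Per-key de-duplication: number the rows within each call_id and keep the first.
DEDUP_SQL = """
SELECT call_id, call_nm
FROM (
    SELECT u.*,
           ROW_NUMBER() OVER (PARTITION BY call_id ORDER BY call_nm) AS rn
    FROM (
        SELECT * FROM vw_df_lkp
        UNION ALL
        SELECT * FROM vw_df_src
    ) u
) t
WHERE rn = 1
"""

union_rows = sorted(conn.execute(UNION_SQL).fetchall())
dedup_rows = sorted(conn.execute(DEDUP_SQL).fetchall())

print(len(union_rows))  # 6: call_id 105 appears twice
print(len(dedup_rows))  # 5: one row per call_id
```

If every call_id always carries the same call_nm in both views, the plain UNION is sufficient and the window function is unnecessary.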

Second answer

Setup:

df_src = spark.createDataFrame(
    [(100, 'QC'),
     (105, 'XY'),
     (110, 'NM'),
     (115, 'AB')],
    ['call_id', 'call_nm']
)
df_lkp = spark.createDataFrame(
    [(100, 'QC'),
     (105, 'XY'),
     (105, 'XY'),
     (106, 'XZ')],
    ['call_id', 'call_nm']
)
df_src.createOrReplaceTempView('vw_df_src')
df_lkp.createOrReplaceTempView('vw_df_lkp')

According to your requirements, (anti join + union) can be done like this:

spark.sql(
    """
    select *
    from vw_df_src as a
    anti join vw_df_lkp b on a.call_nm=b.call_nm
    union (select * from vw_df_lkp)
    """
).show()
# +-------+-------+
# |call_id|call_nm|
# +-------+-------+
# |    110|     NM|
# |    115|     AB|
# |    100|     QC|
# |    105|     XY|
# |    106|     XZ|
# +-------+-------+

However, for this data the anti join is not needed, because UNION already removes the rows that appear in both views:

spark.sql(
    """
    select * from vw_df_src
    union
    select * from vw_df_lkp
    """
).show()
# +-------+-------+
# |call_id|call_nm|
# +-------+-------+
# |    100|     QC|
# |    105|     XY|
# |    115|     AB|
# |    110|     NM|
# |    106|     XZ|
# +-------+-------+