Different behaviour of cache method for PySpark dataframes in Spark 2.3

After upgrading Spark from 2.1 to 2.3, I have issues with cached PySpark DataFrames. In Spark 2.1, the cache() method behaved like a deep copy for me, even though according to the documentation it shouldn't work like that.


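from pyspark.sql import SparkSession
from pyspark.sql import functions as spark_func
from pyspark.sql import Window

# The session builder was cut off in the original; Hive support is assumed
# because the example reads and overwrites Hive tables.
sparkSession = (SparkSession.builder
                .enableHiveSupport()
                .getOrCreate())

join_field = "id"           # hypothetical: key column used for the joins
dt_today = ["2018-06-01"]   # hypothetical: today's date value(s) for isin()

src_tbl = sparkSession.sql("SELECT * FROM src_tbl")
dst_tbl = sparkSession.sql("SELECT * FROM snpsht_tbl")
delta = src_tbl.subtract(dst_tbl)  # find the difference

# find new records based on delta
new_records = delta.join(dst_tbl, how='left_anti', on=join_field).cache()
# create snpsht df
snpsht_tbl = dst_tbl.union(new_records)
# create incremental df: keep one row per key
# (the window ordering was truncated in the original; ordering by
#  last_modified_date descending is an assumption)
snpsht_tbl_tmp = snpsht_tbl.withColumn(
    "row_nbr",
    spark_func.row_number().over(
        Window.partitionBy(join_field)
              .orderBy(spark_func.col("last_modified_date").desc())))
inc_tbl = snpsht_tbl_tmp.filter("row_nbr = 1").drop("row_nbr")

inc_tbl.filter(spark_func.col("last_modified_date").isin(dt_today)).count()  # 100 records

# save final tables to DB (register each DataFrame as a temp view so SQL can read it)
snpsht_tbl_name = 'snpsht'
snpsht_tbl.createOrReplaceTempView(snpsht_tbl_name + "_tmp")
sparkSession.sql("INSERT OVERWRITE TABLE snpsht_tbl "
                 "SELECT * FROM " + snpsht_tbl_name + "_tmp")

inc_tbl.filter(spark_func.col("last_modified_date").isin(dt_today)).count()  # 0 records

inc_tbl_name = 'inc'
inc_tbl.createOrReplaceTempView(inc_tbl_name + "_tmp")
sparkSession.sql("INSERT OVERWRITE TABLE inc_tbl "
                 "SELECT * FROM " + inc_tbl_name + "_tmp")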

This is a minimal example that reproduces my problem.

In Spark 2.1, inc_tbl was saved to the inc_tbl table with all new records from the current day, i.e. with the data as it was at the moment cache() was called, and this is what I want. In Spark 2.3, something recomputes all the transformations from the beginning: it sees that the snpsht_tbl table already contains records from the current date, so it only inserts the records that were there before processing.
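The difference the question turns on (holding a cached DataFrame versus holding an independent copy of its data) can be illustrated with a toy model in plain Python. This is not how Spark is implemented: `Table`, `CachedQuery` and the sample rows are hypothetical, and the model simply assumes, as described above, that writing to the source table causes the cached result to be recomputed on the next read. Holding the query object is like holding a DataFrame; only the materialized list behaves like a deep copy.

```python
class Table:
    """Hypothetical stand-in for a table that can be overwritten."""

    def __init__(self, rows):
        self.rows = set(rows)
        self.version = 0

    def overwrite(self, rows):
        self.rows = set(rows)
        self.version += 1  # every write produces a new version


class CachedQuery:
    """Toy model of a cached, lazily evaluated query: the stored result is
    reused while the source is unchanged and recomputed after a write."""

    def __init__(self, source, compute):
        self.source = source
        self.compute = compute
        self._result = None
        self._version = None

    def collect(self):
        # Recompute on first use, or when the source has been written to.
        if self._result is None or self._version != self.source.version:
            self._result = self.compute(self.source.rows)
            self._version = self.source.version
        return self._result


incoming = {"a", "b", "c"}   # hypothetical source rows
snapshot = Table({"a"})       # hypothetical snapshot table

# Like the left_anti join: incoming rows that are not yet in the snapshot.
new_records = CachedQuery(snapshot, lambda snap: incoming - snap)

# Materialize an independent copy before the write (the "deep copy").
frozen = sorted(new_records.collect())

# Like INSERT OVERWRITE: the snapshot now also contains the new records.
snapshot.overwrite(snapshot.rows | set(frozen))

print(frozen)                 # ['b', 'c']
print(new_records.collect())  # set()
```

In the question's terms, `frozen` plays the role of the data expected to survive in `inc_tbl`, while the empty second `collect()` mirrors the 0 count observed after `INSERT OVERWRITE`.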


