How can I merge an incremental dataset and a snapshot dataset while retaining deleted rows?


I have a data connection source that creates two datasets:

  • Dataset X (Snapshot)
  • Dataset Y (Incremental)

The two datasets pull from the same source. Dataset X consists of the current state of all rows in the source table. Dataset Y pulls all rows that have been updated since the last build. These two datasets are then merged downstream into dataset Z, where each row in Z is either the version from dataset X or, if newer, the version from dataset Y. This allows us to both have low-latency updates and maintain good partitioning.

When rows are deleted in the source table, the rows are no longer present in dataset X but are still present in dataset Y.

What would be the best way to keep these 'deleted' rows in dataset Z? Ideally, I would also be able to snapshot dataset Y without losing any of the 'deleted' rows.
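
To make the desired behaviour concrete, here is a minimal sketch with toy data, assuming hypothetical id, value, and updated_at columns (the real column names will differ):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Dataset X (snapshot): the current state of the source table.
# Row id=2 has been deleted at the source, so it no longer appears here.
X = spark.createDataFrame(
    [(1, "a1", "2023-01-03"), (3, "c0", "2023-01-01")],
    ["id", "value", "updated_at"],
)

# Dataset Y (incremental): all rows updated since the last build,
# including the last known version of the now-deleted row id=2.
Y = spark.createDataFrame(
    [(1, "a1", "2023-01-03"), (2, "b0", "2023-01-02")],
    ["id", "value", "updated_at"],
)

# Desired dataset Z: the latest version of every row ever seen,
# i.e. ids 1 and 3 from X plus the 'deleted' id 2 from Y.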

Best answer:

Good question! From what I understand, you want dataset Z to contain only the most up-to-date version of each row, including the most up-to-date version of deleted rows. Both updated rows and deleted rows are present in Y. In this case, I would suggest first unioning Y and X together, so that all rows, including deleted rows, are present in the unioned dataset. Then, use a window function over a date column to keep the most recent version of each row. Here is an outline of the PySpark code I would suggest for this:

from pyspark.sql import Window
import pyspark.sql.functions as F

window = Window.partitionBy(primary_keys).orderBy(F.col(date_column).desc())
Z = X.unionByName(Y) # union all rows from both datasets, including deleted rows
Z = Z.withColumn("row_num", F.row_number().over(window)) # rank by date created/updated
Z = Z.filter(F.col("row_num") == 1).drop("row_num") # keep only the latest version of each row
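
For example, if the primary key were a hypothetical id column and the update timestamp a hypothetical updated_at column, the placeholders above would be filled in as:

primary_keys = ["id"]          # hypothetical primary key column(s)
date_column = "updated_at"     # hypothetical created/updated timestamp column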

Note that this solution does not get around the issue of what happens if dataset Y itself snapshots; in that case, the 'deleted' rows would no longer be present in Y.