Change data feed on Databricks

I would like to use the change data feed to curate incrementally from one layer (bronze) to another layer (silver). I came across this nice Databricks feature: you enable the change data feed on a table with delta.enableChangeDataFeed = true, and then you read only the latest changes made to that table. This all works, but it requires passing the version or timestamp as mentioned here, and I would like to pass it as a parameter rather than as a hard-coded string. I always want the latest version!

Since the change feed has a _commit_version column, maybe I can add a WHERE clause that keeps only the rows where _commit_version is the maximum? Will this work? (I still need to test it myself.)

And what happens if my curation from bronze to silver fails today at, let's say, version 3, and tomorrow there is already a new version 4? That means I need to curate both versions 3 and 4. How can I manage this case?

I want to use batch queries, not streaming queries. I guess streaming queries would handle the problems above through the checkpoint location you provide on writeStream.

Any help appreciated.

Answer by JayashankarGS

To pass it as a parameter, you would need to store it in a variable. Unfortunately, in plain SQL you cannot do that directly, because the max version comes back as a table with a single column, like below.

SELECT MAX(version) AS ver FROM (DESCRIBE HISTORY student)

So my idea is to use this query as a filter on the output of table_changes.

Code:

SELECT * FROM table_changes('student', 0) WHERE _commit_version = (SELECT MAX(version) AS ver FROM (DESCRIBE HISTORY student))

This returns only the records from the latest version.

You can then use this result to curate from one layer to the next.
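As a plain-Python illustration of what that filter does (this is not Databricks code: ChangeRow and latest_version_changes are hypothetical names, and the list below only stands in for rows returned by table_changes), keeping the rows whose commit version equals the maximum looks like this:

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ChangeRow:
    # Hypothetical stand-in for one row returned by table_changes(...)
    id: int
    change_type: str      # e.g. "insert", "update_postimage", "delete"
    commit_version: int   # mirrors the _commit_version column


def latest_version_changes(rows: List[ChangeRow]) -> List[ChangeRow]:
    """Keep only rows from the newest commit, like WHERE _commit_version = MAX(version)."""
    if not rows:
        return []
    latest = max(r.commit_version for r in rows)
    return [r for r in rows if r.commit_version == latest]


# Toy change feed spanning three commits.
changes = [
    ChangeRow(1, "insert", 1),
    ChangeRow(2, "insert", 2),
    ChangeRow(3, "insert", 3),
    ChangeRow(2, "update_postimage", 3),
]
print(latest_version_changes(changes))
```

Note that this keeps a single commit only: if a run is missed, rows from the skipped versions never reach the next layer, which is exactly the failure case raised in the question. Also, the SQL takes its maximum from DESCRIBE HISTORY rather than from the change rows, so if the newest commit did not change any data (for example an OPTIMIZE), the SQL filter may return nothing.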

Next, the case where a run fails and a new version arrives in the meantime: as you said, in streaming mode the checkpoint takes care of this. In batch mode, you need to keep your own record of which versions have already been processed and check it every time you run the curation.
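A minimal sketch of that bookkeeping in Python, assuming the last successfully curated version (a watermark) is kept in a small JSON file. The file, its key, and the curate callback are all hypothetical; on Databricks you might store the watermark in a Delta table instead. The callback receives an inclusive version range, which maps onto the change feed reader's startingVersion and endingVersion options, e.g. spark.read.format("delta").option("readChangeFeed", "true").option("startingVersion", start).option("endingVersion", end).table(...).

```python
import json
from pathlib import Path
from typing import Callable, Optional, Tuple


def load_last_processed(state_file: Path, default: int = -1) -> int:
    """Return the last successfully curated version, or `default` if none is recorded."""
    if not state_file.exists():
        return default
    return json.loads(state_file.read_text())["last_processed_version"]


def save_last_processed(state_file: Path, version: int) -> None:
    """Persist the watermark; call this only after curation has succeeded."""
    state_file.write_text(json.dumps({"last_processed_version": version}))


def pending_range(last_processed: int, latest_version: int) -> Optional[Tuple[int, int]]:
    """Inclusive (start, end) range of versions still to curate, or None if up to date."""
    if latest_version <= last_processed:
        return None
    return (last_processed + 1, latest_version)


def run_batch(state_file: Path, latest_version: int,
              curate: Callable[[int, int], None]) -> None:
    # `curate` is a hypothetical callback that would read the change feed for
    # [start, end] and write the result to the next layer.
    rng = pending_range(load_last_processed(state_file), latest_version)
    if rng is None:
        return
    curate(*rng)                             # if this raises, the watermark is not advanced
    save_last_processed(state_file, rng[1])  # only reached after a successful run
```

Because the watermark only moves after curate returns, a failure at version 3 leaves it at 2, and the next run picks up versions 3 through 4 in one batch. If the change data feed was enabled after the table was created, initialize the watermark so that the first range starts at or after the version where the feed was enabled, since earlier versions have no change data to read.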

If the versions are kept in sync between one layer and the next, you can simply take the difference between their max versions and run your curation on that many of the most recent versions.

Example: in the silver layer the max version is 2, and the same holds for the gold layer. Today silver gets version 3 and you start the curation. The run fails, so gold does not have version 3, and tomorrow silver gets a new version 4. The difference between the max versions is now 4 - 2 = 2, so you need to curate the last 2 versions (3 and 4) from silver to gold. Again, this only works if the versions are kept in sync and you do not update the gold layer separately.
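The arithmetic in that example can be written as a small Python function. versions_to_curate is a hypothetical helper, and it assumes, as the answer stresses, that the versions in both layers are kept in sync:

```python
from typing import List


def versions_to_curate(source_max: int, target_max: int) -> List[int]:
    """Source versions the target layer has not received yet (assumes synced versions)."""
    return list(range(target_max + 1, source_max + 1))


# Example from the text: gold stopped at version 2, silver is now at version 4.
missing = versions_to_curate(source_max=4, target_max=2)
print(len(missing), missing)  # → 2 [3, 4]
```

The first and last entries of the result are the values you would use as the start and end of the change feed read; an empty list means the target is already up to date.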