I have a Delta table which I am reading as a StreamingQuery.
Looking through the Delta table history, using DESCRIBE HISTORY, I see that in 99% of the operationMetrics numTargetRowsUpdated is 0, with most operations being inserts. However, there are occasionally 2-3 entries with numTargetRowsUpdated > 1. The operation on the Delta table, however, is a MERGE.
Can I still use a StreamingQuery and read this data as a stream, or will I get errors? i.e.:
df: DataFrame = spark \
    .readStream \
    .format("delta") \
    .load(f"{table_location}")

df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", f"{checkpoint}/{table_location}") \
    .trigger(once=True) \
    .foreachBatch(process_batch) \
    .start()
Now I have another Delta table which is more of a dimension table of customer information, i.e. email, name, last seen, etc.
I was initially reading this as a StreamingQuery in append mode, but I am getting the following error: java.lang.UnsupportedOperationException: Detected a data update
Looking through this table's DESCRIBE HISTORY, I see there are a number of updates happening. Question: if I use a StreamingQuery with ignoreChanges set to true, will it send the updated records as new records, which I can process further in foreachBatch?
If there are updates or deletes in your Delta source, the read stream will throw an exception. This is also clear from the Databricks documentation.
If you set ignoreChanges to true it will not throw an exception, but it will give you the updated rows plus rows which may already have been processed. This is because everything in a Delta table happens at the file level. For example, if you update a single row in a file, roughly the following will happen: the data file containing that row is rewritten as a new file (the updated row plus all the unchanged rows that happened to live in the same file), and the old file is marked as removed. The stream then picks up the entire new file, so every row in it is emitted again, updated or not. This is also mentioned in the docs.
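As a minimal sketch of what that looks like in practice (assuming a running SparkSession with the Delta Lake package; table_location, checkpoint, target_location and the id/last_updated columns are all hypothetical placeholders for your own schema), you would deduplicate inside foreachBatch rather than blindly appending:

```python
# Sketch: reading a Delta table that receives MERGE updates, with ignoreChanges.
# Assumes an active SparkSession `spark` and hypothetical names:
# table_location, checkpoint, target_location, and columns id / last_updated.
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def process_batch(batch_df: DataFrame, batch_id: int) -> None:
    # ignoreChanges re-emits every row of each rewritten file, so the same
    # record can arrive more than once. Keep only the latest version per key
    # within the batch (an alternative is a MERGE into the target table).
    w = Window.partitionBy("id").orderBy(F.col("last_updated").desc())
    deduped = (batch_df
               .withColumn("rn", F.row_number().over(w))
               .filter("rn = 1")
               .drop("rn"))
    deduped.write.format("delta").mode("append").save(target_location)

(spark.readStream
    .format("delta")
    .option("ignoreChanges", "true")  # suppresses "Detected a data update"
    .load(table_location)
    .writeStream
    .option("checkpointLocation", f"{checkpoint}/{table_location}")
    .trigger(once=True)
    .foreachBatch(process_batch)
    .start())
```

Note that per-batch deduplication does not protect you across batches: a row already written in an earlier batch can still reappear later, which is why an idempotent MERGE into the target is often the safer pattern.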
You'll have to decide if this is OK for your use case. If you need to specifically handle updates and deletes, Databricks offers Change Data Feed, which you can enable on Delta tables. This gives you row-level details about inserts, updates and deletes (at the cost of some extra storage and IO).
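A sketch of the Change Data Feed approach (the table path and checkpoint name are hypothetical, and CDF only records changes made after it is enabled):

```python
# Sketch: enable Change Data Feed on the dimension table, then stream its changes.
# Assumes an active SparkSession `spark`; "/path/to/customer_table" and
# `checkpoint` are hypothetical placeholders.
spark.sql("""
    ALTER TABLE delta.`/path/to/customer_table`
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Each change row carries _change_type ('insert', 'update_preimage',
# 'update_postimage', 'delete'), _commit_version and _commit_timestamp.
changes = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .load("/path/to/customer_table"))

(changes
    .filter("_change_type IN ('insert', 'update_postimage')")
    .writeStream
    .option("checkpointLocation", f"{checkpoint}/customer_changes")
    .trigger(once=True)
    .foreachBatch(process_batch)
    .start())
```

Filtering on _change_type lets you treat updated records as new records explicitly, which is what the question is after, without the replayed-rows ambiguity of ignoreChanges.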