Incremental transformation using Delta live tables


I'm attempting to build an incremental data processing pipeline using Delta Live Tables. The aim is to stream data from a source multiple times a day and join only the data within each specific increment.

I'm using autoloader to load the data incrementally from source.

I've taken a sample of 2 tables which I'm attempting to join with each other to produce a gold table. When I run the pipeline for the first time, the tables are joined correctly and the record count in the gold table is correct.

In the next step, I loaded 2 records into each of the input tables and reran the entire pipeline. I was expecting the final table's row count to increase by 2. However, I found that the increase was greater than 2. On closer inspection, I realized that even though only 2 records were streamed in for each input table, the join occurred against the entire table.
Is there a way I can have only the delta of each feed be joined, rather than have the join occur over the entire table?
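For context, this matches Structured Streaming's stream-stream inner join semantics: each newly arrived row is matched against the entire accumulated state of the other side, not just the other side's latest micro-batch. A toy pure-Python model of that behavior (hypothetical data, not the DLT API) shows how 2 new records per side can emit more than 2 joined rows:

```python
# Minimal model of a stream-stream inner join: each micro-batch of new rows
# on one side is joined against ALL rows seen so far on the other side.
def stream_join(cust_batches, order_batches, key=lambda r: r["customer_id"]):
    cust_state, order_state, output = [], [], []
    for cust_batch, order_batch in zip(cust_batches, order_batches):
        # New customer rows join against every order seen so far
        # (including the current batch).
        order_state.extend(order_batch)
        for c in cust_batch:
            output.extend((c, o) for o in order_state if key(c) == key(o))
        # New order rows join against customers from *previous* batches only,
        # to avoid double-counting pairs already emitted above.
        for o in order_batch:
            output.extend((c, o) for c in cust_state if key(c) == key(o))
        cust_state.extend(cust_batch)
    return output

# Batch 1: one customer, one matching order -> 1 joined row.
batch1 = ([{"customer_id": 1}], [{"customer_id": 1, "order_id": "A"}])
# Batch 2: the same customer_id arrives again (e.g. an updated record)
# with a new order. The new customer row matches both orders in state,
# and the new order also matches the old customer row.
batch2 = ([{"customer_id": 1}], [{"customer_id": 1, "order_id": "B"}])

rows = stream_join(*zip(batch1, batch2))
print(len(rows))  # prints 4: batch 2 added 3 rows, not 2
```

This is only a sketch of the semantics; in real Structured Streaming, state retention for stream-stream joins is bounded with watermarks, which is the usual lever for controlling how far back new rows can match.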

The joining step looks like this:

import dlt

def load_table_to_gold(sqlstmt, temp, outtable):
    @dlt.table(name=f"GOLD_{outtable}", temporary=temp,
               comment=f"This is my gold table GOLD_{outtable}")
    def gold_cold():
        # Stream-stream inner join of the two silver tables
        golddf = spark.sql("""
            SELECT c.customer_id, c.name, c.mobile_number,
                   o.order_id, o.total_amount, c.ingested_date
            FROM STREAM LIVE.SILVER_Cust AS c
            INNER JOIN STREAM LIVE.SILVER_Orders AS o
                ON c.customer_id = o.customer_id
        """)
        return golddf

As you can see from the screenshot, the 2 incoming tables streamed in 2 records each; however, the join emitted 19 records into the gold table.
