I have a DLT pipeline where I want to calculate a rolling 24-hour average of a column, updated every hour.
I'm using the code below to achieve this:
```python
import dlt
from pyspark.sql.functions import avg, window

@dlt.table()
def gold():
    df = dlt.read_stream("silver_table")
    # Define a 24-hour window with a 1-hour slide
    window_spec_24h = window("fetch_ts", "24 hours", "1 hour")
    df = (
        df.withWatermark("fetch_ts", "10 minutes")
        .groupBy(df.Id, window_spec_24h)
        .agg(avg("foo").alias("average_foo_24h"))
    )
    return df
```
My issue is, I'm always missing the last window in my result df. For instance, if my input df has the following fetch_ts values:
2024-02-23T15:04:00.000
2024-02-23T16:04:00.000
2024-02-23T16:05:00.000
2024-02-23T16:54:00.000
2024-02-23T17:06:00.000
2024-02-23T18:54:00.000
the output df has the following windows:
{"start":"2024-02-22T16:00:00.000Z","end":"2024-02-23T16:00:00.000Z"}
{"start":"2024-02-22T17:00:00.000Z","end":"2024-02-23T17:00:00.000Z"}
{"start":"2024-02-22T18:00:00.000Z","end":"2024-02-23T18:00:00.000Z"}
which means that my last row, with fetch_ts 2024-02-23T18:54:00.000, is getting excluded from the calculation.
During the next trigger, the previously missing window appears, but this time the last window of the latest batch doesn't appear. It goes on like this.
Any idea why this is happening? Or is it by design and I'm missing something? Is there a way to add the last window {"start":"2024-02-22T19:00:00.000Z","end":"2024-02-23T19:00:00.000Z"}
as well so that I can include the last row in my calculation?
Thanks and regards,
(I tried removing the watermarking and then I can get the latest window as well but removing the watermarking is not an option as I want to join the calculated df with the original one to include the other columns. Stream-stream joins are not allowed without watermark.)
Under the hood, DLT is using Structured Streaming, so I'll explain the semantics of when windows are emitted.
In a streaming aggregation, records may arrive late and out of order. To deal with this, Structured Streaming buffers records in time windows. At the end of each batch, the stream's output mode determines which records are emitted downstream. In DLT, however, the output mode is hidden from you and not directly configurable.
The default is the Append output mode, which emits only the aggregates that will not change in subsequent triggers. Which aggregates won't change? The ones that will receive no more records, namely the windows whose end timestamp is less than the watermark. Since a watermark of 3pm says the stream will no longer receive records from before 3pm, the engine closes the windows that end before 3pm.
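That rule can be sketched in a few lines of plain Python (this is an illustration, not Spark's internals; `emittable_windows` is just a hypothetical helper):

```python
from datetime import datetime

# Illustrative helper (not a Spark API): under Append mode, a window's
# aggregate is final once the watermark has passed the window's end.
def emittable_windows(window_ends, watermark):
    return [end for end in window_ends if end < watermark]

# Hourly windows ending 13:00-16:00, with the watermark at 15:00:
ends = [datetime(2024, 2, 23, h) for h in (13, 14, 15, 16)]
closed = emittable_windows(ends, datetime(2024, 2, 23, 15))
# Only the windows ending before 15:00 (i.e. 13:00 and 14:00) are emitted.
```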
So, in your example, that last record with fetch_ts 2024-02-23T18:54:00.000 is actually being processed! That record advances the watermark to 10 minutes earlier, 2024-02-23T18:44:00.000, which is indeed greater than the end of the last window you're seeing (the one ending at 2024-02-23T18:00:00.000Z). Then, when you push new records through, the watermark advances further, which causes the buffered aggregates to be emitted. That's the answer to your observation: "During the next trigger, the previously missing window appears, but this time the last window of the latest batch doesn't appear."
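You can double-check the watermark arithmetic with a few lines of plain Python (datetimes only, no Spark needed):

```python
from datetime import datetime, timedelta

# Watermark = max event time seen so far, minus the configured 10-minute delay.
max_event_time = datetime.fromisoformat("2024-02-23T18:54:00")
watermark = max_event_time - timedelta(minutes=10)  # 2024-02-23 18:44

# The last window in the output ends at 18:00; the "missing" one ends at 19:00.
assert watermark > datetime.fromisoformat("2024-02-23T18:00:00")  # closed, emitted
assert watermark < datetime.fromisoformat("2024-02-23T19:00:00")  # still buffered
```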
If you really wanted to see windows as they are created (and not only after the watermark crosses their end), your best bet would be a materialized view (MV) in DLT Serverless.
Exercises
You really don't have to do these exercises, since in DLT you're intentionally shielded from the output mode. But since this was asked under [spark-structured-streaming], I'm taking the liberty to add them :)
To make these exercises easier to describe, let's just use plain seconds for our timestamps. We'll use a 10-second tumbling window (not sliding) with a watermark delay of 5 seconds.
1. We receive the records 5, 6, and 9. How many aggregates are emitted downstream?
2. We then receive 11. What records are emitted?
3. What's the minimum timestamp you need to receive for exactly one window to be emitted?
Finally, let's develop an interesting comment you made: "It goes on like this." Is there ever a situation in which, using a non-zero watermark delay `d` and a windowed streaming aggregation, it won't "go on like this"? That is, can you ever get Structured Streaming to emit all windows from intermediate state?
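If you want to check your answers, here's a toy simulator of the Append-mode behavior (plain Python, not Spark; it assumes the watermark advances at the end of each batch and that a window is emitted once its end falls strictly below the watermark):

```python
# Toy simulator: tumbling windows of `window_size` seconds, with
# watermark = (max event time seen) - delay, updated at batch boundaries.
def run_batches(batches, window_size=10, delay=5):
    state = {}        # window start -> count of buffered records
    max_event = None  # max event time seen so far
    emitted = []      # (batch_index, window_start) pairs emitted downstream
    for i, batch in enumerate(batches):
        # Records later than the watermark from the *previous* batch are kept.
        watermark = None if max_event is None else max_event - delay
        for t in batch:
            if watermark is not None and t < watermark:
                continue  # too late; dropped
            start = (t // window_size) * window_size
            state[start] = state.get(start, 0) + 1
            max_event = t if max_event is None else max(max_event, t)
        # The watermark advances at the end of the batch.
        watermark = None if max_event is None else max_event - delay
        for start in sorted(state):
            if watermark is not None and start + window_size < watermark:
                emitted.append((i, start))  # window is final: emit and evict
                del state[start]
    return emitted
```

Try `run_batches([[5, 6, 9], [11]])`, then keep appending batches until a window is emitted.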