I'm trying to use Apache Beam (via Scio) to run a continuous aggregation of the last 3 days of data (processing time) from a streaming source and output results from the earliest active window every 5 minutes, where 'earliest' means the window with the earliest start time and 'active' means the end of the window hasn't yet passed. Essentially I'm trying to get a 'rolling' aggregation by dropping the non-overlapping period between sliding windows.
A visualization of what I'm trying to accomplish with an example sliding window of size 3 days and period 1 day:
                         early firing - ^    no firing - x

        ** stop firing from this window once time passes this point
                         |
      ^ ^ ^ ^ ^ ^ ^ ^    |
      | | | | | | | |    v    ** stop firing from this window once time passes this point
w1: +====================+^ ^ ^ |
      x x x x x x x       | | | v
w2:        +====================+^ ^ ^
           x x x x x x x         | | |
w3:               +====================+
time: ----d1-----d2-----d3-----d4-----d5-----d6-----d7---->
I've tried using sliding windows (size = 3 days, period = 5 minutes), but they produce a new window for every 3-day/5-minute combination into the future and emit early results for every window. I then tried trigger = AfterWatermark.pastEndOfWindow(), but that produces no early results, and I need them when the job first starts. I've also tried comparing the pane data (isLast, timestamp, etc.) between windows, but they appear identical.
My most recent attempt, which admittedly is somewhat of a hack, involved attaching window information to each key in a DoFn, re-windowing into a fixed window, and attempting to group and reduce to the oldest window using the attached data, but the final reduceByKey doesn't seem to output anything.
DoFn to attach window information:
// ValueType is just a case class I'm using for objects
type DoFnT = DoFn[KV[String, ValueType], KV[String, (ValueType, Instant)]]

class Test extends DoFnT {

  // Window.toString looks like the following:
  // [2020-05-16T23:57:00.000Z..2020-05-17T00:02:00.000Z)
  def parseWindow(window: String): Instant = {
    Instant.parse(
      window
        .stripPrefix("[")
        .stripSuffix(")")
        .split("\\.\\.")(1))
  }

  @ProcessElement
  def process(
      context: DoFnT#ProcessContext,
      window: BoundedWindow): Unit = {
    context.output(
      KV.of(
        context.element().getKey,
        (context.element().getValue, parseWindow(window.toString))
      )
    )
  }
}
sc
  .pubsubSubscription(...)
  .keyBy(_.key)
  .withSlidingWindows(
    size = Duration.standardDays(3),
    period = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      allowedLateness = Duration.ZERO,
      trigger = Repeatedly.forever(
        AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(
            AfterProcessingTime
              .pastFirstElementInPane()
              .plusDelayOf(Duration.standardMinutes(1))))))
  .reduceByKey(ValueType.combineFunction())
  .applyPerKeyDoFn(new Test())
  .withFixedWindows(
    duration = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      trigger = AfterWatermark.pastEndOfWindow(),
      allowedLateness = Duration.ZERO))
  .reduceByKey((x, y) => if (x._2.isBefore(y._2)) x else y)
  .saveAsCustomOutput(
    TextIO.write()...
  )
Any suggestions?
First, regarding processing time: If you want to window according to processing time, you should set your event time to the processing time. This is perfectly fine - it means that the event you are processing is the event of ingesting the record, not the event that the record represents.
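For example, a minimal sketch against the Beam Java API, where input and ValueType stand in for your source collection and element type:

import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;

// Re-stamp each element with the wall-clock time at which it is processed,
// so that event-time windowing downstream behaves as processing-time windowing.
// Moving timestamps forward needs no allowed-skew configuration.
PCollection<ValueType> stamped =
    input.apply(WithTimestamps.of((ValueType v) -> Instant.now()));

Scio exposes the same operation as timestampBy on SCollection.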
Now you can use sliding windows off-the-shelf to get the aggregation you want, grouped the way you want.
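With the Beam Java API that would look something like this sketch, reusing the stamped collection from above and the size/period from your configuration:

import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Each element is assigned to every 3-day window covering it;
// a new window starts every 5 minutes.
PCollection<ValueType> windowed = stamped.apply(
    Window.<ValueType>into(
        SlidingWindows.of(Duration.standardDays(3))
            .every(Duration.standardMinutes(5))));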
But you are correct that it is a bit of a headache to trigger the way you want. Triggers are not expressive enough to say "output the last 3-day aggregation, but only begin once the window is 5 minutes from over", and even less able to express "for the first 3-day period after pipeline startup, output the whole time".
I believe a stateful ParDo(DoFn) will be your best choice. State is partitioned per key and window. Since you want interactions across 3-day aggregations, you will need to run your DoFn in the global window and manage the partitioning of the aggregations yourself. You tagged your question google-cloud-dataflow, and Dataflow does not support MapState, so you will need to use a ValueState that holds a map of the active 3-day aggregations, starting new aggregations as needed and removing old ones when they are done. Separately, you can easily track the aggregation from which you want to periodically output, and have a timer callback that periodically emits the active aggregation. Something like the following pseudo-Java; you can translate to Scala and insert your own types:
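// Sketch only: AccumT is a placeholder for your accumulator type, and the
// method bodies are outlined in comments rather than implemented.
new DoFn<KV<String, ValueType>, KV<String, AccumT>>() {

  // Start (epoch millis) of the 3-day aggregation currently being emitted.
  @StateId("activePeriod")
  private final StateSpec<ValueState<Long>> activePeriod = StateSpecs.value();

  // All live 3-day aggregations, keyed by period start; a map inside
  // ValueState, since Dataflow does not support MapState.
  @StateId("accumulators")
  private final StateSpec<ValueState<Map<Long, AccumT>>> accumulators =
      StateSpecs.value();

  // Fires when the active period ends, so we can advance to the next one.
  @TimerId("nextPeriod")
  private final TimerSpec nextPeriod = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  // Fires every 5 minutes of processing time to emit the active aggregation.
  @TimerId("output")
  private final TimerSpec output = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("activePeriod") ValueState<Long> activePeriod,
      @StateId("accumulators") ValueState<Map<Long, AccumT>> accumulators,
      @TimerId("nextPeriod") Timer nextPeriod,
      @TimerId("output") Timer output) {
    // If activePeriod is unset, initialize it and set both timers.
    // Add the element to every 3-day aggregation that covers it,
    // creating new map entries as needed.
  }

  @OnTimer("nextPeriod")
  public void onNextPeriod(
      OnTimerContext context,
      @StateId("activePeriod") ValueState<Long> activePeriod,
      @StateId("accumulators") ValueState<Map<Long, AccumT>> accumulators,
      @TimerId("nextPeriod") Timer nextPeriod) {
    // Advance activePeriod to the next period start.
    // Remove the aggregation that has just expired from the map.
    // Reset this timer for the end of the new active period.
  }

  @OnTimer("output")
  public void onOutput(
      OnTimerContext context,
      @StateId("activePeriod") ValueState<Long> activePeriod,
      @StateId("accumulators") ValueState<Map<Long, AccumT>> accumulators,
      @TimerId("output") Timer output) {
    // Emit the accumulator for the active period.
    // Reset this timer to fire again 5 minutes from now.
  }
}

I do have some reservations about this, because the outputs we are working so hard to suppress are not comparable to the outputs that are "replacing" them. I would be interested in learning more about the use case; it is possible there is a more straightforward way to express the result you are interested in.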