Modifying State in Flink's Aggregate Function within a Windowed Operation


I'm working on a Flink application that applies an aggregate function over a window. I've been able to read that operator's window state from a checkpoint using the State Processor API.

However, in my specific use case, I also need to be able to stop the processing, retrieve and modify the state, and then resume processing without resetting checkpoints. Unfortunately, the documentation doesn't provide any examples for such use cases.

This is how the aggregator and window function are used:

eventStream
    .keyBy(_.key)
    .window(TumblingTimeWindow.of(5.minutes))
    .aggregate(new Aggregator, new PostAggregatorWindowFunction)
    .uid("aggregate-every-5-minutes")

Then, in a separate job, I run:

val readerStream = SavepointReader.read(env, checkpointFile, backend)
    .window(TumblingTimeWindow.of(5.minutes))
    .aggregate(
    "aggregate-every-5-minutes",
    new Aggregator,
    new WindowStateReader // Extends WindowReaderFunction and outputs (KEY, CurrentAggregate)
  )

val transformation = OperatorTransformation
    .bootstrapWith(readerStream)
    .keyBy(_._1) // Keying by first entry of tuple (KEY, CurrentAggregate)
    .window(TumblingTimeWindow.of(5.minutes))
    .aggregate(???, ???) // How do we define these? And how do we set their state?

SavepointWriter.fromExistingSavepoint(env, checkpointFile, backend)
    .removeOperator(OperatorIdentifier.forUid("aggregate-every-5-minutes"))
    .withOperator(OperatorIdentifier.forUid("aggregate-every-5-minutes"), transformation)
    .write(outputFile)
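
One possible shape for the first `???`, as an untested sketch rather than a confirmed approach: a bootstrap-only aggregator whose input is the `(KEY, CurrentAggregate)` tuple from the reader and whose accumulator is seeded directly from that tuple. The names `Acc` and `SeedingAggregator` are hypothetical, and the key is assumed to be a `String`. The local `AggregateFunction` trait is only a stand-in mirroring the four methods of Flink's `org.apache.flink.api.common.functions.AggregateFunction`, so that the snippet compiles without Flink on the classpath. It assumes the accumulator written here has to be the same type as the one used by `Aggregator` in the running job.

```scala
// Stand-in with the same four methods as Flink's AggregateFunction interface.
// In a real job, delete this trait and import the Flink interface instead.
trait AggregateFunction[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, accumulator: ACC): ACC
  def getResult(accumulator: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Hypothetical accumulator; assumed to match the accumulator type of the
// Aggregator used by the running job.
final case class Acc(count: Long, sum: Double)

// Bootstrap-only aggregator: every input is a (key, accumulator) pair that was
// read from the old savepoint and already modified. The accumulator built for
// the window is that value folded into an empty accumulator.
class SeedingAggregator extends AggregateFunction[(String, Acc), Acc, Acc] {
  override def createAccumulator(): Acc = Acc(0L, 0.0)

  // Fold the restored accumulator into the current one.
  override def add(value: (String, Acc), accumulator: Acc): Acc =
    merge(accumulator, value._2)

  override def getResult(accumulator: Acc): Acc = accumulator

  override def merge(a: Acc, b: Acc): Acc =
    Acc(a.count + b.count, a.sum + b.sum)
}
```

This only covers the accumulator side. It leaves open how each bootstrapped record would be assigned to the same window it was read from, and what the second `???` should be.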

Could someone please guide me on how I can achieve this in Flink 1.17? Any code snippets or pointers to relevant resources would be greatly appreciated. Thank you!
