How to inject a delay between the window and sink operators?

Context - Application

We have an Apache Flink application that processes events:

  • The application uses event time characteristics
  • The application shards (keyBy) events based on the sessionId field
  • The application uses a 1-minute tumbling window
    • The windowing is specified by a reduce function and a process function
    • So each session yields one computed record per window
  • The application writes its output to a Postgres sink
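To make the windowing step concrete, here is a plain-Java sketch (no Flink dependency) of what keyBy(sessionId) followed by a 1-minute event-time tumbling window and a reduce computes: one record per session per window. The Event class, its value field, and summing as the reduce are hypothetical stand-ins, since the question does not show the real event type or the reduce/process functions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (not Flink code) of keyBy(sessionId) + 1-minute
// tumbling event-time window + reduce: one output record per (session, window).
final class SessionWindowSketch {

    static final long WINDOW_MS = 60_000L;

    // Hypothetical minimal event: only the fields this sketch needs.
    static final class Event {
        final String sessionId;
        final long timestampMs; // event time
        final long value;       // hypothetical payload that the reduce sums

        Event(String sessionId, long timestampMs, long value) {
            this.sessionId = sessionId;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    // Start of the 1-minute tumbling window (zero offset) that contains ts.
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_MS);
    }

    // Groups by (sessionId, window start) and reduces each group by summing value.
    // The "sessionId@windowStart" key keeps the output easy to inspect.
    static Map<String, Long> aggregate(List<Event> events) {
        Map<String, Long> result = new TreeMap<>();
        for (Event e : events) {
            String key = e.sessionId + "@" + windowStart(e.timestampMs);
            result.merge(key, e.value, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("s1", 1_000L, 1));
        events.add(new Event("s1", 59_000L, 2));
        events.add(new Event("s2", 30_000L, 5));
        events.add(new Event("s1", 61_000L, 4));
        // Prints {s1@0=3, s1@60000=4, s2@0=5}
        System.out.println(aggregate(events));
    }
}
```

Session s1 has events in two different minutes, so it produces two records; s2 produces one.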

Context - Infrastructure

Application:

  • It is hosted in AWS via Kinesis Data Analytics (KDA)
  • It is running in 5 different regions
  • The exact same code is running in each region

Database:

  • It is hosted in AWS via RDS (currently PostgreSQL)
  • It is located in one region (with a read replica in a different region)

Problem

Because we use event time with a 1-minute tumbling window, the sinks in all regions emit their records to the single database at nearly the same time.

Current State

What we want to achieve is an artificial delay between the window and sink operators that postpones the sink emission, with a different delay in each region.

Desired State

Flink App | Offset (s) | Window 1 (s) | Sink 1st run (s) | Window 2 (s) | Sink 2nd run (s)
#1        |          0 |           60 |               60 |          120 |              120
#2        |         12 |           60 |               72 |          120 |              132
#3        |         24 |           60 |               84 |          120 |              144
#4        |         36 |           60 |               96 |          120 |              156
#5        |         48 |           60 |              108 |          120 |              168
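The offsets in the table step by 12 seconds, which is the 60-second window divided by the 5 regions. The sketch below regenerates the table under that assumption (evenly spread offsets, and sink time = window end + region offset); the method names are hypothetical and only serve to make the arithmetic explicit.

```java
// Reproduces the "Desired State" table, assuming offsets are spread evenly
// across the window: offset(i) = i * window / regionCount (i is 0-based).
final class StaggerSchedule {

    // Delay in seconds for region index i when regionCount regions share one window length.
    static long offsetSeconds(int regionIndex, int regionCount, long windowSeconds) {
        return regionIndex * windowSeconds / regionCount;
    }

    // Time at which region i's sink should run for window number w (1-based).
    static long sinkTimeSeconds(int windowNumber, int regionIndex, int regionCount, long windowSeconds) {
        long windowEnd = windowNumber * windowSeconds;
        return windowEnd + offsetSeconds(regionIndex, regionCount, windowSeconds);
    }

    public static void main(String[] args) {
        int regions = 5;
        long window = 60;
        System.out.println("App Offset Window1 Sink1 Window2 Sink2");
        for (int i = 0; i < regions; i++) {
            System.out.printf("#%d %d %d %d %d %d%n",
                    i + 1,
                    offsetSeconds(i, regions, window),
                    window, sinkTimeSeconds(1, i, regions, window),
                    2 * window, sinkTimeSeconds(2, i, regions, window));
        }
    }
}
```

With 5 regions and a 60-second window, this yields offsets 0, 12, 24, 36, 48 and sink times that match the table row by row.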

Workaround that does not work

We thought we could add a sleep to the evictor's evictBefore method, like this:

...
.keyBy(event -> event.getSessionId())
.window(getWindowAssigner(config))
.allowedLateness(Time.seconds(config.getWindowLatenessInSec()))
.evictor(new Evictor<>() {
    private static final long serialVersionUID = 5373966807521260856L;

    // Called each time a window fires, before the window function runs
    @Override
    public void evictBefore(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
        try {
            // Blocks the operator's task thread for the configured delay
            Thread.sleep(config.getWindowingDelayInMilliSec());
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Event>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
        // No eviction after the window function
    }
})
...

but it does not work reliably.
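One likely reason, shown here as a simplified model rather than Flink code: evictBefore runs on the operator's task thread each time a key's window fires, and the windows of all keys that close at the same watermark fire one after another on that thread (within each parallel subtask). The sleeps therefore add up across sessions instead of applying once per region. The model ignores the time spent in the reduce and process functions and the fact that the blocked thread also holds back other records and watermarks; the class and method names are hypothetical.

```java
// Simplified model: windows of several keys fire at the same watermark and are
// processed sequentially on one thread; each evictBefore call sleeps sleepMs.
final class SleepingEvictorModel {

    // Returns, for each firing key in order, how long after the watermark
    // its result is emitted (only the sleeps are counted).
    static long[] delaysPerKey(int keysFiring, long sleepMs) {
        long[] delays = new long[keysFiring];
        long elapsed = 0;
        for (int k = 0; k < keysFiring; k++) {
            elapsed += sleepMs;  // this key's evictBefore blocks the thread
            delays[k] = elapsed; // emitted only after all earlier sleeps finished
        }
        return delays;
    }

    public static void main(String[] args) {
        // Four sessions firing together with a 12 s sleep each
        long[] d = delaysPerKey(4, 12_000L);
        System.out.println(java.util.Arrays.toString(d)); // [12000, 24000, 36000, 48000]
    }
}
```

In this model only the first session gets the intended 12-second delay; the last one waits four times as long, and the spread grows with the number of active sessions.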

Best Answer

You could use TumblingEventTimeWindows.of(Time size, Time offset, WindowStagger windowStagger) with WindowStagger.RANDOM.

See https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/windowing/assigners/WindowStagger.html for documentation.
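A plain-Java model of what the assigner computes may help when choosing between the two knobs in that signature. A fixed offset shifts every window boundary by the same amount, so region #2 with a hypothetical 12-second offset would aggregate [12 s, 72 s) and emit at about 72 s, matching the desired sink time but with different window contents. WindowStagger.RANDOM instead draws a stagger once per parallel operator instance, uniformly from [0, window size), so emission is spread randomly rather than in the fixed steps of the table. The sketch below mirrors that arithmetic; it is not Flink's implementation, and the names are illustrative.

```java
import java.util.Random;

// Plain-Java model of tumbling-window assignment with an offset; not Flink code.
final class StaggeredTumblingWindows {

    // Start of the tumbling window of length sizeMs that contains ts, shifted by offsetMs.
    static long windowStart(long ts, long offsetMs, long sizeMs) {
        return ts - Math.floorMod(ts - offsetMs, sizeMs);
    }

    // Models WindowStagger.RANDOM: one stagger value drawn uniformly from [0, sizeMs).
    static long randomStagger(Random rnd, long sizeMs) {
        return (long) (rnd.nextDouble() * sizeMs);
    }

    public static void main(String[] args) {
        long size = 60_000L;
        // Hypothetical fixed per-region offset: region #2 -> 12 s, as in the table
        long start = windowStart(65_000L, 12_000L, size);
        System.out.println("region #2 window: [" + start + ", " + (start + size) + ")");
        // One random stagger, as a single parallel instance would draw it
        long stagger = randomStagger(new Random(42), size);
        System.out.println("random stagger (ms): " + stagger);
    }
}
```

In the question's pipeline, the suggestion amounts to returning something like TumblingEventTimeWindows.of(Time.minutes(1), Time.seconds(0), WindowStagger.RANDOM) from getWindowAssigner(config); a deterministic per-region spread would instead pass a region-specific value as the offset argument.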