Resolve Flink SQL tumble window performance drop

I have a Flink SQL application that aggregates data from Kafka into a database in real time. The data is aggregated at several intervals (1, 5, 10, 30, and 60 minutes, plus daily), and each result is written to the corresponding DB table.

I use the TUMBLE function with a watermark to aggregate the data, e.g. GROUP BY TUMBLE(ts, INTERVAL '5' MINUTES), with the following configs:

table.exec.state.ttl = 24 h
table.exec.sink.not-null-enforcer = DROP
table.exec.emit.early-fire.enabled = true
table.exec.emit.early-fire.delay = 500 ms
table.optimizer.agg-phase-strategy = TWO_PHASE
table.exec.source.idle-timeout = 5000 ms

I run the application on AWS KDA with a parallelism of 4 and a parallelism per KPU of 2 (2 KPUs in total). The Kafka topic has 4 partitions.

The application runs as expected for the first 1.5 hours. After that, the data delay increases and some aggregation results are missing. This repeats every day with a similar amount and frequency of data.

From the metrics, I found that after about 1.5 hours of running (at 11:00), the trend of Flink's input messages no longer matches the rate of messages going into Kafka.

[Image: flink input vs kafka input]

[Image: flink input vs task resources]

[Image: task status when output is normal]

Tasks:
1) Read input from Kafka
2) Tumble into 1-minute windows
3) Left join extra info onto the 1-minute result
4) Tumble into 1- and 5-minute windows
5) Group and write to the DB at 1, 5, 10, 15, 30, and 60 minutes

[Image: task status when output is unexpected]

I made the following changes, but the problem still occurs:

  1. Switched the database to a better instance type
  2. Switched the Kafka cluster to a better instance type
  3. Set parallelism to 4 with a parallelism per KPU of 1 (4 KPUs in total)

I would like to know the possible causes of this situation and any suggestions for resolving it. Thank you.

There is 1 best solution below

You should use Windowing TVFs (table-valued functions) instead of the deprecated legacy Group Window Aggregations. Windowing TVF aggregations are optimized for better performance. You can find details on the syntax and configuration in the docs [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/
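
As a rough sketch of what the migration could look like (not your actual job): the table name `events`, the columns `device_id` and `amount`, the 5-second watermark delay, and all connector options below are assumptions for illustration only. The first query replaces GROUP BY TUMBLE(ts, ...) with the TUMBLE table-valued function, and the second cascades the 1-minute result into 5-minute windows by reusing `window_time` as the time attribute, so the 5-minute aggregate does not have to re-read the raw events.

```sql
-- Hypothetical Kafka source; names and connector options are placeholders.
CREATE TABLE events (
  device_id STRING,
  amount    DOUBLE,
  ts        TIMESTAMP(3),
  -- Event-time attribute with an assumed 5-second out-of-orderness bound.
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'broker:9092',
  'properties.group.id' = 'flink-agg',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- 1-minute tumbling aggregation using the Windowing TVF.
-- window_start/window_end are renamed so they do not clash with the
-- columns added by the next TUMBLE call; window_time is kept as the
-- time attribute for the cascading window.
CREATE VIEW agg_1min AS
SELECT
  window_start AS window_start_1min,
  window_end   AS window_end_1min,
  window_time  AS rowtime,
  device_id,
  SUM(amount)  AS total_amount,
  COUNT(*)     AS event_count
FROM TABLE(
  TUMBLE(TABLE events, DESCRIPTOR(ts), INTERVAL '1' MINUTES))
GROUP BY window_start, window_end, window_time, device_id;

-- 5-minute aggregation cascaded on top of the 1-minute result.
SELECT
  window_start,
  window_end,
  device_id,
  SUM(total_amount) AS total_amount,
  SUM(event_count)  AS event_count
FROM TABLE(
  TUMBLE(TABLE agg_1min, DESCRIPTOR(rowtime), INTERVAL '5' MINUTES))
GROUP BY window_start, window_end, device_id;
```

One thing to verify for your Flink version: as far as I know, the experimental table.exec.emit.early-fire.* options only apply to the legacy group windows, so Windowing TVF aggregations would emit a result once per window when the watermark passes the window end. If you rely on early results, check the docs before switching.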