I can see that the checkpoint size keeps getting larger and never shrinks.
In the web UI it is attributed to TumblingProcessingTimeWindows, and the checkpoint size is nearly equal to the Bytes Received of that operator. The job runs fine, and the processing logic completes successfully.
However, in my apply function I do not use state to save anything, so the growth cannot come from any state I create myself.
ret_stream = (log_stream
.map(MyMapFunction())
.filter(lambda x: self.get_key(x) is not None)
.key_by(self.get_key, key_type=Types.STRING())
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.apply(MyWindowFunction()))
class MyWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[tuple]:
        logger.info({
            "key": key,
            "data": [data for _, _, data in inputs]
        })
        return []
Another symptom is that the number of TaskManager threads keeps growing.
I do not understand why the checkpoint keeps growing. I do not set any watermarks, and the checkpoint interval is 1 minute.
Windows do keep state. In particular, the events assigned to each window are kept in a list until the window is triggered, and these lists are checkpointed. Most sources and sinks also keep some state.
The amount of state kept by the windows is going to depend on the total throughput, which seems to be what you have observed.
If it's possible to rewrite your window function to do its work incrementally via `reduce` or `aggregate` (e.g., if you are computing a simple aggregation such as a count, sum, min, max, or average), then the size of the state and checkpoints could shrink dramatically.
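To illustrate the idea, here is a minimal pure-Python sketch of the accumulator contract that PyFlink's `AggregateFunction` expects (`create_accumulator`, `add`, `get_result`, `merge`). With `.aggregate()`, the window stores only this small accumulator per key/window instead of buffering every event, which is why checkpoints shrink. The `CountAggregate` class is a hypothetical example, not code from the question:

```python
class CountAggregate:
    """Incremental count, mirroring PyFlink's AggregateFunction method names.

    Flink would keep only the accumulator (a single int) as window state,
    rather than a list of all received events.
    """

    def create_accumulator(self):
        return 0  # per-window state: one integer

    def add(self, value, accumulator):
        return accumulator + 1  # invoked once per arriving event

    def get_result(self, accumulator):
        return accumulator  # emitted when the window fires

    def merge(self, acc_a, acc_b):
        return acc_a + acc_b  # used if two window accumulators are merged


# Simulate three events arriving in one window:
agg = CountAggregate()
acc = agg.create_accumulator()
for event in ("a", "b", "c"):
    acc = agg.add(event, acc)
print(agg.get_result(acc))  # 3
```

In the actual job you would pass such a function to `.aggregate(...)` on the windowed stream in place of `.apply(MyWindowFunction())`, so the per-event buffering (and its checkpoint footprint) disappears.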