Checkpoint size keeps growing with TumblingProcessingTimeWindows even though I use no state

The checkpoint size keeps growing and never shrinks.

checkpoints graph

The web UI shows that the growth comes from the TumblingProcessingTimeWindows operator, and the checkpoint size is nearly equal to that operator's Bytes Received. The job itself runs fine, and the processing logic produces the expected results.

However, my apply function does not use state to store anything, so the growth cannot come from state that I manage myself.

ret_stream = (log_stream
              .map(MyMapFunction())
              .filter(lambda x: self.get_key(x) is not None)
              .key_by(self.get_key, key_type=Types.STRING())
              .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
              .apply(MyWindowFunction()))

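from typing import Iterable
from pyflink.datastream.functions import WindowFunction
from pyflink.datastream.window import TimeWindow

class MyWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[tuple]:
        # Log every element buffered for this key and window; nothing is emitted downstream.
        logger.info({
            "key": key,
            "data": [data for _, _, data in inputs]
        })
        return []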

Another symptom is that the number of TaskManager threads keeps growing.
TM threads

I do not understand why the checkpoint keeps growing. I use no watermarks, and the checkpoint interval is 1 minute.

There are 2 best solutions below

2
On

Windows do keep state. In particular, the events assigned to each window are kept in a list until the window is triggered, and these lists are checkpointed. Most sources and sinks also keep some state.

The amount of state kept by the windows is going to depend on the total throughput, which seems to be what you have observed.
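To make that dependence on throughput concrete, here is a rough back-of-the-envelope sketch. The function name and the numbers are made up for illustration, and it assumes each window's buffered events are dropped once the window is done, so it only estimates what one window holds at a time.

```python
def estimate_window_buffer_bytes(events_per_second: int,
                                 avg_event_bytes: int,
                                 window_seconds: int) -> int:
    """Rough size of the raw events buffered by one non-incremental window."""
    return events_per_second * avg_event_bytes * window_seconds


# Hypothetical workload: 2,000 events/s, 500 bytes per event, 5-second window.
buffered = estimate_window_buffer_bytes(2_000, 500, 5)
print(f"{buffered / 1e6:.1f} MB buffered per window")  # prints "5.0 MB buffered per window"
```

If the observed checkpoint size grows far beyond an estimate like this, the window contents are probably not being cleared, rather than the throughput simply being high.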

If it's possible to rewrite your window function to do its work incrementally via reduce or aggregate (e.g., if you are computing a simple aggregation such as a count, sum, min, max, or average), then the size of the state and checkpoints could shrink dramatically.
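The difference can be sketched with a small plain-Python simulation. This is not Flink code: `BufferingWindow` and `ReducingWindow` are hypothetical stand-ins for what a window keeps per key with `apply` versus with `reduce` or `aggregate`, and the `(key, value)` event shape is assumed.

```python
from typing import Callable, Dict, List


class BufferingWindow:
    """Keeps every event until the window fires, as apply() with a WindowFunction does."""

    def __init__(self) -> None:
        self.state: Dict[str, List[int]] = {}

    def add(self, key: str, value: int) -> None:
        self.state.setdefault(key, []).append(value)

    def state_size(self) -> int:
        # Number of stored elements across all keys.
        return sum(len(values) for values in self.state.values())


class ReducingWindow:
    """Keeps one running value per key, as an incremental reduce does."""

    def __init__(self, reduce_fn: Callable[[int, int], int]) -> None:
        self.reduce_fn = reduce_fn
        self.state: Dict[str, int] = {}

    def add(self, key: str, value: int) -> None:
        if key in self.state:
            self.state[key] = self.reduce_fn(self.state[key], value)
        else:
            self.state[key] = value

    def state_size(self) -> int:
        # One stored element per key, regardless of how many events arrived.
        return len(self.state)


# Made-up input: 1,000 events for key "a" and 500 for key "b".
events = [("a", i) for i in range(1000)] + [("b", i) for i in range(500)]
buffering = BufferingWindow()
reducing = ReducingWindow(lambda x, y: x + y)
for key, value in events:
    buffering.add(key, value)
    reducing.add(key, value)

print(buffering.state_size(), reducing.state_size())
```

In PyFlink the incremental variant would correspond to calling `reduce` or `aggregate` on the windowed stream instead of `apply`. Note that this only helps when the per-window result can be computed incrementally; a function that needs the full list of elements, like the logging one in the question, still has to buffer them.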

2
On

I solved this problem by overriding the trigger so that it purges the window contents when it fires.

class MyProcessingTimeTrigger(ProcessingTimeTrigger):
    def on_processing_time(self,
                           time: int,
                           window: TimeWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return TriggerResult.FIRE_AND_PURGE

ret_stream = (log_stream
              .map(MyMapFunction())
              .filter(lambda x: self.get_key(x) is not None)
              .key_by(self.get_key, key_type=Types.STRING())
              .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
              .trigger(MyProcessingTimeTrigger())
              .apply(MyWindowFunction()))

However, I still do not understand why the input data is not cleared after the window function has run.
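For what it is worth, the documented difference between the two trigger results is that FIRE evaluates the window and keeps its contents, while FIRE_AND_PURGE evaluates the window and then removes its contents. The toy model below only illustrates that difference; `ToyTriggerResult` and `ToyWindow` are hypothetical stand-ins, not PyFlink classes, and the sketch does not explain why the normal cleanup at the end of the window did not free the data in this job.

```python
from enum import Enum
from typing import List


class ToyTriggerResult(Enum):
    """Stand-in for Flink's TriggerResult; not the real class."""
    FIRE = "fire"
    FIRE_AND_PURGE = "fire_and_purge"


class ToyWindow:
    """Minimal model of a window's element buffer."""

    def __init__(self) -> None:
        self.contents: List[str] = []

    def add(self, element: str) -> None:
        self.contents.append(element)

    def on_timer(self, result: ToyTriggerResult) -> List[str]:
        # Both results emit the current contents to the window function.
        emitted = list(self.contents)
        # Only FIRE_AND_PURGE drops the buffered elements afterwards.
        if result is ToyTriggerResult.FIRE_AND_PURGE:
            self.contents.clear()
        return emitted


fire_only = ToyWindow()
purging = ToyWindow()
for element in ["x", "y", "z"]:
    fire_only.add(element)
    purging.add(element)

fire_only.on_timer(ToyTriggerResult.FIRE)
purging.on_timer(ToyTriggerResult.FIRE_AND_PURGE)
print(len(fire_only.contents), len(purging.contents))
```

In this model the FIRE-only window still holds its three elements after firing, which is the kind of leftover buffer that would show up in a checkpoint until something else clears it.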