How to specify the time for which the state should be maintained in Flink CEP

Let me explain a scenario I need to handle. Assume three devices A, B and C are sending logs to Flink CEP for processing, and the pattern is: A, followed by B within 5 mins, followed by C within 5 mins. Now assume device B is down and only sends its logs after 50 mins. In this case all of the events will be dropped. I'm curious whether Flink supports keeping the state for a defined interval (let's say 1 day in my case, meaning the A and C logs would be stored for 1 day and dropped afterwards if no match occurs). Kindly advise on the feasibility of this from the CEP point of view.

Answer:

As far as I know, there is nothing more specific than until() or within(), but those define constraints on the pattern match itself, not how long partial matches are retained. This depends on your exact setup, but if you ingest all data into a single topic, it may be hard to guard against devices being down for longer periods of time, because the watermark keeps advancing with the events from the devices that are still up. You can try modifying the watermark generation logic, but this will delay output in general.
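To illustrate that trade-off, here is a simplified plain-Java model (no Flink dependency) of a bounded out-of-orderness watermark, the idea behind Flink's WatermarkStrategy.forBoundedOutOfOrderness. Assumptions: the watermark is modeled as the highest timestamp seen minus a fixed bound, an event at or below the watermark counts as late, and the class name WatermarkSketch and the timings are hypothetical. It is a sketch of the concept, not Flink's implementation.

```java
/**
 * Hypothetical, simplified model of a bounded-out-of-orderness watermark (not Flink's class).
 * Watermark = highest event timestamp seen so far minus a fixed bound.
 */
public class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE;
    private boolean seenAny = false;

    public WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Returns true if the event is accepted, false if it is already behind the watermark (late). */
    public boolean onEvent(long timestampMs) {
        boolean late = seenAny && timestampMs <= currentWatermark();
        maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
        seenAny = true;
        return !late;
    }

    /** Current watermark; events with timestamps at or below it are treated as late. */
    public long currentWatermark() {
        return seenAny ? maxTimestampMs - maxOutOfOrdernessMs : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        for (long boundMin : new long[] {10, 60}) {
            WatermarkSketch wm = new WatermarkSketch(boundMin * min);
            wm.onEvent(0);          // A from device A
            wm.onEvent(10 * min);   // C from device C
            wm.onEvent(55 * min);   // other devices keep sending, so the watermark keeps moving
            // B's log (event time 5 min) only arrives once device B is back up
            boolean accepted = wm.onEvent(5 * min);
            System.out.println("bound=" + boundMin + " min, B accepted=" + accepted);
        }
    }
}
```

With a 10-minute bound the late B event is rejected; with a 60-minute bound it is accepted, but then the watermark, and with it any CEP output, trails the newest event by a full hour.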

In this case, you may consider using a ProcessFunction with custom logic, which is more flexible and allows you to handle state with finer granularity.

EDIT:

So, basically, you would need to create state to hold partial matches (depending on the case, a ListState or a ValueState) and put any partial matches you find into it. So if you want A -> B -> C: when you receive A, you check it and put it into the state; when you receive B, you check the timestamps and append it to the state; and finally, when you receive C, you emit the whole match and clear the state.
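A minimal plain-Java sketch of that flow, with no Flink dependency: the List field stands in for the keyed ListState of a single key, and process() plays the role of the processElement callback of a KeyedProcessFunction. The Event record, the class name and the 5-minute maxGapMs (taken from the question's pattern) are hypothetical, and events are assumed to arrive in A, B, C order.

```java
import java.util.ArrayList;
import java.util.List;

public class PartialMatchSketch {

    /** Hypothetical event: the sending device ("A", "B" or "C") and its event-time timestamp in ms. */
    public record Event(String device, long timestamp) {}

    // Stand-in for the keyed ListState that holds the partial match for one key.
    private final List<Event> partial = new ArrayList<>();
    private final long maxGapMs;

    public PartialMatchSketch(long maxGapMs) {
        this.maxGapMs = maxGapMs;
    }

    /** Returns the completed match A -> B -> C, or null if this event did not complete one. */
    public List<Event> process(Event e) {
        switch (e.device()) {
            case "A" -> {
                // Start (or restart) a partial match.
                partial.clear();
                partial.add(e);
            }
            case "B" -> {
                // Append B only if it follows a stored A within the allowed gap.
                if (partial.size() == 1 && withinGap(partial.get(0), e)) {
                    partial.add(e);
                }
            }
            case "C" -> {
                // Complete the match only if C follows a stored B within the allowed gap.
                if (partial.size() == 2 && withinGap(partial.get(1), e)) {
                    List<Event> match = new ArrayList<>(partial);
                    match.add(e);
                    partial.clear(); // emit the whole match and clear the state
                    return match;
                }
            }
            default -> { } // ignore events from other devices
        }
        return null;
    }

    private boolean withinGap(Event prev, Event next) {
        long gap = next.timestamp() - prev.timestamp();
        return gap >= 0 && gap <= maxGapMs;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        PartialMatchSketch tracker = new PartialMatchSketch(5 * min);
        tracker.process(new Event("A", 0));
        tracker.process(new Event("B", 3 * min));
        List<Event> match = tracker.process(new Event("C", 6 * min));
        System.out.println(match.size()); // 3
    }
}
```

Because the gap check is ordinary code, you decide how strict it is; handling C arriving before a delayed B would need extra logic that this sketch deliberately leaves out.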

If you set a state TTL there, the state will be cleaned up automatically once it has not been read or written for the configured time.
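In Flink itself this is configured with StateTtlConfig and enabled on the state descriptor via enableTimeToLive; note that Flink's state TTL is measured in processing time. The plain-Java sketch below only models the semantics described above, assuming the timer is refreshed on both reads and writes (like StateTtlConfig.UpdateType.OnReadAndWrite) and that expired values are never returned. TtlValueSketch and its methods are hypothetical. With a 1-day TTL, a stored A would be dropped after a day without activity.

```java
import java.util.Optional;

/** Hypothetical model of a TTL-guarded value; not Flink's API. */
public class TtlValueSketch<T> {
    private final long ttlMs;
    private T value;
    private long lastAccessMs;

    public TtlValueSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Writing stores the value and restarts the TTL timer. */
    public void update(T newValue, long nowMs) {
        value = newValue;
        lastAccessMs = nowMs;
    }

    /** Reading returns the value unless it has expired; a successful read also restarts the timer. */
    public Optional<T> value(long nowMs) {
        if (value != null && nowMs - lastAccessMs >= ttlMs) {
            value = null; // expired: clean it up and never return it
        }
        if (value != null) {
            lastAccessMs = nowMs;
        }
        return Optional.ofNullable(value);
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        TtlValueSketch<String> partialMatch = new TtlValueSketch<>(24 * hour);
        partialMatch.update("A", 0);
        System.out.println(partialMatch.value(12 * hour).isPresent()); // true, timer restarts at 12h
        System.out.println(partialMatch.value(35 * hour).isPresent()); // true, only 23h since last access
        System.out.println(partialMatch.value(60 * hour).isPresent()); // false, 25h without access
    }
}
```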

Please also note that this only makes sense if the patterns are not very complex; otherwise the logic can quickly become a nightmare to code.