Apache Flink: filtering based on previous value


All the filtering examples in the Apache Flink documentation show simple cases of filtering against a global threshold.

But what if filtering on an entry should take into account the previous entry?

Let's say we have a stream of sensor data. We need to discard the current sensor reading if it's X% larger than the previous one.
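Read literally, and assuming the readings are positive numbers, the discard rule compares the current value $v_t$ with the previous value $v_{t-1}$ for a threshold of $X$ percent. A reading is discarded when:

```latex
v_t > v_{t-1}\left(1 + \frac{X}{100}\right)
```

For example, with $X = 10$ and a previous reading of 100, any reading above 110 would be discarded.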

Is there a simple solution for this, either in Apache Flink or in plain Java?

Thanks


Accepted answer:

In Flink, this can be done with state.

Your use case is very similar to the fraud detection example in the Flink documentation.
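In Flink itself, the per-sensor "previous value" would typically live in keyed ValueState, read and updated from a RichFilterFunction or RichFlatMapFunction applied after keyBy on the sensor id. The sketch below is only a plain-Java stand-in for that idea so it runs without Flink: a HashMap keyed by sensor id plays the role of keyed state. SensorReading, PreviousValueFilter and maxIncreasePercent are hypothetical names, and it assumes positive readings and arrival-order processing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical event type: one reading from one sensor.
record SensorReading(String sensorId, long timestamp, double value) {}

// Plain-Java stand-in for a keyed, stateful filter. The map holds one
// "previous value" per sensor, the role keyed ValueState would play in Flink.
class PreviousValueFilter {
    private final double maxIncreasePercent; // the X in "X% larger"
    private final Map<String, Double> previousBySensor = new HashMap<>();

    PreviousValueFilter(double maxIncreasePercent) {
        this.maxIncreasePercent = maxIncreasePercent;
    }

    // Returns true if the reading should be kept, false if it should be discarded.
    boolean filter(SensorReading reading) {
        Double previous = previousBySensor.get(reading.sensorId());
        // Assumption: compare against the last reading received for this sensor,
        // whether or not that reading was itself kept.
        previousBySensor.put(reading.sensorId(), reading.value());
        if (previous == null) {
            return true; // first reading for this sensor: nothing to compare with
        }
        return reading.value() <= previous * (1 + maxIncreasePercent / 100.0);
    }
}
```

With new PreviousValueFilter(10.0), the sequence 100, 105, 130, 120 for one sensor keeps 100, 105 and 120 and discards 130. If "previous entry" should instead mean the last reading that was kept, update the map only when returning true. Unlike Flink's managed state, a HashMap is not checkpointed, so this sketch says nothing about fault tolerance.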

Another answer:

State is what makes this possible, but you also need to pay attention to the definition of "previous entry". If event ingestion order is all that matters, then this is easily implemented as a RichFilterFunction or a RichFlatMapFunction. But if "previous entry" needs to take per-event timestamps into account, this becomes more challenging, because you first need to sort the stream by timestamp.
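A plain-Java sketch of the timestamp-aware variant, assuming readings arrive at most maxOutOfOrderness time units late: readings are buffered in a priority queue ordered by timestamp and released only once a simple watermark (the largest timestamp seen minus that bound) has passed them, so each reading is compared with its predecessor in event time rather than in arrival order. Reading, EventTimeOrderedFilter and its parameters are hypothetical; in Flink this kind of buffering is usually done with keyed state plus event-time timers driven by watermarks, which the sketch only imitates. For brevity it handles a single sensor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical event type (single sensor, for brevity).
record Reading(long timestamp, double value) {}

// Buffers out-of-order readings and applies the "X% larger than previous"
// check in timestamp order once a simple watermark has passed them.
class EventTimeOrderedFilter {
    private final double maxIncreasePercent; // the X in "X% larger"
    private final long maxOutOfOrderness;    // assumed lateness bound, same unit as timestamps
    private final PriorityQueue<Reading> buffer =
            new PriorityQueue<>(Comparator.comparingLong(Reading::timestamp));
    private long maxTimestampSeen = Long.MIN_VALUE;
    private Double previousValue = null;     // predecessor in event time

    EventTimeOrderedFilter(double maxIncreasePercent, long maxOutOfOrderness) {
        this.maxIncreasePercent = maxIncreasePercent;
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Accepts one reading; returns the readings that are now final and kept.
    List<Reading> add(Reading reading) {
        buffer.add(reading);
        maxTimestampSeen = Math.max(maxTimestampSeen, reading.timestamp());
        return drainUpTo(maxTimestampSeen - maxOutOfOrderness);
    }

    // Call at end of input to release everything still buffered.
    List<Reading> flush() {
        return drainUpTo(Long.MAX_VALUE);
    }

    // Pops buffered readings in timestamp order up to the watermark and filters them.
    private List<Reading> drainUpTo(long watermark) {
        List<Reading> kept = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp() <= watermark) {
            Reading r = buffer.poll();
            boolean keep = previousValue == null
                    || r.value() <= previousValue * (1 + maxIncreasePercent / 100.0);
            previousValue = r.value(); // next reading is compared with this one
            if (keep) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```

With a 10% threshold, readings arriving as (t=1, 100), (t=3, 115), (t=2, 105) are processed as 100, 105, 115 and all three are kept, whereas comparing in arrival order would discard 115. Readings that arrive after the watermark has already passed their timestamp are not treated specially here; they are simply compared with whatever was processed before them, so a real job would need to drop or side-output such late data.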