Apply anomaly detection on Flink sliding windows


I am new to Flink, so I hope this makes sense. I would like to apply sliding windows to a DataStream and then perform anomaly detection on each of those windows, using FlinkML or FlinkCEP (in fact, I want to use both). My question is: which function should I use after I have created the sliding windows?

So far I am trying to achieve this with the apply method, but I am not sure whether that makes sense. To my understanding, when the apply function is called, all the elements within the window are available to it.
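To make "all the elements within the window" concrete for sliding windows: each element belongs to several overlapping windows, and the window function runs once per window. The plain-Java sketch below (no Flink dependency) computes which [start, end) windows a timestamp falls into for windows of 1000 ms that slide every 100 ms. It is modeled on the zero-offset case of Flink's sliding event-time assigner and assumes non-negative timestamps; the class and method names are hypothetical, chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Returns the [start, end) windows containing `timestamp`, for windows of
    // length `size` that start every `slide` ms (zero offset, timestamp >= 0).
    // Illustration only; this is not Flink's own code.
    public static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that contains the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // With size 1000 ms and slide 100 ms, the element lands in 10 windows.
        for (long[] w : assignWindows(1234, 1000, 100)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Since every element is assigned to size / slide = 10 windows here, whatever detection logic runs per window sees each element 10 times, which is worth keeping in mind for expensive detectors.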

I don't think Flink ML currently provides an off-the-shelf anomaly detection algorithm that works on sliding windows like the ones you describe.

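Suppose you have a function, isAbnormal, that takes a collection of elements as input and determines whether they contain an anomaly. You can then write a streaming job that applies this function to sliding windows of an input stream, as below. A ProcessWindowFunction passed to process (like a WindowFunction passed to apply) receives all elements of a window at once, so either works for this: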

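import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// env is a StreamExecutionEnvironment. Event-time windows need timestamps, so
// here each value doubles as its own timestamp in milliseconds (illustration only).
DataStream<Integer> source = env
        .fromElements(1, 2, 3)
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps()
                .withTimestampAssigner((i, ts) -> i.longValue()));
DataStream<String> alerts =
        source.keyBy(i -> i % 2)
                // 1000 ms windows, a new one starting every 100 ms
                .window(SlidingEventTimeWindows.of(
                        Time.milliseconds(1000),
                        Time.milliseconds(100)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(
                            Integer key,
                            Context context,
                            Iterable<Integer> elements,
                            Collector<String> out) throws Exception {
                        // all elements of the current window are available here;
                        // isAbnormal is your own detection function
                        if (isAbnormal(elements)) {
                            out.collect("Found a sequence of abnormal elements in " + context.window());
                        }
                    }
                });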
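The job above leaves isAbnormal open. As one possible stand-in (an assumption, not something Flink ML or FlinkCEP provides), here is a simple z-score check in plain Java: a window is flagged if any element lies more than a threshold number of standard deviations from the window mean. The class name WindowAnomaly, the default threshold of 3.0 and the scoring rule are all hypothetical; inside the ProcessWindowFunction you would call WindowAnomaly.isAbnormal(elements).

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAnomaly {
    // Hypothetical default: flag values more than 3 standard deviations from the mean.
    static final double DEFAULT_THRESHOLD = 3.0;

    // Matches the single-argument call used in the window function.
    public static boolean isAbnormal(Iterable<Integer> elements) {
        return isAbnormal(elements, DEFAULT_THRESHOLD);
    }

    // Returns true if any element's z-score (population standard deviation)
    // exceeds `threshold`.
    public static boolean isAbnormal(Iterable<Integer> elements, double threshold) {
        // Copy the window contents so they can be scanned several times.
        List<Integer> values = new ArrayList<>();
        for (Integer e : elements) {
            values.add(e);
        }
        if (values.size() < 2) {
            return false; // not enough data to estimate a spread
        }
        double mean = 0;
        for (int v : values) {
            mean += v;
        }
        mean /= values.size();
        double variance = 0;
        for (int v : values) {
            variance += (v - mean) * (v - mean);
        }
        double std = Math.sqrt(variance / values.size());
        if (std == 0) {
            return false; // all values identical: nothing stands out
        }
        for (int v : values) {
            if (Math.abs(v - mean) / std > threshold) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Integer> normal = List.of(10, 11, 9, 10, 12, 10, 9, 11, 10, 10);
        List<Integer> spiky = new ArrayList<>(normal);
        spiky.add(100); // one clear outlier
        System.out.println(isAbnormal(normal)); // prints false
        System.out.println(isAbnormal(spiky));  // prints true
    }
}
```

One consequence of using the population standard deviation: in a window of n elements no value can be more than sqrt(n - 1) standard deviations from the mean, so a threshold of 3.0 can only fire on windows holding at least 11 elements. Tune the threshold (or the scoring rule) to how many elements your windows actually contain.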