Flink CEP all previous events cleared from the state on a match

I have a particular case in Flink CEP where all the events are discarded after a single match. For example, take the input Apple, And, All, Bulb, Bee, Bell. The condition is a word starting with A followed by a word starting with B. Only the first match, Apple Bulb, is returned; the remaining matches are skipped and the other events are discarded.

There are 1 best solutions below

Experiment with the various AfterMatchSkipStrategy options and the different ways of combining patterns. For example, if you combine AfterMatchSkipStrategy.noSkip() with followedByAny(), all 9 matches are returned: each of the three "a" words paired with each of the three "b" words that follow it.

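// Imports used by this snippet.
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// "environment" is the job's StreamExecutionEnvironment, created elsewhere.
DataStream<String> stringInputStream = environment.fromElements(
    "apple", "and", "all", "bulb", "bee", "bell");

// noSkip(): every possible match is emitted; nothing is discarded after a match.
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
Pattern<String, ?> pattern = Pattern.<String>begin("start", skipStrategy)
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.charAt(0) == 'a';
            }
        })
        // followedByAny(): a "start" event can pair with every later "next" event,
        // not just the first one.
        .followedByAny("next")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.charAt(0) == 'b';
            }
        });

PatternStream<String> patternStream = CEP.pattern(stringInputStream, pattern);
DataStream<String> result = patternStream.process(
        new PatternProcessFunction<String, String>() {
            @Override
            public void processMatch(
                    Map<String, List<String>> map,
                    Context context,
                    Collector<String> out) throws Exception {
                // Emit the matched events, keyed by pattern name.
                out.collect(map.toString());
            }
        });
result.print();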

Output:

{start=[apple], next=[bulb]}
{start=[and], next=[bulb]}
{start=[all], next=[bulb]}
{start=[apple], next=[bee]}
{start=[and], next=[bee]}
{start=[all], next=[bee]}
{start=[apple], next=[bell]}
{start=[and], next=[bell]}
{start=[all], next=[bell]}
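
To see where the nine results come from, the pairing that noSkip() with followedByAny() performs on this input can be imitated in plain Java, without Flink. This is only a sketch of the pairing logic, not how Flink's matching is actually implemented; the class and method names (FollowedByAnySketch, enumerateMatches) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class FollowedByAnySketch {

    // Pairs every word starting with 'a' with every LATER word starting with 'b'.
    // Simplified model of noSkip() + followedByAny(); not Flink code.
    static List<String> enumerateMatches(List<String> events) {
        List<String> starts = new ArrayList<>();   // "a" words seen so far
        List<String> matches = new ArrayList<>();
        for (String e : events) {
            if (e.isEmpty()) {
                continue;                          // avoid charAt(0) on an empty word
            }
            char first = e.charAt(0);
            if (first == 'b') {
                // A "b" word completes a match with each earlier "a" word.
                for (String s : starts) {
                    matches.add("{start=[" + s + "], next=[" + e + "]}");
                }
            } else if (first == 'a') {
                starts.add(e);                     // kept forever: nothing is skipped
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> input = List.of("apple", "and", "all", "bulb", "bee", "bell");
        enumerateMatches(input).forEach(System.out::println);
    }
}
```

Run on the six words above, the sketch yields 3 x 3 = 9 pairs. Note that the list of "a" words is never trimmed, which is exactly the kind of ever-growing state to watch out for.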

However, you need to be careful. Patterns that require storing events indefinitely will lead to poor performance and, ultimately, to job failures. Constrain the matches as much as possible.
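
One common way to constrain a pattern is a time bound, which in Flink CEP is what within(...) on a pattern provides. The plain-Java sketch below is not Flink code and does not reproduce Flink's exact timeout semantics; it only imitates the effect under simplifying assumptions (events arrive in timestamp order, and a start event is dropped once it is at least one window old). The class, the method names, and the timestamps are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class BoundedMatchSketch {

    // Hypothetical event type: a word plus its timestamp in milliseconds.
    static final class Event {
        final String word;
        final long timestampMs;

        Event(String word, long timestampMs) {
            this.word = word;
            this.timestampMs = timestampMs;
        }
    }

    // Pairs each "a" word with later "b" words, but only while the "a" word
    // is younger than windowMs. Assumes events are sorted by timestamp.
    static List<String> matchWithin(List<Event> events, long windowMs) {
        Deque<Event> pendingStarts = new ArrayDeque<>();
        List<String> matches = new ArrayList<>();
        for (Event e : events) {
            // Evict start events that have aged out of the window.
            while (!pendingStarts.isEmpty()
                    && e.timestampMs - pendingStarts.peekFirst().timestampMs >= windowMs) {
                pendingStarts.pollFirst();
            }
            if (e.word.startsWith("b")) {
                for (Event s : pendingStarts) {
                    matches.add(s.word + " -> " + e.word);
                }
            } else if (e.word.startsWith("a")) {
                pendingStarts.addLast(e);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("apple", 0), new Event("and", 1500), new Event("all", 2000),
                new Event("bulb", 3000), new Event("bee", 6000), new Event("bell", 20000));
        matchWithin(events, 5000).forEach(System.out::println);
    }
}
```

With these made-up timestamps and a 5-second window, apple has already expired by the time bee arrives and nothing is left to pair with bell, so the buffer of pending start events stays bounded instead of growing with the stream.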