SplitStream for dynamic output key (Select)


This is my code.

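    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.streaming.api.collector.selector.OutputSelector;
    import org.apache.flink.streaming.api.datastream.SplitStream;

    // Route each event to an output named after its event type.
    SplitStream<MonitoringEvent> splitStream = inputStream.split(new OutputSelector<MonitoringEvent>() {

        @Override
        public Iterable<String> select(MonitoringEvent me) {
            List<String> ml = new ArrayList<String>();
            ml.add(me.getEventType());
            return ml;
        }
    });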

I have a stream of monitoring events arriving in random order, e.g. temp: 80, pressure: 70, humidity: 80, temp: 30...

With the above code, I am splitting the stream by eventType, i.e. into temperatureStream, pressureStream, and so on.

The problem is this: if I know the eventType, I can select it from the splitStream like

splitStream.select("temperatureStream")

but the eventType is dynamic and not predefined.

How can I apply CEP to this dynamic stream? The CEP rules would be like:

temperature is > 90 for the past 10 minutes ...

pressure is > 90 for the past 10 minutes ...

There is 1 best solution below


Correct me if I'm wrong, but I don't think a dynamic lookup on select is possible, because of Flink's parallelism. Your program is translated into parallel instructions for Flink's TaskManagers, and the JobManager coordinates these actions. Without overall knowledge of your program's abstract syntax tree up front, no parallelism could be applied at all. Maybe you could find some common attribute that all messages share and differentiate from there.
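To illustrate the "common attribute" idea, here is a minimal plain-Java sketch, not Flink code. It treats the event type as a key and keeps per-key state, so no stream name has to be known in advance. The class `PerTypeThresholdMonitor` and its method `onEvent` are hypothetical names made up for this example, and it assumes events for a given type arrive in timestamp order. In Flink, the analogous step would presumably be keying the stream by event type (for example with `keyBy`) instead of using `split`/`select`, but I haven't verified that against your setup.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a per-event-type threshold rule; hypothetical, not Flink API. */
final class PerTypeThresholdMonitor {
    private final double threshold;
    private final long windowMillis;

    // Start time of the current above-threshold streak, per event type.
    // An entry is created the first time a type is seen, so types are never pre-declared.
    private final Map<String, Long> streakStart = new HashMap<>();

    PerTypeThresholdMonitor(double threshold, long windowMillis) {
        this.threshold = threshold;
        this.windowMillis = windowMillis;
    }

    /**
     * Feeds one event and returns true when readings of this event type
     * have stayed above the threshold for at least the configured window.
     * Assumes timestamps are non-decreasing per event type.
     */
    boolean onEvent(String eventType, double value, long timestampMillis) {
        if (value <= threshold) {
            // The streak is broken, so forget it for this type only.
            streakStart.remove(eventType);
            return false;
        }
        long start = streakStart.computeIfAbsent(eventType, k -> timestampMillis);
        return timestampMillis - start >= windowMillis;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        PerTypeThresholdMonitor monitor = new PerTypeThresholdMonitor(90.0, tenMinutes);

        // Event types show up in arbitrary order; each one gets its own state.
        System.out.println(monitor.onEvent("temp", 95.0, 0L));
        System.out.println(monitor.onEvent("pressure", 70.0, 1_000L));
        System.out.println(monitor.onEvent("temp", 97.0, tenMinutes));
    }
}
```

The point of the sketch is only that the rule ("> 90 for the past 10 minutes") is written once and applied per key, rather than once per named output stream.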