Categorize or GroupBy datastream data and process with CEP separately


Assume I have a DataStream:

x:1, y:2, z:3, x:7, y:-1, z:0, z:3, z:2, y:3, x:2, y:6

How do I put x, y and z each in their own bucket and apply my CEP rule to each bucket?

x:1, x:7, x:2
y:2, y:-1, y:3, y:6
z:3, z:0, z:3, z:2
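Conceptually, this split needs no predefined list of categories: grouping by the key creates one bucket for every key that actually occurs in the data. As a minimal sketch, here is that idea in plain Scala on a finite list (an illustration only, not Flink code; `events` and `buckets` are hypothetical names). It reproduces the three buckets above:

```scala
// Plain-Scala illustration, not the Flink API: bucket (key, value) events by
// key, where the keys are discovered from the data rather than declared first.
val events: Seq[(String, Int)] = Seq(
  "x" -> 1, "y" -> 2, "z" -> 3, "x" -> 7, "y" -> -1, "z" -> 0,
  "z" -> 3, "z" -> 2, "y" -> 3, "x" -> 2, "y" -> 6
)

// groupBy creates one bucket per distinct key and keeps arrival order
// within each bucket.
def buckets(in: Seq[(String, Int)]): Map[String, Seq[Int]] =
  in.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }

for ((k, vs) <- buckets(events).toSeq.sortBy(_._1))
  println(s"$k: ${vs.mkString(", ")}")
// prints:
// x: 1, 7, 2
// y: 2, -1, 3, 6
// z: 3, 0, 3, 2
```

A new key such as `w` appearing in the input would simply produce a fourth bucket, with no code change.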

Or, to put it another way: how do I split the stream into these categories (one stream each for x, y and z), so that I get three sub-streams, each with its own CEP processing?

The challenge here is that x, y and z are not predefined, so I cannot create the streams in advance and route events to them with an if or switch statement.

Edit: The pattern would be something like "if the value of x is between 0 and 8 for the last 10 minutes".
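The rule can be read in more than one way. Assuming it means "every value seen for a key during the last 10 minutes lies in [0, 8]", and assuming each event carries a timestamp (the stream above shows none), a plain-Scala sketch of the check for a single bucket might look like this. `Event`, `inRangeForWindow` and the timestamps are hypothetical, and this is not the Flink CEP pattern itself:

```scala
// One possible reading of the rule: every value seen for a key during the
// last 10 minutes lies in [0, 8]. Timestamps are an assumption.
case class Event(key: String, value: Int, timestampMs: Long)

val minute: Long = 60 * 1000L
val windowMs: Long = 10 * minute

// True if `key` has at least one event in (nowMs - windowMs, nowMs] and all of
// those events have values between 0 and 8 inclusive.
def inRangeForWindow(events: Seq[Event], key: String, nowMs: Long): Boolean = {
  val recent = events.filter { e =>
    e.key == key && e.timestampMs > nowMs - windowMs && e.timestampMs <= nowMs
  }
  recent.nonEmpty && recent.forall(e => e.value >= 0 && e.value <= 8)
}

// Hypothetical timestamps attached to part of the question's data.
val stream = Seq(
  Event("x", 1, 1 * minute), Event("y", 2, 2 * minute), Event("x", 7, 3 * minute),
  Event("y", -1, 4 * minute), Event("x", 2, 9 * minute), Event("y", 6, 10 * minute)
)

println(inRangeForWindow(stream, "x", 10 * minute)) // true: 1, 7, 2
println(inRangeForWindow(stream, "y", 10 * minute)) // false: contains -1
println(inRangeForWindow(stream, "y", 15 * minute)) // true: -1 has aged out
```

Requiring at least one event in the window is a design choice of this sketch; drop `recent.nonEmpty` if an empty window should count as "in range".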


This is done by "keying" the stream on the category attribute.

If you have a DataStream[(String, Int)], this looks as follows:

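import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern

val yourStream: DataStream[(String, Int)] = ???
val yourPattern: Pattern[(String, Int), _ <: (String, Int)] = ???

// key the stream by the String (category) attribute
val keyedStream = yourStream.keyBy(_._1)
// apply the pattern on the keyed stream
val patternStream: PatternStream[(String, Int)] = CEP.pattern(keyedStream, yourPattern)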

The pattern is then evaluated separately for each distinct value of the key attribute, so the categories (x, y, z, ...) do not need to be known in advance.
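Keying is what removes the need to predefine x, y and z: a keyed operator keeps separate state for each key and creates that state the first time the key appears. The following plain-Scala sketch imitates this on the question's data. `KeyedRunner` and its run-length rule are hypothetical illustrations, not Flink's implementation or the pattern from the edit:

```scala
import scala.collection.mutable

// Plain-Scala illustration of keyed evaluation (not Flink internals): state is
// created lazily the first time a key appears, so keys need not be known ahead.
class KeyedRunner[S](initial: () => S, step: (S, Int) => S) {
  private val states = mutable.Map.empty[String, S]

  // Route one event to its key's state and return the updated state.
  def process(key: String, value: Int): S = {
    val next = step(states.getOrElseUpdate(key, initial()), value)
    states(key) = next
    next
  }

  def keys: Set[String] = states.keySet.toSet
  def stateOf(key: String): Option[S] = states.get(key)
}

// Example per-key rule (hypothetical): length of the current run of values
// in [0, 8]; an out-of-range value resets the run to 0.
val runner = new KeyedRunner[Int](() => 0, (run, v) => if (v >= 0 && v <= 8) run + 1 else 0)

val events = Seq(
  "x" -> 1, "y" -> 2, "z" -> 3, "x" -> 7, "y" -> -1, "z" -> 0,
  "z" -> 3, "z" -> 2, "y" -> 3, "x" -> 2, "y" -> 6
)

for ((k, v) <- events) {
  val run = runner.process(k, v)
  if (run >= 3) println(s"$k: $run consecutive values in [0, 8]")
}
// prints:
// z: 3 consecutive values in [0, 8]
// z: 4 consecutive values in [0, 8]
// x: 3 consecutive values in [0, 8]
```

The y run never reaches 3 because its -1 resets it, which shows that each key's state evolves independently of the others.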