I am trying to use a single stream to process incoming JSON events and write them into different streams based on an attribute in the event. For example, the input stream may contain an event like this:

{
  "event_type": "temperature",
  "json": {
    "type": "Temperature",
    "DeviceID": "xyz",
    "temperature": "32",
    "timestamp": "2019-03-19T12:37:43.356119Z"
  }
}

Another event looks like this:

{
  "event_type": "location",
  "json": {
    "type": "GPS",
    "DeviceID": "xyz",
    "location": {"coordinates": [-73.856077, 40.848447]},
    "timestamp": "2019-09-22T00:00:00+05:30"
  }
}

Both events are pushed to a single HTTP endpoint (this is a limitation I am facing).

How can I use a single HTTP source stream to process these events so that an event is inserted into temperature_collection in MongoDB if its event_type is temperature, and into location_collection in MongoDB if its event_type is location?

  1. Is it possible to do this with a single stream?

  2. If not, how can I avoid writing multiple endpoints, one for every event type?

There are 1 best solutions below

Yes, it is possible to define a single stream and route each flow with Siddhi filters:

-- One HTTP source for both event types; attributes missing from an event are mapped as null.
@source(type='http', receiver.url='http://localhost:8000/SensorStream',
    @map(type='json', fail.on.missing.attribute='false',
        @attributes(eventType='$.event_type', type='$.json.type', deviceID='$.json.DeviceID',
                    temperature='$.json.temperature', location='$.json.location',
                    timestamp='$.json.timestamp')))
define stream SensorStream(eventType string, type string, deviceID string, temperature string, location string, timestamp string);

from SensorStream[eventType=='temperature']
select deviceID, temperature, timestamp  
insert into TemperatureStream;

from SensorStream[eventType=='location']
select deviceID, location, timestamp  
insert into LocationStream;

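As the example above shows, the source mapper property 'fail.on.missing.attribute' is set to 'false' so that events in different formats can be mapped onto a single stream with a custom attribute mapping. An attribute that is absent from an event (for example, temperature in a location event) is passed on as null instead of causing the event to be dropped. Once events arrive at the endpoint, filters split the flow based on the value of the eventType attribute.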
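
The queries above stop at TemperatureStream and LocationStream. To get the events into the two MongoDB collections named in the question, one option is to define a MongoDB-backed table for each stream with the siddhi-store-mongodb extension. The following is only a minimal sketch under some assumptions: the extension is installed, MongoDB is reachable at the placeholder URI below, and the database name sensordb and the table names TemperatureTable and LocationTable are hypothetical. Check the parameter names against the documentation for the extension version you use.

```
-- Assumed connection URI and database name (sensordb); replace with your own.
@Store(type='mongodb', mongodb.uri='mongodb://localhost:27017/sensordb', collection.name='temperature_collection')
define table TemperatureTable(deviceID string, temperature string, timestamp string);

@Store(type='mongodb', mongodb.uri='mongodb://localhost:27017/sensordb', collection.name='location_collection')
define table LocationTable(deviceID string, location string, timestamp string);

-- Persist each routed stream into its own collection.
from TemperatureStream
select deviceID, temperature, timestamp
insert into TemperatureTable;

from LocationStream
select deviceID, location, timestamp
insert into LocationTable;
```

With this in place, a single HTTP endpoint still receives both event types, and each event ends up in the collection that matches its event_type.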