Azure Stream Analytics: Compact sensor data per hour?


How can I collect sensor events into hourly documents that each contain an array holding only a subset of the original message's fields?

The incoming events have the following format:

{"plantId": "Plant A", "machineId" : "M001", "sensorId": "S001", "unit": "kg", "time": "2017-09-05T22:00:14.9410000Z", "value": 1234.56}

{"plantId": "Plant A", "machineId" : "M001", "sensorId": "S001", "unit": "kg", "time": "2017-09-05T22:00:19.5410000Z", "value": 1334.76}

...

I'd like to get the following output for each sensor each hour:

{"plantId": "Plant A", "machineId" : "M001", "sensorId": "S001", "unit": "kg",
  "from" : "2017-09-05T22:00:14.9410000Z", "to" : "2017-09-05T22:59:55.5410000Z",
  "data": [
    {"time": "2017-09-05T22:01:14.9410000Z", "value": 1234.56},
    {"time": "2017-09-05T22:01:19.5410000Z", "value": 1334.76},
    ....
  ]
}

I created the following query:

SELECT  PlantId, MachineId, SensorId, Unit, 
        MIN(Time) AS From, MAX(Time) AS To, 
        Collect() AS Data
INTO CosmosDBOutput
FROM SensorsInput TIMESTAMP BY CAST(time as datetime)
GROUP BY PlantId, MachineId, SensorId, Unit, TumblingWindow(hour,1)

The problem is that Collect() returns an array of the complete original events, but I'd like each element to contain only the time and value fields.

How can I reduce the Collect() result to these fields?


Best answer:

Based on your description, I suggest using a JavaScript user-defined function (UDF).

You can define a custom function that deletes the unwanted fields from each collected event.

In more detail, follow the steps below:

1. Create a JavaScript UDF in your Stream Analytics job.

2. Add the following code to the function:

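// Removes the grouping fields from every collected event so that only
// the remaining fields are left (time and value for the events shown above).
function main(InputJSON) {
    // Declare the loop variable locally instead of leaking a global.
    for (var i = 0; i < InputJSON.length; i++) {
        delete InputJSON[i].plantId;
        delete InputJSON[i].machineId;
        delete InputJSON[i].sensorId;
        delete InputJSON[i].unit;
    }
    return InputJSON;
}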
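As a variation on the UDF above (not part of the original answer), you could keep only the fields you want instead of deleting the ones you don't. This is a minimal sketch that assumes each collected record exposes lowercase `time` and `value` properties, as in the input events from the question. The sample array at the bottom is only there so the function can be checked locally, outside Stream Analytics.

```javascript
// Whitelist variant (an assumption, not the original answer's code):
// build a new array that keeps only `time` and `value` from each record.
function main(InputJSON) {
    var result = [];
    for (var i = 0; i < InputJSON.length; i++) {
        result.push({
            time: InputJSON[i].time,
            value: InputJSON[i].value
        });
    }
    return result;
}

// Local check using the two sample events from the question.
var sample = [
    {plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
     time: "2017-09-05T22:00:14.9410000Z", value: 1234.56},
    {plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
     time: "2017-09-05T22:00:19.5410000Z", value: 1334.76}
];
var reduced = main(sample);
console.log(JSON.stringify(reduced));
```

With a whitelist, any field added to the events later stays out of the output unless you add it explicitly, and the input records are left unmodified.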

3. Change the query:

Note: replace UDF.remove with the name of your own UDF (UDF.yourUDFname).

SELECT
    PlantId, MachineId, SensorId, Unit,
    UDF.remove(Collect()) AS Data,
    MIN(time) AS fromdate,
    MAX(time) AS todate
INTO
    [YourOutputAlias]
FROM
    [YourInputAlias] TIMESTAMP BY time
GROUP BY
    PlantId, MachineId, SensorId, Unit, TumblingWindow(hour,1)
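To make the shape of the output concrete, the following sketch imitates the query locally in plain JavaScript. It is an illustration only and not how Stream Analytics evaluates windows: the helper name `compactPerHour` is hypothetical, the hour bucket is approximated by truncating the ISO timestamp string, exact window-boundary handling is ignored, and the third sample event is made up to show a second hour.

```javascript
// Hypothetical helper that imitates
// GROUP BY PlantId, MachineId, SensorId, Unit, TumblingWindow(hour,1).
// Assumes ISO 8601 UTC timestamps, so the first 13 characters
// (for example "2017-09-05T22") identify the hour.
function compactPerHour(events) {
    var groups = {};
    events.forEach(function (e) {
        var hour = e.time.substring(0, 13);
        var key = [e.plantId, e.machineId, e.sensorId, e.unit, hour].join("|");
        if (!groups[key]) {
            groups[key] = {
                plantId: e.plantId, machineId: e.machineId,
                sensorId: e.sensorId, unit: e.unit,
                fromdate: e.time, todate: e.time, data: []
            };
        }
        var g = groups[key];
        // Same-format UTC ISO strings compare correctly as plain strings.
        if (e.time < g.fromdate) { g.fromdate = e.time; }
        if (e.time > g.todate) { g.todate = e.time; }
        // Keep only the two fields wanted in the array.
        g.data.push({ time: e.time, value: e.value });
    });
    return Object.keys(groups).map(function (k) { return groups[k]; });
}

var docs = compactPerHour([
    {plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
     time: "2017-09-05T22:00:14.9410000Z", value: 1234.56},
    {plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
     time: "2017-09-05T22:00:19.5410000Z", value: 1334.76},
    // Made-up event in the next hour, for illustration only.
    {plantId: "Plant A", machineId: "M001", sensorId: "S001", unit: "kg",
     time: "2017-09-05T23:00:02.0000000Z", value: 1400.0}
]);
console.log(JSON.stringify(docs, null, 2));
```

Each element of `docs` corresponds to one output document per sensor and hour, with the grouping fields at the top level and the reduced events in `data`.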
