How to convert to dynamic type/ apply multiple functions on same 'pack' in KQL/Kusto

1.3k Views Asked by At

I am absolutely in love with ADX time series capabilities; having worked tons on sensor data with Python. Below are the requirements for my case:

  1. Handle Sensor data tags at different frequencies -- bring them to all to 1 sec frequency (if in milliseconds, aggregate over a 1sec interval)
  2. Convert stacked data to unstacked data.
  3. Join with another dataset which has multiple "string-labels" by timestamp, after unstack.
  4. Do linear interpolation on some columns, and forward fill in others (around 10-12 in all).

I think with below query I have gotten the first three done; but unable to use series_fill_linear directly on column. The docs say this function requires a dynamic type as input. The error message is helpful: series_fill_linear(): argument #1 was not of an expected data type: dynamic

Is it possible to apply series_fill_linear where I'm already using pack instead of using pack again. How can I apply this function selectively by Tag; and make my overall query more readable? It's important to note that only sensor_data table requires both series_fill_linear and series_fill_forward; label_data only requires series_fill_forward.

List item

sensor_data
    | where timestamp > datetime(2020-11-24 00:59:59) and timestamp <datetime(2020-11-24 12:00:00) 
    | where device_number =='PRESSURE_599' 
    | where tag_name in ("tag1", "tag2", "tag3",  "tag4") 
    | make-series agg_value = avg(value) default = double(null) on timestamp in range (datetime(2020-11-24 00:59:59), datetime(2020-11-24 12:00:00), 1s) by tag_name
    | extend series_fill_linear(agg_value, double(null), false) //EDIT
    | mv-expand timestamp to typeof(datetime), agg_value to typeof(double) 
    | summarize b = make_bag(pack(tag_name, agg_value)) by timestamp
    | evaluate bag_unpack(b)
|join kind = leftouter (label_data
    | where timestamp > datetime(2020-11-24 00:58:59) and timestamp <datetime(2020-11-24 12:00:01) 
    | where device_number =='PRESSURE_599'
    | where tag != "PRESSURE_599_label_Raw" 
    | summarize x = make_bag(pack(tag, value)) by timestamp
    | evaluate bag_unpack(x)) on timestamp
    | project timestamp, 
              MY_LINEAR_COL_1 = series_fill_linear(tag1, double(null), false),
              MY_LINEAR_COL_2 = series_fill_forward(tag2),
              MY_LABEL_1 = series_fill_forward(PRESSURE_599_label_level1),
              MY_LABEL_2 = series_fill_forward(PRESSURE_599_label_level2)

EDIT: I ended up using extend with case to handle different cases of interpolation.

// let forward_tags = dynamic({"tags": ["tag2","tag4"]}); unable to use this in query as "forward_tags.tags"

sensor_data
    | where timestamp > datetime(2020-11-24 00:59:59) and timestamp <datetime(2020-11-24 12:00:00)
    | where device_number = "PRESSURE_599"
    | where tag_name in ("tag1", "tag2", "tag3", "tag4") // use a variable here instead?
    | make-series agg_value = avg(value) 
                              default = double(null) 
                              on timestamp
                              in range (datetime(2020-11-24 00:59:59), datetime(2020-11-24 12:00:00), 1s)
                              by tag_name
    | extend agg_value = case (tag_name in ("tag2", "tag3"), // use a variable here instead?
                                series_fill_forward(agg_value, double(null)),
                                series_fill_linear(agg_value, double(null), false)
                                )
    | mv-expand timestamp to typeof(datetime), agg_value to typeof(double) 
    | summarize b = make_bag(pack(tag_name, agg_value)) by timestamp
    | evaluate bag_unpack(b)
| join kind = leftouter (  
  label_data // don't want to use make-series here, will be unecessary data generation since already in 'ss' format.
    | where timestamp > datetime(2020-11-24 00:58:59) and timestamp <datetime(2020-11-24 12:00:01)
    | where tag != "PRESSURE_599_label_Raw" 
    | summarize x = make_bag(pack(tag, value)) by timestamp
    | evaluate bag_unpack(x)
    ) 
on timestamp

I was wondering if it is possible in KQL to pass a list of strings inside a query/fxn to use as shown below. I have commented where I think a list of strings could be passed to make the code more readable.

Now, I just need to fill_forward the label columns (MY_LABEL_1, MY_LABEL_2); which are a result of the below query. I would prefer the code is added on to the main query, and the final result is a table with all columns; Here is a sample table based on my case's result.

datatable (timestamp:datetime, tag1:double, tag2:double, tag3:double, tag4:double, MY_LABEL_1: string, MY_LABEL_2: string)
    [
     datetime(2020-11-24T00:01:00Z), 1, 3, 6, 9, "x", "foo",
     datetime(2020-11-24T00:01:01Z), 1, 3, 6, 9, "", "",
     datetime(2020-11-24T00:01:02Z), 1, 3, 6, 9,"", "",
     datetime(2020-11-24T00:01:03Z), 1, 3, 6, 9,"y", "bar",
     datetime(2020-11-24T00:01:04Z), 1, 3, 6, 9,"", "",
     datetime(2020-11-24T00:01:05Z), 1, 3, 6, 9,"", "",
     ]
2

There are 2 best solutions below

2
On BEST ANSWER

Series functions in ADX only work on dynamic arrays. You can apply a selective fill function using case() function, by replacing this line:

| extend series_fill_linear(agg_value, double(null), false) //EDIT

With something like the following:

| extend agg_value = case(
        tag_name == "tag1", series_fill_linear(agg_value, double(null), false),
        tag_name == "tag2", series_fill_forward(agg_value),
        series_fill_forward(agg_value)
  )

Edit:
Here is an example of string column fill-forward workaround:

let T = datatable ( Timestamp: datetime, Employee: string ) 
[   datetime(2020-01-01), "Bob",
datetime(2021-01-02), "",
datetime(2021-01-03), "Alice",
datetime(2021-01-04), "",
datetime(2021-01-05), "",
datetime(2021-01-06), "Alan",
datetime(2021-01-07), "",
datetime(2021-01-08), ""  ]
| sort by Timestamp asc;
let employeeLookup = toscalar(T | where isnotempty(Employee) | summarize make_list(Employee));
T
| extend idx = row_cumsum(tolong(isnotempty(Employee)))
| extend EmployeeFilled = employeeLookup[idx - 1]
| project-away idx
Timestamp Employee EmployeeFilled
2021-01-01 00:00:00.0000000 Bob Bob
2021-01-02 00:00:00.0000000 Bob
2021-01-03 00:00:00.0000000 Alice Alice
2021-01-04 00:00:00.0000000 Alice
2021-01-05 00:00:00.0000000 Alice
2021-01-06 00:00:00.0000000 Alan Alan
2021-01-07 00:00:00.0000000 Alan
2021-01-08 00:00:00.0000000 Alan
1
On

Regarding your requirement to convert the time series in many frequencies to a common one, have a look at series_downsample_fl() function library