Kapacitor Lambda Mean

I am trying to generate a baseline with a Kapacitor batch |query by querying the same interval from InfluxDB 1, 2, 3 and 4 weeks ago, then shifting each result forward and joining them together like this:

// Assumed value: the question does not show how period is defined.
var period = 1m

// Each query reads the same interval N weeks ago; shift(Nw) moves it forward to "now".
var w1 = batch
    |query('SELECT mean("value") FROM "MyDB"."autogen"."MetricName"')
        .offset(1w).period(period).every(1m).align().groupBy(time(1m))
    |shift(1w)

var w2 = batch
    |query('SELECT mean("value") FROM "MyDB"."autogen"."MetricName"')
        .offset(2w).period(period).every(1m).align().groupBy(time(1m))
    |shift(2w)

var w3 = batch
    |query('SELECT mean("value") FROM "MyDB"."autogen"."MetricName"')
        .offset(3w).period(period).every(1m).align().groupBy(time(1m))
    |shift(3w)

var w4 = batch
    |query('SELECT mean("value") FROM "MyDB"."autogen"."MetricName"')
        .offset(4w).period(period).every(1m).align().groupBy(time(1m))
    |shift(4w)

// Full outer join: missing points are filled with null instead of being dropped.
var bj = w1
    |join(w2, w3, w4)
        .as('w1', 'w2', 'w3', 'w4')
        .fill('null')

// Plain average of the four weekly means.
var b = bj
    |eval(lambda: ("w1.mean" + "w2.mean" + "w3.mean" + "w4.mean") / 4.0)
        .as('avg')

I am using a full outer join (.fill('null')) because some weeks may be missing a value; in that case I would calculate the baseline as the mean of the 3 values that are present.

However, it appears that lambda expressions don't support mean() or any similar functions. They also don't seem to support checks for null values.

Is there a way to calculate baseline like this?

Also, once the baseline is calculated, how can I keep it cached so that incoming stream data can be checked against it?

Any help is appreciated! Thank you

Best answer:

I ended up writing a UDF in C# (.NET Core) that runs as a separate process alongside Kapacitor, queries InfluxDB, does the math and caches the results.

Answer:

Regarding "since some weeks may be missing a value": the join would wait forever for these values, never emitting the other corresponding batches, and this causes a memory leak.

The |barrier() node may help with the leaks, but you will still get eval errors when trying to access missing point fields.

You want to split this into multiple scripts, e.g. one that calculates all 4 periods and adds a tag to each, like

|default().tag('stream', 'w1')

and just sends them to

|kapacitorLoopback()

And a second script that listens to your loopback stream, collects all arriving points with |window() without grouping, and calculates |mean('mean'), no matter how many periods it actually received.
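A minimal sketch of this two-script setup, assuming Kapacitor 1.4 or later (for the dbrp statement) and the query from the question. The loopback database 'baseline', the measurement names 'weekly_means' and 'metric_baseline', and the 1-minute period and window are hypothetical choices, not taken from the answer. The first (batch) task tags each week's result and writes it back into Kapacitor:

```
// Task 1 (batch): read the same minute 1-4 weeks ago, shift it to now, tag it.
// Hypothetical names: loopback database 'baseline', measurement 'weekly_means'.
var period = 1m

var w1 = batch
    |query('SELECT mean("value") FROM "MyDB"."autogen"."MetricName"')
        .offset(1w).period(period).every(1m).align().groupBy(time(1m))
    |shift(1w)
    |default()
        .tag('stream', 'w1')

var w2 = batch
    |query('SELECT mean("value") FROM "MyDB"."autogen"."MetricName"')
        .offset(2w).period(period).every(1m).align().groupBy(time(1m))
    |shift(2w)
    |default()
        .tag('stream', 'w2')

var w3 = batch
    |query('SELECT mean("value") FROM "MyDB"."autogen"."MetricName"')
        .offset(3w).period(period).every(1m).align().groupBy(time(1m))
    |shift(3w)
    |default()
        .tag('stream', 'w3')

var w4 = batch
    |query('SELECT mean("value") FROM "MyDB"."autogen"."MetricName"')
        .offset(4w).period(period).every(1m).align().groupBy(time(1m))
    |shift(4w)
    |default()
        .tag('stream', 'w4')

// union forwards each batch as it arrives; unlike join, it never waits for a match
w1
    |union(w2, w3, w4)
    |kapacitorLoopback()
        .database('baseline')
        .retentionPolicy('autogen')
        .measurement('weekly_means')
```

The second (stream) task averages whatever weekly points arrived in each minute, so a missing week only reduces the number of values averaged instead of stalling a join:

```
// Task 2 (stream): average however many weekly points arrived for each minute.
dbrp "baseline"."autogen"

stream
    |from()
        .measurement('weekly_means')
    // No groupBy('stream'), so all weeks share one window
    |window()
        .period(1m)
        .every(1m)
        .align()
    // Mean of 1 to 4 values, depending on which weeks were present
    |mean('mean')
        .as('baseline')
    // Persist the baseline (hypothetical measurement name)
    |influxDBOut()
        .database('baseline')
        .retentionPolicy('autogen')
        .measurement('metric_baseline')
```

Each block is defined as its own task. Writing the result with influxDBOut is one way to keep the baseline around for later comparisons; the target database is assumed to exist already.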

Answer:

First, try using offset on the batch query instead of shift.

offset queries values from x minutes, hours, days... in the past.

The shift node should be used when joining, e.g.:

previous
    |shift(1w)
    |join(current)
    ......

Here is an example: https://github.com/influxdata/kapacitor/issues/746
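A fuller sketch of the previous/current pattern for a single past week, reusing the database and measurement from the question. The 1-minute granularity and the diff field name are assumptions for illustration. previous reads last week's data via offset(1w), and shift(1w) moves those timestamps forward so the join can match them with current:

```
// Current minute of data.
var current = batch
    |query('SELECT mean("value") FROM "MyDB"."autogen"."MetricName"')
        .period(1m)
        .every(1m)
        .align()
        .groupBy(time(1m))

// Same minute one week ago.
var previous = batch
    |query('SELECT mean("value") FROM "MyDB"."autogen"."MetricName"')
        .period(1m)
        .every(1m)
        .offset(1w)
        .align()
        .groupBy(time(1m))

previous
    // Move last week's timestamps forward so they line up with current
    |shift(1w)
    |join(current)
        .as('previous', 'current')
    // Hypothetical output field: change versus the same minute last week
    |eval(lambda: "current.mean" - "previous.mean")
        .as('diff')
    |log()
```

Repeating this with offset(2w)/shift(2w) and so on gives one task per past week.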

Regarding joining 4 different streams with different times: given my previous comment, I guess it wouldn't work. Maybe the union node would work instead of join, but I'm not sure!

You can always have 3 TICKscripts, comparing the current data to the past week, 2 weeks ago and so on...