Reactive Extension : 3 Consecutive occurance of a Record

98 Views Asked by At

I have a hot observable -

Class TableRecord

{
    string propertyA;
    DateTime dt;
    Dictionary<string,int> checkThreshHoldValues;
}  

I need to check for each value present in the dictionary, if the 3 consecutive values are exceeding the threshold or not. If yes, I need to generate a new event for that property. (Here each reacord will come at a different of 15 minutes ie 4 records in 1 hour.

Class GeneratedRecord
{
  string propertyA;
  DateTime lastTimeObserved;
  string dictionaryPropertyName;
  int lastValue
}

I know I have to use groupByUntil with Buffer, but somehow I am unable to think , how to split the dictionary into individual parts with still keeping the condition Any help will be appreciated.

Example : Lets say I have PropertyA value is in {"TownA","TownB","TownC"} Lets say my properties in dictionary are {"Pollution","Temperature","Pressure"}

Lets say, I am recv values per 15 minutes, so my data is say -

{"TownA" ,"6/14/2015 10:00",{ {"Pollution",61},{"Temperature",48},{"Pressure",40}}}

{"TownA" ,"6/14/2015 10:15",{ {"Pollution",63},{"Temperature",63},{"Pressure",40}}}

{"TownA" ,"6/14/2015 10:30",{ {"Pollution",49},{"Temperature",64},{"Pressure",40}}}
{"TownA" ,"6/14/2015 10:45",{ {"Pollution",70},{"Temperature",65},{"Pressure",40}}}

Say for Threshhold value is 60. So with above example, only Temperature satisfies the condition as it has value of {63,64,65 } . For pollution only 2 consecutive values were above 60

regards

1

There are 1 best solutions below

0
On

You should be able to do this with overlapping windows but I couldn't work out the syntax to start a new window every time an element is received and close the window when it contains three items. But here is a conceptual solution based on Observable.Create and maintaining a queue. In this case I use Interval and a simple threshold (>3):

Observable.Create<string>(observer =>
{
    var queue = new Queue<long>(3);



    return Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(value =>
    {
        if (queue.Count == 3)
        {
            queue.Dequeue();
        }

        queue.Enqueue(value);

        if (queue.Count == 3)
        {
            var values = queue.ToArray();
            if (values.All(v => v > 3))
            {
                observer.OnNext("All above 3. " + values[0] + " " + values[1] + " " + values[2]);
            }
        }
    });
})