In a window, is it possible to tell whether a removed item was updated or has expired?

I'm new to NEsper, and I have an application that works with a StateEvent (id string, ts long, state int). The application is driven by the ts (timestamp) value of each StateEvent. The state property of a StateEvent can be modified under certain conditions: given three application minutes (-2, -1, 0), if there is at least one event with state = 80 in each minute, then all the states in minute -1 must be changed to 80.

I have defined the following:

CREATE  SCHEMA  State_Evnt_Strm     (id string, ts long, state integer, tsMin_0 long, tsMin_1 long) ;
CREATE  SCHEMA  State_Strm          (id string, ts long, state integer, tsMin_0 long, tsMin_1 long) ;

CREATE window   wnd_3m_State.win:ext_timed(tsMin_0, 3 min) AS State38_a_Wnd ; 

UPDATE istream  wnd_3m_State 
    SET tsMin_0 = ts.roundFloor('min'), tsMin_1 = ts.roundFloor('min') - 60000;

ON State38_a_Strm  p 
    INSERT INTO wnd_3m_State (id , state, ts, tsMin_0 , tsMin_1 ) 
    SELECT  id
        ,   state
        ,   ts
        ,   tsMin_0
        ,   tsMin_1 ;

ON PATTERN  [every  a=wnd_3m_State (state = 80)
                ->  b=wnd_3m_State (state = 80, b.tsMin_1 = a.tsMin_0) 
                ->  c=wnd_3m_State (state = 80, c.tsMin_1 = b.tsMin_0  )]
    MERGE   wnd_3m_State  w 
    WHERE   b.id = w.id 
    WHEN    MATCHED     and tsMin_0 = b.tsMin_0 and state <> 80
    THEN    UPDATE      SET state   = 80 ;

SELECT  rstream
        id
    ,   state
    ,   ts
    ,   tsMin_0
FROM    wnd_3m_State p
ORDER BY tsMin_0, ts    ;

In the last statement I need to select only the events that have expired from the window, not the ones removed because the merge modified them (a remove followed by an insert). Is this possible? I don't know of any way to obtain the expired items from a window other than rstream, but rstream on its own isn't enough.

I would appreciate it if you could help me.

There is 1 best solution below

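You could set a value on the insert stream event itself at the time of the merge, using a function that receives the event and sets a flag on it indicating that it was merged. The statement that reads the remove stream can then filter on that flag to keep only the events that expired.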
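
A minimal sketch of such a function, written in Java (Esper) for illustration; on NEsper the equivalent would be a C# static method. It assumes the events are map-backed, which I have not checked for your setup, and it uses a hypothetical extra property named `merged` that is not in the schemas from the question. The class and method names are made up as well. How the function is wired into the MERGE, and whether the engine passes it the original event or a copy, is something you would need to verify against your engine version.

```java
import java.util.Map;

// Hypothetical helper: class, method, and property names are illustrative only.
final class MergeFlag {
    // Hypothetical extra property, not part of the schemas in the question.
    static final String MERGED = "merged";

    private MergeFlag() {}

    // Marks the given map-backed event as touched by the merge.
    // Returns true so the call can be placed inside a boolean expression.
    static boolean markMerged(Map<String, Object> event) {
        event.put(MERGED, Boolean.TRUE);
        return true;
    }

    // True for events that were never marked, i.e. removals that
    // were not caused by the merge.
    static boolean isExpiredOnly(Map<String, Object> event) {
        return !Boolean.TRUE.equals(event.get(MERGED));
    }
}
```

The idea is to call `markMerged` on the window event while the merge is processing it, and then have the rstream consumer keep only the events for which `isExpiredOnly` returns true.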