I'm novice in NESPER, and I have an application that works with a StateEvent (id string, ts long, State int). The application is based on the values supplied by ts(Timestamp) of the StateEvent. The values of the state property of StateEvent can be modified depending on some conditions, if a I have 3 application minutes (-2, - 1, 0) and there is at least one state = 80 in each minute, then all the states of minute -1, must be changed to another value: 80
I have defined
CREATE SCHEMA State_Evnt_Strm (id string, ts long, state integer, tsMin_0 long, tsMin_1 long) ;
CREATE SCHEMA State_Strm (id string, ts long, state integer, tsMin_0 long, tsMin_1 long) ;
CREATE window wnd_3m_State.win:ext_timed(tsMin_0, 3 min) AS State38_a_Wnd ;
UPDATE istream wnd_3m_State
SET tsMin_0 = ts.roundFloor('min'), tsMin_1 = ts.roundFloor('min') - 60000;
ON State38_a_Strm p
INSERT INTO wnd_3m_State (id , state, ts, tsMin_0 , tsMin_1 )
SELECT id
, state
, ts
, tsMin_0
, tsMin_1 ;
ON PATTERN [every a=wnd_3m_State (state = 80)
-> b=wnd_3m_State (state = 80, b.tsMin_1 = a.tsMin_0)
-> c=wnd_3m_State (state = 80, c.tsMin_1 = b.tsMin_0 )]
MERGE wnd_3m_Driving w
WHERE b.id = w.id
WHEN MATCHED and tsMin_0 = b.tsMin_0 and state <> 80
THEN UPDATE SET state = 80
SELECT rstream
id
, state
, ts
, tsMin_0
FROM wnd_3m_State p
ORDER BY tsMin_0, ts ;
In the last clause I need to filter to select only the states that has expired in the window, not the modified by the merge (remove/insert), is this possible? I don't know if there is another way to obtain the expired items from a window than use the rstream, but it isn't enough.
I would appreciate if you could help me.
You could set some value in the insert stream event itself at the time of merge by using a function that receives the event and sets some flag on it indicating that it was merged.