My requirement is to generate a trigger based on two events (EVT_A and EVT_B, in either order). Here is the expected behavior:
1. EVT_A arrives --> no action
2. EVT_B arrives --> should trigger
3. EVT_B arrives --> should trigger since A was received previously (output should include A and the current B)
4. EVT_A arrives --> should trigger since B was received previously (output should include the current A and the last B)
5. EVT_A arrives --> should trigger since B was received previously (output should include the current A and the last B)
I tried the following, without success:
SELECT E.*
FROM MyEvents
MATCH_RECOGNIZE (
    ORDER BY procTime
    MEASURES ARRAY[
        Event(A.id, A.name, A.date),
        Event(B.id, B.name, B.date)
    ] AS Events
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (A C* B)
    DEFINE
        A AS name IN ('EVT_A', 'EVT_B'),
        B AS name IN ('EVT_A', 'EVT_B') AND B.name <> A.name,
        C AS name NOT IN ('EVT_A', 'EVT_B')
) AS E;
I also tried "AFTER MATCH SKIP TO FIRST A", but it also threw an exception. Any suggestions on how I can achieve this with Flink SQL CEP, or in any other way in Flink?
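This seems to be a case where a RichFlatMapFunction or a ProcessFunction is the easiest way to go. You just need a bit of state: the most recent EVT_A and the most recent EVT_B.
The logic for processing each incoming event is then something like this: store the event as the latest one of its type, and if an event of the other type has already been seen, emit the pair.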
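A minimal sketch of that state and logic is shown here in plain Java, without any Flink dependency, so it can be run on its own. The class and field names (`PairTrigger`, `Event`, `Pair`, `lastA`, `lastB`) are hypothetical and not taken from the question. In an actual Flink job, the two fields would presumably be `ValueState<Event>` instances inside a function applied to a keyed stream, read with `value()` and written with `update()`.

```java
import java.util.Optional;

/** Plain-Java model of the per-key logic; stands in for a Flink function. */
final class PairTrigger {

    /** Hypothetical event type; id and name mirror the columns in the question. */
    static final class Event {
        final long id;
        final String name;

        Event(long id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Output record: the most recent EVT_A together with the most recent EVT_B. */
    static final class Pair {
        final Event a;
        final Event b;

        Pair(Event a, Event b) {
            this.a = a;
            this.b = b;
        }
    }

    // Assumption: in Flink these would be two ValueState<Event> fields.
    private Event lastA;
    private Event lastB;

    /** Remembers the event, then emits a pair if the other type has been seen. */
    Optional<Pair> onEvent(Event event) {
        if ("EVT_A".equals(event.name)) {
            lastA = event;
        } else if ("EVT_B".equals(event.name)) {
            lastB = event;
        } else {
            return Optional.empty(); // unrelated events never trigger
        }
        if (lastA != null && lastB != null) {
            return Optional.of(new Pair(lastA, lastB));
        }
        return Optional.empty();
    }
}
```

Fed the five events from the question in order, `onEvent` returns an empty result for the first EVT_A and a pair for each of the following four events, always combining the current event with the latest event of the other type.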
If you are looking for an introduction to working with Flink's managed state, there's a tutorial in the docs.