I am seeing unexpected behaviour when using the output every clause together with a table join clause.
- I have a basic app with one input stream and 2 tables, each storing a different list of values. There are also 2 queries.
- The first query, query1, joins with table1 and, when there is a match, will output first every 5 sec.
- The second query, query2, does the same: it joins with table2 and outputs the first value found every 5 sec.
- The goal is that, every 5 seconds, a value in the input stream that is contained in table1 produces a match, a value contained in table2 produces a different match, and both queries then stay silent until the next 5-second block.
The app is the following:
@App:name("delays_tables_join")
define stream input(value string);
define stream table_input(value string);
define table table1(value string);
define table table2(value string);
@sink(type='log')
define stream LogStream (value string);
-- fill table1
@info(name='insert table 1')
from table_input[value == '1']
insert into table1;
-- fill table2
@info(name='insert table 2')
from table_input[value == '2']
insert into table2;
-- query input join with table 1, output once every 5 sec
@info(name='query1')
from input join table1 on input.value == table1.value
select input.value
output first every 5 sec
insert into LogStream;
-- query input join with table 2, output once every 5 sec
@info(name='query2')
from input join table2 on input.value == table2.value
select input.value
output first every 5 sec
insert into LogStream;
When this app is run, the values 1 and 2 are first sent to table_input to fill both tables. Then the following values are sent repeatedly to the input stream:
1, 2, 1, 2, 1, 2, ...
LogStream is expected to receive 2 values every 5 seconds: the first occurrence of value 1 and the first occurrence of value 2. Instead, only the first occurrence of value 1 appears each time, and value 2 never does:
[2020-04-02_18-55-16_498] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays_tables_join : LogStream : Event{timestamp=1585846516098, data=[1], isExpired=false}
[2020-04-02_18-55-21_508] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays_tables_join : LogStream : Event{timestamp=1585846521098, data=[1], isExpired=false}
Please note that when no table joins are involved, both queries work as expected. Example without joins:
@App:name("delays")
define stream Input(value string);
@sink(type='log')
define stream LogStream (value string);
@info(name='query1')
from Input[value == '1']
select value
output first every 5 sec
insert into LogStream;
@info(name='query2')
from Input[value == '2']
select value
output first every 5 sec
insert into LogStream;
This produces the following output:
[2020-04-02_18-53-50_305] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846430304, data=[1], isExpired=false}
[2020-04-02_18-53-50_706] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846430305, data=[2], isExpired=false}
[2020-04-02_18-53-55_312] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846438305, data=[1], isExpired=false}
[2020-04-02_18-53-56_114] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846439305, data=[2], isExpired=false}
I was wondering whether this behaviour is expected, or whether there is an error in the design of the application.
Many thanks!
I was able to get the same results as in the "without join" example by fixing the "insert table 2" query: changing table1 to table2 in its insert into line.
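
To illustrate why that one-word mistake would explain the symptom, here is a rough Python model of the two queries. It is not Siddhi code: `first_every` and `run` are hypothetical helpers, and the rate limiter is simplified to "emit a matching event, then suppress further output for 5 seconds", which is an assumption about how output first every 5 sec behaves. The "buggy" case assumes that both values ended up in table1 and table2 stayed empty.

```python
from typing import Iterable, List, Set, Tuple

Event = Tuple[float, str]  # (timestamp in seconds, value)


def first_every(events: Iterable[Event], table: Set[str],
                period: float = 5.0) -> List[Event]:
    """Toy model of one query: join with a table, then rate-limit.

    Assumption: after emitting an event, further matches are
    suppressed until `period` seconds have passed.
    """
    emitted: List[Event] = []
    next_allowed = float("-inf")
    for ts, value in events:
        if value not in table:
            continue  # no join match, so nothing reaches the rate limiter
        if ts >= next_allowed:
            emitted.append((ts, value))
            next_allowed = ts + period
    return emitted


def run(table1: Set[str], table2: Set[str],
        events: List[Event]) -> List[Event]:
    """Run query1 and query2 independently and merge their outputs."""
    return sorted(first_every(events, table1) + first_every(events, table2))


# Values 1, 2, 1, 2, ... sent every 0.5 seconds for 10 seconds.
events = [(i * 0.5, "1" if i % 2 == 0 else "2") for i in range(20)]

# Buggy app (assumed): both inserts wrote to table1, table2 is empty.
buggy = run({"1", "2"}, set(), events)

# Fixed app: each value lands in its own table.
fixed = run({"1"}, {"2"}, events)

print("buggy:", buggy)
print("fixed:", fixed)
```

In this model the buggy setup lets query1 match every event, so it only ever reports the first event of each 5-second block, while query2 never matches anything. With the tables filled correctly, each query reports its own value once per block, as in the example without joins.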