Siddhi App, 'Output Rate Limiting' clause misbehaviour in Joins

169 Views Asked by At

I am facing a non-expected behaviour when using the clause output every along with table join clause.

  • I have an basic app, with one input stream, and 2 tables, which store a different list of values. Then, there are also 2 queries,
  • The first query1 will join with table1, and when there is a match will output first every 5 sec.
  • Second query2 will do similarly, will join table2, and will output first value found every 5 sec.
  • The goal of this is, every 5 seconds, when there is a value in input stream which is contained into table 1, there will be a match, and if there is a value contained into table 2, there will be a different match, and both queries will keep silent until next 5 seconds block.

the app is the following

@App:name("delays_tables_join")

define stream input(value string);
define stream table_input(value string);
define table table1(value string);
define table table2(value string);

@sink(type='log')
define stream LogStream (value string);

-- fill table1
@info(name='insert table 1')
from table_input[value == '1']
insert into table1;

-- fill table2
@info(name='insert table 2')
from table_input[value == '2']
insert into table2;

-- query input join with table 1, output once every 5 sec
@info(name='query1')
from input join table1 on input.value == table1.value
select input.value
output first every 5 sec
insert into LogStream;

-- query input join with table 2, output once every 5 sec
@info(name='query2')
from input join table2 on input.value == table2.value
select input.value
output first every 5 sec
insert into LogStream;
  • When this app is run,first its sent to table_input the values 1, and 2 to fill both tables

  • And then, it starts sending to the input stream repeatedly values: 1, 2, 1, 2, 1, 2...

  • It is expected to have in LogStream 2 values every 5 seconds, the first appearance of 1 value, and the first appearance of value 2.

  • But instead, just the first occurrence of value 1 appears all the time, but not the value 2

[2020-04-02_18-55-16_498] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays_tables_join : LogStream : Event{timestamp=1585846516098, data=[1], isExpired=false} 
[2020-04-02_18-55-21_508] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays_tables_join : LogStream : Event{timestamp=1585846521098, data=[1], isExpired=false} 

Please, note that, when there are no table joins involved, both queries work as expected. Example without joins:

@App:name("delays")
define stream Input(value string);

@sink(type='log')
define stream LogStream (value string);

@info(name='query1')
from Input[value == '1']
select value
output first every 5 sec
insert into LogStream;

@info(name='query2')
from Input[value == '2']
select value
output first every 5 sec
insert into LogStream;

this will produce the following output:

[2020-04-02_18-53-50_305] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846430304, data=[1], isExpired=false} 
[2020-04-02_18-53-50_706] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846430305, data=[2], isExpired=false} 
[2020-04-02_18-53-55_312] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846438305, data=[1], isExpired=false} 
[2020-04-02_18-53-56_114] INFO {io.siddhi.core.stream.output.sink.LogSink} - delays : LogStream : Event{timestamp=1585846439305, data=[2], isExpired=false}

.

I was wondering if this behaviour is expected, or there is any error at all in the design of the application.

Many thanks!

1

There are 1 best solutions below

1
On

I was able to get the results as in the "without join" by fixing the "insert table 2" query by changing the table1 to table2 in insert into line

-- fill table2
@info(name='insert table 2')
from table_input[value == '2']
insert into table1;