Sql Component :Consume multiple rows and mark them all as processed using onConsume

1.4k Views Asked by At

I configured camel sql component to read data from from database table . I have "onConsume" parameter working when i read one row at a time , but doesn't work when i try to read multiple rows at a time using "maxMessagesPerPoll". Here is what i tried ...

Working : When i read one row at a time and update the row using onConsume .

My consumer endpoint uri looks like :

sql:select * from REPORT where IS_VIOLATED != 'N' and TYPE = 'Provisioning'?consumer.delay=1000&consumer.onConsume=update REPORT set IS_VIOLATED = 'N' where REPORT_ID =:#REPORT_ID

Not working : When I configured camel's sql component to read configurable rows(using "maxMessagesPerPoll") . It reads multiple rows at a time but onConsume doesn't seem to work . I tried to tell camel to use IN operator and setting header value(REPORT_ID) with a array of values for IN clause.

My consumer endpoint uri now looks like :

sql:select * from REPORT where IS_VIOLATED != 'N' and TYPE = 'Provisioning'?consumer.delay=1000&maxMessagesPerPoll=3&consumer.useIterator=false&consumer.onConsume=update REPORT set IS_VIOLATED = 'N' where REPORT_ID in(:#REPORT_ID)

I might be doing something wrong here. I did enough searching on this already and found related post1, post2 . But it doesn't put me on correct path.

I need to be able to mark all the consumed rows to IS_VIOLATED = 'N' .

Thanks for your help.

2

There are 2 best solutions below

0
On

I noticed that you set consumer.useIterator=false, and the doc says:

If true each row returned when polling will be processed individually. If false the entire java.util.List of data is set as the IN body.

So I think that because of this option, the :#REPORT_ID is no more understood, since it would be from the entire list and no more from each row.

Maybe removing this option would already be enough.

I also didn't understand why you changed the where clause from where REPORT_ID =:#REPORT_ID to where REPORT_ID in(:#REPORT_ID).

0
On

By carefully looking at the apache sql component doc :

I tried implementing custom processing stratergy, using attribute "processingStrategy"`.

public class ReportProcessingStratergy implements SqlProcessingStrategy {
@Override
public int commit(DefaultSqlEndpoint defaultSqlEndpoint, Exchange exchange, Object o, JdbcTemplate jdbcTemplate, String s) throws Exception {
    s = s.replace("?","5066834,5066835,5066832");
    return jdbcTemplate.update(s);
}

@Override
public int commitBatchComplete(DefaultSqlEndpoint defaultSqlEndpoint, JdbcTemplate jdbcTemplate, String s) throws Exception {
    return 0;
}

}

configure spring bean :

  <bean class="go.ga.ns.reconc.sl.ReportProcessingStratergy" id="reportProcessingStratergy">

now my sql consumer endpoint uri looks like :

sql:select * from REPORT where IS_VIOLATED != 'N' and TYPE = 'Provisioning'?consumer.delay=1000&maxMessagesPerPoll=3&consumer.useIterator=false&&processingStrategy=#reportProcessingStratergy&consumer.onConsume=update REPORT set IS_VIOLATED = 'N' where REPORT_ID in(?)

note :processingStrategy=#reportProcessingStratergy(# has significance as explained here, it did not work with out it)