Camel EIP to filter duplicates

I have a Camel route that dequeues a message off a queue, sends it to a bean for processing, then enqueues the message back onto a different queue.

I am trying to eliminate duplicate messages on the 2nd queue. Does Camel have any endpoints, processors, EIPs, etc. that I could configure to dedupe messages en route, before they are sent to the 2nd queue?

Example:

<route id="myRoute">
    <from uri="{{queue-1-uri}}" />
    <to uri="bean:myBean?method=process" />
    <!-- How to dedupe right here??? -->
    <to uri="{{queue-2-uri}}" />
</route>

Update: perhaps something like this:

<route id="myRoute">
    <from uri="{{queue-1-uri}}" />
    <to uri="bean:myBean?method=process" />
    <filter>
        <method>what goes here???</method>
        <to uri="{{queue-2-uri}}" />
    </filter>
</route>

Per Ralf's suggestion, I could reference a bean inside <method></method> that uses a cache to keep already-seen messages in memory.

Say this new bean was called FilterBean and it had a dedupe() method on it: how do I wire it up in the Spring XML, and what classes/interfaces does the bean need to implement to be called from inside the route?

There is 1 best solution below

I think you are looking for the Idempotent Consumer EIP that Camel provides. It comes with several repository implementations to choose from depending on your needs, such as in-memory, JDBC, or Hazelcast. I am not sure how well it fits your case, since you call a bean after consuming from the queue, but it is worth a try. A simple example from the Camel website follows:

<!-- repository for the idempotent consumer -->
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <idempotentConsumer messageIdRepositoryRef="myRepo">
            <!-- use the messageId header as key for identifying duplicate messages -->
            <header>messageId</header>
            <!-- if not a duplicate send it to this mock endpoint -->
            <to uri="mock:result"/>
        </idempotentConsumer>
    </route>
</camelContext>
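
Applied to the route from the question, this could look roughly like the sketch below. It rests on a few assumptions: the idempotentConsumer is placed after the bean call, so the key is computed from the processed message; the message body itself is what identifies a duplicate (if your messages carry a unique ID header, key on that header instead, as in the example above); and the bean id dedupeRepo is just a name made up for illustration.

<!-- assumption: in-memory repository; the set of seen keys is lost on restart -->
<bean id="dedupeRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="myRoute">
        <from uri="{{queue-1-uri}}" />
        <to uri="bean:myBean?method=process" />
        <idempotentConsumer messageIdRepositoryRef="dedupeRepo">
            <!-- assumption: the processed body, converted to a String, is the duplicate key -->
            <simple>${body}</simple>
            <!-- only messages whose key has not been seen before reach the 2nd queue -->
            <to uri="{{queue-2-uri}}" />
        </idempotentConsumer>
    </route>
</camelContext>

Using the whole body as the key is simple but can be memory-hungry for large messages, so a short business key is usually a better choice if one exists. If a Spring property placeholder configurer is active in the same file, ${body} may clash with it; the Simple language also accepts $simple{body} for that case.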

You can find more information on the Idempotent Consumer page of the Camel documentation.
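
If you would rather keep the filter approach from the update in the question, a minimal wiring sketch is below. The class name com.example.FilterBean is hypothetical, and the sketch assumes a Camel version in which the method element takes a ref attribute (older 2.x releases used bean instead). The bean does not need to implement any Camel interface: a public method that returns a boolean is enough, and a single parameter is bound to the message body by default. Returning true lets the message through; returning false drops it.

<!-- hypothetical bean: dedupe() returns true the first time it sees a message, false afterwards -->
<bean id="filterBean" class="com.example.FilterBean"/>

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="myRoute">
        <from uri="{{queue-1-uri}}" />
        <to uri="bean:myBean?method=process" />
        <filter>
            <!-- the boolean returned by dedupe() is used as the filter predicate -->
            <method ref="filterBean" method="dedupe"/>
            <to uri="{{queue-2-uri}}" />
        </filter>
    </route>
</camelContext>

With this variant you have to manage the cache of seen messages yourself (size limit, eviction, thread safety), which is what the idempotent repository already does for you.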