JmsTemplate ad hoc consumer is starving


I'm trying to create an ad hoc consumer for my JMS messages (I use ActiveMQ).

It looks like this:

    jmsTemplate.browse(
        q.getName(),
        new BrowserCallback<Integer>() {

            @Override
            public Integer doInJms(Session session, QueueBrowser browser) throws JMSException {

                Queue destination = session.createQueue(q.getName());
                Enumeration<?> enum1 = browser.getEnumeration();

                while (enum1.hasMoreElements()) {
                    ActiveMQObjectMessage msg = (ActiveMQObjectMessage) enum1.nextElement();
                    MessageConsumer consumer = session.createConsumer(destination);
                    Message m = consumer.receiveNoWait();
                    handle(m);
                    m.acknowledge();

This ad hoc consumer should handle all the messages that failed to be consumed. The problem is that my original 2-3 consumers, which are defined in spring-messaging.xml, constantly try to handle the failed events and retry them via the redelivery configuration (the redelivery delay is set to 3 seconds, and the number of redeliveries is unlimited).

This consumer should handle these messages, but it is actually starving: it never receives them at all, and therefore

          Message m = consumer.receiveNoWait();

returns null all the time.

Here are my beans:

    <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <property name="queue" value="*" />
        <property name="initialRedeliveryDelay" value="0" />
        <property name="redeliveryDelay" value="2000" />
        <property name="maximumRedeliveries" value="-1" />
    </bean>

    <!-- A JmsTemplate instance that uses the cached connection and destination -->
    <bean id="redeliveryJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="redeliveryCachingConnectionFactory" />
        <property name="messageConverter" ref="eventConverter" />
        <property name="sessionTransacted" value="true" />
    </bean>

P.S. When I change p:sessionCacheSize to 1:

    <bean id="redeliveryCachingConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory"
        p:targetConnectionFactory-ref="redeliveryConnectionFactory"
        p:sessionCacheSize="1" />

it works, but I would like to use the cache.

Any ideas?


There is 1 solution below


You are assuming that the queue browser returns consumable messages. A browse is a snapshot of the queue at the moment it was taken: it can include messages that are already assigned to another consumer's prefetch buffer, and by the time you try to consume a browsed message, it may already have been consumed. In addition, receiveNoWait() does not wait around for a message to be assigned to your new consumer, so on every iteration you go to the back of the line for messages.

I'd suggest allowing the messages to be moved to a DLQ (dead letter queue) and then letting this ad hoc consumer consume from it. Otherwise you are competing with your other consumers for messages.
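
One way this could be set up is sketched below, under a few assumptions. The redelivery policy from the question gets a finite retry limit so that exhausted messages are dead-lettered, and a separate template reads from the dead letter queue. The limit of 5, the bean id dlqJmsTemplate, and the 5-second receive timeout are illustrative choices, not values from the question. ActiveMQ.DLQ is ActiveMQ's default shared dead letter queue; the name would differ if the broker configures its own dead letter strategy, and by default only persistent messages are dead-lettered.

```xml
<!-- Same policy as in the question, but with a finite retry limit (5 is an assumed value).
     Once the limit is exceeded, ActiveMQ moves the message to the dead letter queue. -->
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="queue" value="*" />
    <property name="initialRedeliveryDelay" value="0" />
    <property name="redeliveryDelay" value="2000" />
    <property name="maximumRedeliveries" value="5" />
</bean>

<!-- Hypothetical template for the ad hoc consumer; it reads from the default DLQ
     and blocks for up to 5 seconds (assumed value) per receive call. -->
<bean id="dlqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="redeliveryCachingConnectionFactory" />
    <property name="messageConverter" ref="eventConverter" />
    <property name="defaultDestinationName" value="ActiveMQ.DLQ" />
    <property name="receiveTimeout" value="5000" />
</bean>
```

The ad hoc consumer could then call dlqJmsTemplate.receive() in a loop until it returns null, which with this timeout means no message arrived within five seconds.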

If you are really dead set against using a DLQ, you can use a JMS selector to consume only messages whose JMSRedelivered header is true, combined with a longer receive timeout. You will still be competing with your other consumers, but you would get a message at some point.
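
A sketch of the selector variant, assuming the template from the question is reused with a receive timeout added (10 seconds is an arbitrary choice). Note that the JMS specification only lists a restricted set of header fields as usable in selectors, and JMSRedelivered is not among them, so this relies on the broker accepting it; it is worth verifying against the ActiveMQ version in use.

```xml
<bean id="redeliveryJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="redeliveryCachingConnectionFactory" />
    <property name="messageConverter" ref="eventConverter" />
    <property name="sessionTransacted" value="true" />
    <!-- Block for up to 10 seconds (assumed value) instead of returning immediately -->
    <property name="receiveTimeout" value="10000" />
</bean>
```

With that in place, a call such as redeliveryJmsTemplate.receiveSelected(q.getName(), "JMSRedelivered = TRUE") would wait for a matching message up to the timeout rather than returning null straight away.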