JMS listener processes message twice

569 Views Asked by At

We migrated a WebSphere J2EE app to Spring Boot. Everything looked great, but we have now found that the message listeners sometimes process a message twice.

It looks to me as if this happens while one message is being processed and not yet committed: another concurrent consumer can get the same message and process it as well. It seems the message broker doesn't hold the message back, i.e. doesn't reserve it for consumer 1.

import bitronix.tm.resource.jms.PoolingConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;

import javax.jms.ConnectionFactory;
import javax.jms.Session;
import java.util.Properties;

/**
 * Spring configuration for the JMS infrastructure.
 *
 * <p>Declares the pooled JMS connection factory, the Bitronix transaction manager,
 * the Spring {@code JtaTransactionManager} built on top of it, and the message
 * listener container that feeds the "wagenstatusBean" listener.
 */
@Configuration
@EnableJms
@EnableCaching(proxyTargetClass = true)
public class JmsConfig {

    /**
     * Builds the Bitronix pooling connection factory that wraps the
     * {@code com.ibm.mq.jms.MQXAQueueConnectionFactory} driver class.
     *
     * @return the configured pooling connection factory
     */
    @Bean
    ConnectionFactory jmsXAConnectionFactory() {
        PoolingConnectionFactory pool = new PoolingConnectionFactory();

        // Identity of the pooled resource and the credentials it connects with.
        pool.setClassName("com.ibm.mq.jms.MQXAQueueConnectionFactory");
        pool.setUniqueName("mq-xa-" + appName);
        pool.setAllowLocalTransactions(true);
        pool.setTestConnections(false);
        pool.setUser(user);
        pool.setPassword(password);

        // Pool sizing and timeouts.
        pool.setMaxIdleTime(1800);
        pool.setMinPoolSize(1);
        pool.setMaxPoolSize(25);
        pool.setAcquisitionTimeout(60);

        // Transaction enlistment behaviour of pooled connections.
        pool.setAutomaticEnlistingEnabled(true);
        pool.setDeferConnectionRelease(true);
        pool.setShareTransactionConnections(false);

        // Properties handed through to the underlying driver class.
        // NOTE(review): the meaning of transportType "1" and messageRetention "1"
        // is defined by the MQ driver, not by this class -- confirm against its docs.
        Properties mqSettings = pool.getDriverProperties();
        mqSettings.setProperty("queueManager", queueManager);
        mqSettings.setProperty("hostName", connName);
        mqSettings.setProperty("port", "1414");
        mqSettings.setProperty("channel", channel);
        mqSettings.setProperty("transportType", "1");
        mqSettings.setProperty("messageRetention", "1");

        return pool;
    }

    /**
     * Configures the Bitronix server id and transaction log files, then exposes the
     * Bitronix transaction manager as the primary bean of its type.
     *
     * @return the Bitronix transaction manager with its transaction timeout set to 180
     * @throws SystemException declared by the transaction-timeout setter
     */
    @Primary
    @Bean
    public BitronixTransactionManager btronixTransactionManager() throws SystemException {
        String serverId = "bitronix-tm-" + appName;
        String firstLogFile = jtaLogDir + "/btm1.tlog";
        String secondLogFile = jtaLogDir + "/btm2.tlog";

        // The configuration is set before the transaction manager is first requested.
        TransactionManagerServices.getConfiguration().setServerId(serverId);
        TransactionManagerServices.getConfiguration().setLogPart1Filename(firstLogFile);
        TransactionManagerServices.getConfiguration().setLogPart2Filename(secondLogFile);

        TransactionManagerServices.getTransactionManager().setTransactionTimeout(180);
        BitronixTransactionManager bitronix = TransactionManagerServices.getTransactionManager();
        return bitronix;
    }

    /**
     * Wraps the JTA user transaction and transaction manager in Spring's
     * {@code JtaTransactionManager}.
     *
     * @param transactionManager the Bitronix transaction manager
     * @param userTransaction    the JTA user transaction
     * @return the Spring platform transaction manager
     */
    @Bean
    public PlatformTransactionManager platformTransactionManager(
            BitronixTransactionManager transactionManager, UserTransaction userTransaction) {
        JtaTransactionManager springJta =
                new JtaTransactionManager(userTransaction, transactionManager);
        return springJta;
    }

    /**
     * Creates the listener container for {@code WAGENSTATUS_QUEUE}.
     *
     * <p>The container does not start automatically and runs two concurrent consumers
     * against the single listener instance passed in.
     *
     * @param jmsXAConnectionFactory connection factory used to receive messages
     * @param jtaTransactionManager  transaction manager set on the container
     * @param wagenstatusBean        listener that receives the messages
     * @return the configured, not yet started, listener container
     */
    @Bean("wgstatusML")
    public DefaultMessageListenerContainer wagenstatusMessageListenerContainer(
            ConnectionFactory jmsXAConnectionFactory,
            PlatformTransactionManager jtaTransactionManager,
            @Qualifier("wagenstatusBean") WagenstatusBean wagenstatusBean) {
        DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();

        // Where messages come from and under which transaction manager.
        listenerContainer.setConnectionFactory(jmsXAConnectionFactory);
        listenerContainer.setTransactionManager(jtaTransactionManager);
        listenerContainer.setDestinationName(WAGENSTATUS_QUEUE);

        // Who handles them and how the container runs.
        listenerContainer.setMessageListener(wagenstatusBean);
        listenerContainer.setAutoStartup(false);
        listenerContainer.setConcurrentConsumers(2);
        listenerContainer.setClientId("wgstatListener");

        // Session settings: no local JMS transaction, auto-acknowledge mode.
        listenerContainer.setSessionTransacted(false);
        listenerContainer.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);

        return listenerContainer;
    }
}

@Service("wagenstatusBean")
@Scope(SCOPE_PROTOTYPE)
public class WagenstatusBean extends AbstractMDB {

    /**
     * Processes one incoming JMS message.
     *
     * <p>The message text is read on a best-effort basis (it is only used for business
     * logging and for the error queue), then the message is handed to
     * {@code verarbeiteMeldung}. If {@code sendMessageToErrorQueue} is set afterwards,
     * an error entry is written to the business log and the (annotated) message text is
     * sent to {@code QueueNames.WAGENSTATUS_ERROR_QUEUE}.
     *
     * <p>A {@code ConsistencyException} or any other {@code RuntimeException} is logged
     * and the current transaction is marked rollback-only; the exception itself is not
     * propagated to the caller.
     *
     * <p>NOTE(review): {@code readableMessageID} and {@code sendMessageToErrorQueue} are
     * not locals of this method. If they are instance fields and one listener instance
     * is shared by several concurrent consumers, the consumers can overwrite each
     * other's state -- confirm against the listener container configuration.
     *
     * <p>NOTE(review): a receive whose transaction is rolled back is normally redelivered
     * by the broker, so a message may legitimately reach this method more than once --
     * verify the broker's redelivery/backout settings.
     *
     * @param msg the received JMS message
     */
    @Transactional(propagation = REQUIRED)
    public void onMessage(javax.jms.Message msg) {
        String localMessageText = null;
        try {
            try {
                localMessageText = ((TextMessage) msg).getText();
            } catch (JMSException e) {
                // Best effort: processing continues with a null text exactly as before,
                // but the failure is now visible in the log instead of being swallowed.
                LOGGER.error("Could not read message text in WagenStatus-onMessage", e);
            }

            // here goes the actual call to the impl
            String errmsg = null;
            readableMessageID = null;
            try {
                verarbeiteMeldung(msg);
            } catch (InvalidMessageException ime) {
                // Remember the reason so it can be attached to the error-queue payload.
                errmsg = ime.getMessage();
            }

            if (sendMessageToErrorQueue) {
                // generate business logging entry
                logBusinessErrorQuietly(localMessageText);

                if (localMessageText != null) {
                    localMessageText = this.addErrorMessageToXML(localMessageText, errmsg);
                }

                DispatcherServiceLocator.getDispatcherBean().sendToDestination(
                        QueueNames.WAGENSTATUS_ERROR_QUEUE, localMessageText);
            }
        } catch (ConsistencyException ex) {
            // ConsistencyException is used internally by the framework/template; it is
            // handled here by rolling back the surrounding transaction.
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();

            // UE03772, RfC 169: BUSINESS LOGGING POINT
            logBusinessErrorQuietly(localMessageText);
            LOGGER.error("Caught a ConsistencyException in WagenStatus-onMessage", ex);
        } catch (RuntimeException ex) {
            // Caught so the failure is logged before the transaction is rolled back.
            LOGGER.error("Caught a RuntimeException in WagenStatus-onMessage", ex);

            // generate business logging entry
            logBusinessErrorQuietly(localMessageText);
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        }

    }

    /**
     * Writes a {@code BusinessLogger.STATUS_ERROR} business-log entry for the given text.
     * Any exception raised while doing so is logged and suppressed, so that a failing
     * business log never changes how the message itself is handled.
     */
    private void logBusinessErrorQuietly(String messageText) {
        try {
            logBusinessData(messageText, BusinessLogger.STATUS_ERROR);
        } catch (Exception e) {
            LOGGER.error("", e);
        }
    }
0

There are 0 best solutions below