ActiveMQ not delivering message from KahaDB Filestore

153 Views Asked by At

Our Spring 5 application is configured to use ActiveMQ and to persist the messages. We decided to go with KahaDB file storage.

To test message persistence, I commented out the MessageListener bean, sent a message to a queue (via JmsTemplate), and verified that the message gets written to the KahaDB data log files. After un-commenting the MessageListener bean and restarting the broker, the message does not get delivered to the listener. I am unable to figure out why the message is not delivered upon broker restart; any help will be greatly appreciated.

Below is the code that adds KahaDB persistence to the ActiveMQ configuration:

   /**
    * Creates the embedded ActiveMQ broker with KahaDB file persistence.
    *
    * <p>The container calls {@code start()} after construction and {@code stop()} on
    * shutdown (see {@code initMethod}/{@code destroyMethod}). The broker is registered
    * under the name {@code order-broker}; in-VM clients must use that same name in
    * their {@code vm://} URL to reach this broker instance.
    *
    * @return the configured (not yet started) broker
    * @throws Exception if the VM connector cannot be added
    */
   @Bean(initMethod = "start", destroyMethod = "stop")
   public BrokerService brokerServiceConfig() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.addConnector("vm://localhost");
        brokerService.setBrokerName("order-broker");

        // Store the journal and index under /home/test so messages survive a restart.
        PersistenceAdapter kahaDbAdapter = new KahaDBPersistenceAdapter();
        File kahaDir = new File("/home/test");
        kahaDbAdapter.setDirectory(kahaDir);
        brokerService.setPersistenceAdapter(kahaDbAdapter);
        brokerService.setPersistent(true);

        // The original snippet had no return statement, so no broker bean was produced.
        return brokerService;
   }

    /**
     * Builds the {@link JmsTemplate} used for sending messages.
     *
     * <p>Explicit QoS is switched on so that the delivery mode configured here
     * (persistent) is applied when the template sends a message.
     *
     * @return the template bound to {@code connectionFactory()}
     */
    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate jms = new JmsTemplate();
        jms.setConnectionFactory(connectionFactory());

        // Persistent delivery, applied via explicit QoS settings.
        jms.setDeliveryMode(DeliveryMode.PERSISTENT);
        jms.setExplicitQosEnabled(true);
        return jms;
    }
    
    /**
     * Creates the JMS connection factory that talks to the embedded broker over the
     * in-VM transport, wrapped in a {@link CachingConnectionFactory}.
     *
     * <p>Fixes relative to the original snippet:
     * <ul>
     *   <li>The broker URL now names {@code order-broker}, the name the embedded broker
     *       is registered under. With {@code create=false} the VM transport only
     *       attaches to an existing broker of exactly that name.</li>
     *   <li>{@code @DependsOn} now references {@code brokerServiceConfig}, the actual
     *       bean name derived from the broker's {@code @Bean} method.</li>
     *   <li>The method is no longer {@code private static}. Static {@code @Bean}
     *       methods are not intercepted by the container, so every direct call
     *       created a separate factory. NOTE(review): this assumes the enclosing class
     *       is a {@code @Configuration} class - confirm.</li>
     * </ul>
     *
     * @return a caching connection factory targeting the embedded broker
     */
    @Bean
    @DependsOn({"brokerServiceConfig"})
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL("vm://order-broker?create=false");

        // Extend (rather than replace) the default trusted packages for ObjectMessage payloads.
        List<String> trustedPackageList = new ArrayList<>(activeMQConnectionFactory.getTrustedPackages());
        trustedPackageList.add("com.mypackages");
        activeMQConnectionFactory.setTrustedPackages(trustedPackageList);

        activeMQConnectionFactory.setCopyMessageOnSend(false);
        activeMQConnectionFactory.setUseAsyncSend(true);

        CachingConnectionFactory connFactory = new CachingConnectionFactory();
        connFactory.setTargetConnectionFactory(activeMQConnectionFactory);
        return connFactory;
    }

    /**
     * Creates the listener container that consumes from {@code TEST_QUEUE} and
     * dispatches each message to {@code msgListener()}.
     *
     * <p>Sessions are non-transacted with {@code AUTO_ACKNOWLEDGE}, and the container
     * may scale up to 25 concurrent consumers.
     *
     * @return the configured listener container
     */
    @Bean
    public DefaultMessageListenerContainer messageListenerContainer() {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory());
        dmlc.setSessionTransacted(false);
        dmlc.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        dmlc.setDestinationName(TEST_QUEUE);
        dmlc.setMaxConcurrentConsumers(25);
        dmlc.setMessageListener(msgListener());
        // The original returned an undefined identifier ("container"); return the
        // container that was actually configured above.
        return dmlc;
    }
    
    /**
     * Exposes the message-driven POJO that receives queue messages as a bean.
     *
     * @return a new {@code TestMsgListener}
     */
    @Bean
    public TestMsgListener msgListener() {
        TestMsgListener listener = new TestMsgListener();
        return listener;
    }

    // MessageProducer code -
    // Template used by sendMessage() to convert and send the payload.
    @Autowired
    private JmsTemplate jTemplate;
    
    // Destination queue that sendMessage() publishes to.
    @Autowired
    private ActiveMQQueue testQueue;

    /**
     * Converts a newly created {@code MySerializedObject} to a JMS message and sends
     * it to {@code testQueue}.
     *
     * <p>Send failures are logged instead of being silently discarded, so a message
     * that never reached the broker is visible in the logs. The method remains
     * best-effort: runtime failures from the send are not propagated to the caller.
     */
    public void sendMessage() {
        try {
            MySerializedObject obj = <code to create new object>;
            jTemplate.convertAndSend(this.testQueue, obj);
        } catch (RuntimeException e) {
            // Previously an empty catch (Throwable) hid every failure, including
            // conversion and connection errors.
            java.util.logging.Logger.getLogger(getClass().getName())
                    .log(java.util.logging.Level.SEVERE, "Failed to send message to " + this.testQueue, e);
        }
    }

    // MessageListener code -
    public class TestMsgListener implements MessageListener {

    // Converter used by onMessage() to turn the incoming JMS message back into
    // a MySerializedObject.
    @Autowired
    public MessageConverter converter;

    public final void onMessage(Message message) {
        try {
             MySerializedObject obj = 
             (MySerializedObject)converter.fromMessage(message));

        } catch (Throwable e) {
          // log error.
        }
    }
 
0

There are 0 best solutions below