Our Spring 5 application is configured to use ActiveMQ and to persist the messages. We decided to go with KahaDB file storage.
To test message persistence, I commented out the MessageListener bean and sent a message to a queue (via JmsTemplate) and verified the message gets written to kahadb data log files. After un-commenting the MessageListener bean and restarting the broker, the message does not get delivered to the listener. I am unable to figure out why the message is not delivered upon broker restart, any help will be greatly appreciated.
Below is the code that adds KahaDb persistence to ActiveMQ configuration:
@Bean(initMethod = "start", destroyMethod = "stop")
public BrokerService brokerServiceConfig() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.addConnector("vm://localhost");
brokerService.setBrokerName("order-broker");
PersistenceAdapter kahaDbAdapter = new KahaDBPersistenceAdapter();
File kahaDir = new File("/home/test");
kahaDbAdapter.setDirectory(kahaDir);
brokerService.setPersistenceAdapter(kahaDbAdapter);
brokerService.setPersistent(true);
}
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setExplicitQosEnabled(true);
template.setDeliveryMode(DeliveryMode.PERSISTENT);
return template;
}
@Bean
@DependsOn({"brokerService"})
private static ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL("vm://my-broker?create=false");
List<String> trustedPackageList = new ArrayList<>(activeMQConnectionFactory.getTrustedPackages());
trustedPackageList.add("com.mypackages");
activeMQConnectionFactory.setTrustedPackages(trustedPackageList);
CachingConnectionFactory connFactory = new CachingConnectionFactory();
activeMQConnectionFactory.setCopyMessageOnSend(false);
activeMQConnectionFactory.setUseAsyncSend(true);
connFactory.setTargetConnectionFactory(activeMQConnectionFactory);
return connFactory;
}
@Bean
public DefaultMessageListenerContainer messageListenerContainer() {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setConnectionFactory(connectionFactory());
dmlc.setSessionTransacted(false);
dmlc.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
dmlc.setDestinationName(TEST_QUEUE);
dmlc.setMaxConcurrentConsumers(25);
dmlc.setMessageListener(msgListener());
return container;
}
@Bean
public TestMsgListener msgListener() {
return new TestMsgListener(); // This is a Message Driven POJO.
}
// MessageProducer code -
@Autowired
private JmsTemplate jTemplate;
@Autowired
private ActiveMQQueue testQueue;
public void sendMessage() {
try {
MySerializedObject obj = <code to create new object>;
jTemplate.convertAndSend(this.testQueue, obj);
}catch(Throwable e) {
}
}
// MessageListener code -
public class TestMsgListener implements MessageListener {
@Autowired
public MessageConverter converter;
public final void onMessage(Message message) {
try {
MySerializedObject obj =
(MySerializedObject)converter.fromMessage(message));
} catch (Throwable e) {
// log error.
}
}