I'm working on a project that involves 2 queue's, and multiples Listeners interacting with them. Flow:
- New HTTP request comes to the server, then it's converted into a Object that will be the message
- This message has to be published in two queues
- I have two types of Listeners that get messages from each queue and then I do whatever I want
I've been reading and the best way to do is with a fanout-exchange. Here is my code:
listener-configuration.xml
<!-- CREATE CONNECTION FACTORY -->
<rabbit:connection-factory id="connectionFactory"
host="localhost" username="guest" password="guest" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- <!-- RABBIT QUEUE'S -->
<rabbit:queue id="trashroute.rabbit.queue" name="trashroute.rabbit.queue" auto-delete="false" auto-startup=false
durable="true" />
<!-- Webapp Queue -->
<rabbit:queue id="trashroute2.rabbit.queue" name="trashroute2.rabbit.queue" auto-delete="false" auto-startup=false
durable="true" />
<!-- CREATE AN EXCHANGE AND BIND THE QUEUE WITH MY.ROUTINGKEY.* TO THE EXCHANGE -->
<rabbit:fanout-exchange id="myExchange" name="trashroute-exchange">
<rabbit:bindings>
<rabbit:binding queue="trashroute.rabbit.queue"></rabbit:binding>
<rabbit:binding queue="trashroute2.rabbit.queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- CREATE THE RABBIT TEMPLATES -->
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute.rabbit.queue"/>
<rabbit:template connection-factory="connectionFactory" exchange="myExchange" queue="trashroute2.rabbit.queue"/>
<!-- INSTANTIATE THE LISTENERS -->
<bean id="persistenceListener" class="trashroute.rabbitmq.listener.PersistenceListener" />
<bean id="webappListener" class="trashroute.rabbitmq.listener.WebappListener" />
<!-- CREATE THE JsonMessageConverter BEAN -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />
<!-- GLUE THE LISTENER AND QUEUE TO THE LISTENER CONTAINER -->
<rabbit:listener-container id="listenerContainer"
connection-factory="connectionFactory" message-converter="jsonMessageConverter">
<rabbit:listener ref="persistenceListener" queues="trashroute.rabbit.queue" />
<rabbit:listener ref="webappListener" queues="trashroute2.rabbit.queue" />
</rabbit:listener-container>
sender-configuration.xml
<!-- First following line creates a rabbit connection factory with specified parameters -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" />
<!-- Obtain admin rights to create an exchange -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- Create a bean which can send message to trashroute-exchange for the Java program to call -->
<rabbit:template id="template" connection-factory="connectionFactory" exchange="myExchange"
message-converter="jsonMessageConverter" />
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.JsonMessageConverter"/>
</property>
Listener MainConfiguration.java
@Configuration
public class MainConfiguration {
protected final String persistenceQueue = "trashroute.rabbit.queue";
protected final String webappQueue = "trashroute2.rabbit.queue";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public DataController DataController(){
return new DataController();
}
@Bean
// Every queue is bound to the default direct exchange
public Queue persistenceQueue() {
//Create a new queue with an specific name and the durability value in true.
return new Queue(this.persistenceQueue, true);
}
@Bean
public Queue webappQueue() {
//Create a new queue with an specific name and the durability value in true.
return new Queue(this.webappQueue, true);
}
}
Sender MainConfiguration.java
@Configuration
public class SenderConfiguration {
protected final String persistenceQueue = "trashroute.rabbit.queue";
protected final String webappQueue = "trashroute2.rabbit.queue";
//Create the Template
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(new JsonMessageConverter());
return template;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
"localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public IServiceManager scheduledProducer() {
return new ServiceManagerImpl();
}
@Bean
public BeanPostProcessor postProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
Can anyone please tell me what I'm doing wrong? One of the two Listeners, works perfectly, the second never reads a message.
Based on the scenario explained above I have tried to create a sample application which uses Spring Java Config.
Messages are published to
trashroute
andwebapp
queues, and respective receivers (persistence
andwebapp
) receive the messages.RabbitConfiguration.java (Contains configuration for both Sender and Receiver)
PersistenceListener.java
WebAppListener.java
Application.java
Hope this will help. Although you would need to refactor the code if you want to use this in production.