Configuring priority in rabbitmq

743 Views Asked by At

I have the following configuration that I am attempting to change to support priority queues. Based on done research it says I should have two queues one for each priority. I adjusted my configuration from the following:

@Configuration
public class FixedReplyQueueConfig {

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setUsername("urbanbuz");
    connectionFactory.setPassword("ub");
    connectionFactory.setVirtualHost("urbanbuzvhost");
    return connectionFactory;
}

/**
 * @return Rabbit template with fixed reply queue.
 */
@Bean
public RabbitTemplate fixedReplyQRabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
    template.setExchange(ex().getName());
    template.setRoutingKey("test");
    template.setReplyQueue(replyQueue());
    return template;
}

/**
 * @return The reply listener container - the rabbit template is the listener.
 */
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(fixedReplyQRabbitTemplate());
    return container;
}

/**
 * @return The listener container that handles the request and returns the reply.
 */
@Bean
public SimpleMessageListenerContainer serviceListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueues(requestQueue());
    container.setMessageListener(new MessageListenerAdapter(new PojoListener()));
    return container;
}

@Bean
public DirectExchange ex() {
    return new DirectExchange("ub.exchange", false, true);
}

@Bean
public Binding binding() {
    return BindingBuilder.bind(requestQueue()).to(ex()).with("test");
}

@Bean
public Queue requestQueue() {
    return new Queue("ub.request");
}

@Bean
public Queue replyQueue() {
    return new Queue("ub.reply");
}

/**
 * @return an admin to handle the declarations.
 */
@Bean
public RabbitAdmin admin() {
    return new RabbitAdmin(rabbitConnectionFactory());
}
}

to the following:

@Configuration
public class FixedReplyQueueConfig {

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setUsername("urbanbuz");
    connectionFactory.setPassword("ub");
    connectionFactory.setVirtualHost("urbanbuzvhost");
    return connectionFactory;
}

/**
 * @return Rabbit template with fixed reply queue.
 */
@Bean
public RabbitTemplate fixedReplyQRabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
    template.setExchange(ex().getName());
    template.setRoutingKey("high");
    template.setRoutingKey("normal");
    template.setReplyQueue(replyQueue());
    return template;
}


/**
 * @return The reply listener container - the rabbit template is the listener.
 */
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(fixedReplyQRabbitTemplate());
    return container;
}


/**
 * @return The listener container that handles the request and returns the reply.
 */
@Bean
public SimpleMessageListenerContainer serviceListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueues(requestQueueHigh(), requestQueue());
    container.setMessageListener(new MessageListenerAdapter(new PojoListener()));
    return container;
}

@Bean
public DirectExchange ex() {
    return new DirectExchange("ub.exchange", false, true);
}

@Bean
public Binding binding() {
    return BindingBuilder.bind(requestQueue()).to(ex()).with("normal");
}

@Bean
public Binding bindingHigh() {
    return BindingBuilder.bind(requestQueueHigh()).to(ex()).with("high");
}

@Bean
public Queue requestQueue() {
    return new Queue("ub.request");
}

@Bean
public Queue requestQueueHigh() {
    return new Queue("ub.request.high");
}

@Bean
public Queue replyQueue() {
    return new Queue("ub.reply");
}

/**
 * @return an admin to handle the declarations.
 */
@Bean
public RabbitAdmin admin() {
    return new RabbitAdmin(rabbitConnectionFactory());
}
}

Am I going around this the correct way? Now how should I proceed to make the consumer consume one over the other?

This is how I call to test:

public class App {  
public static void main(String[] args) {        
    ApplicationContext context = new AnnotationConfigApplicationContext(FixedReplyQueueConfig.class);
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

    rabbitTemplate.convertSendAndReceive("ub.exchange", "normal" , "yalla");

    rabbitTemplate.convertSendAndReceive("ub.exchange", "high" , "hello");
}
}

And this is my pojo class:

public class PojoListener {
public String handleMessage(String foo) {
    System.out.println("IN MESSAGE RECEIVER");
    return "it's the weekend!!!!!!!!!!!!";
}
}

Both messages are sent through however not sure how priority is implemented with the implemented configuration and classes.

Note that I'm trying to avoid any plugins to support priority.

1

There are 1 best solutions below

6
On BEST ANSWER

The code from Spring AMQP (BlockingQueueConsumer) looks like:

for (String queueName : queues) {
    if (!this.missingQueues.contains(queueName)) {
        consumeFromQueue(queueName);
    }
}

Os, if your config is:

container.setQueues(requestQueueHigh(), requestQueue());

You are really going to receive messages from requestQueueHigh() over requestQueue(). And it is independently of concurrency on ListenerContainer.

You can test that with several messages, but with stopped listener from the start of application. And start() listener container eventually after sending for those messages.