Asynchronous RPC using Spring Boot RabbitMQ

4.9k Views Asked by At

I have implemented a basic asynchronous RPC call using spring boot 1.4 and rabbit mq.
My intention is to use this example as a basis of communication among micro services.
For example, Publisher.java and Subscriber.java could be two micro services talking to each other.

The code shown works fine, but I am curious to know if there are any better ways of doing this?

My queries as follows:

  • For subscriber to listen to request queue using @RabbitListener annotation , I did not had to declare directExchange() and binding() beans in configuration.
    But for asyncRabbitTemplate to read response from reply queue, I had to declare directExchange() and binding() beans in configuration.
    Is there any way I can avoid it, because I feel it is code duplication as I am declaring these beans twice.
  • In real world application, there would be many such calls between micro services.And as per my understanding , I would need to declare similar rpcReplyMessageListenerContainer() and asyncRabbitTemplate() for each request-reply call.
    Is that correct?

Code as follows. Link to Github

Config.java

@Configuration("asyncRPCConfig")
@Profile("async_rpc")
@EnableScheduling
@EnableRabbit
@ComponentScan(basePackages = {"in.rabbitmq.async_rpc"})
public class Config {

    @Value("${queue.reply}")
    private String replyQueue;
    @Value("${exchange.direct}")
    private String directExchange;
    @Value("${routingKey.reply}")
    private String replyRoutingKey;

    @Bean
    public Publisher publisher() {
        return new Publisher();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory simpleMessageListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                                      SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Queue replyQueueRPC() {
        return new Queue(replyQueue);
    }

    @Bean
    public SimpleMessageListenerContainer rpcReplyMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.setQueues(replyQueueRPC());
        simpleMessageListenerContainer.setReceiveTimeout(2000);
        simpleMessageListenerContainer.setTaskExecutor(Executors.newCachedThreadPool());
        return simpleMessageListenerContainer;
    }


    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(ConnectionFactory connectionFactory) {

        return new AsyncRabbitTemplate(rabbitTemplate(connectionFactory),
                        rpcReplyMessageListenerContainer(connectionFactory),
                        directExchange + "/" + replyRoutingKey);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(directExchange);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(replyQueueRPC()).to(directExchange()).with(replyRoutingKey);
    }


    @Bean
    public Subscriber subscriber() {
        return new Subscriber();
    }

}

Publisher.java

public class Publisher {

    @Value("${routingKey.request}")
    private String requestRoutingKey;
    @Autowired
    private DirectExchange directExchange;

    private static SecureRandom SECURE_RANDOM;

    static {
        try {
            SECURE_RANDOM = SecureRandom.getInstanceStrong();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
    }


    @Autowired
    private AsyncRabbitTemplate asyncRabbitTemplate;


    @Scheduled(fixedDelay = 100 * 1)
    public void publishToDirectExchangeRPCStyle() {
        Integer integer = SECURE_RANDOM.nextInt();
        SampleRequestMessage sampleRequestMessage = new SampleRequestMessage(String.valueOf(integer));
        System.out.println("Sending out message on direct directExchange:" + sampleRequestMessage);

        AsyncRabbitTemplate.RabbitConverterFuture<SampleResponseMessage> sampleResponseMessageRabbitConverterFuture = asyncRabbitTemplate
                        .convertSendAndReceive(directExchange.getName(), requestRoutingKey, sampleRequestMessage);
        sampleResponseMessageRabbitConverterFuture.addCallback(
                        sampleResponseMessage ->
                                        System.out.println("Response for request message:" + sampleRequestMessage + " is:" + sampleResponseMessage)
                        , failure ->
                                        System.out.println(failure.getMessage())
        );

    }
}

Subscriber.java

public class Subscriber {

    @RabbitHandler
    @RabbitListener(
                    bindings = {
                                    @QueueBinding(value = @Queue("${queue.request}"),
                                                    key = "${routingKey.request}",
                                                    exchange = @Exchange(value = "${exchange.direct}", type = ExchangeTypes.DIRECT, durable = "true"))})
    public SampleResponseMessage subscribeToRequestQueue(@Payload SampleRequestMessage sampleRequestMessage, Message message) {
        System.out.println("Received message :" + message);
        return new SampleResponseMessage(sampleRequestMessage.getMessage());
    }
}
1

There are 1 best solutions below

3
On BEST ANSWER

Your solution is fine.

It is not clear what you are asking...

I had to declare directExchange() and binding() beans in configuration. Is there any way I can avoid it, because I feel it is code duplication as I am declaring these beans twice.

@QueueBinding is simply a convenience on @RabbitListener and an alternative to declaring the queue, exchange and binding as @Beans.

If you are using a common @Config class you can simply omit the bindings attribute on the listener and use queues = "${queue.reply}" to avoid the duplication.

I would need to declare similar rpcReplyMessageListenerContainer() and asyncRabbitTemplate() for each request-reply call. Is that correct?

Yes; although with the upcoming 2.0 release, you can use a DirectReplyToMessageListenerContainer which avoids the need for a separate reply queue for each service; when you send a message.

See the documentation here and here.

Starting with version 2.0, the async template now supports Direct reply-to instead of a configured reply queue.

(Should read "as an alternative to " rather than "instead of").

So you can use the same template to talk to multiple services.