I have implemented a basic asynchronous RPC call using Spring Boot 1.4 and RabbitMQ.
My intention is to use this example as a basis for communication
among microservices.
For example, Publisher.java and Subscriber.java could be two microservices talking to each other.
The code shown works fine, but I am curious to know whether there are better ways of doing this.
My questions are as follows:
- For the subscriber to listen to the request queue using the @RabbitListener annotation, I did not have to declare the directExchange() and binding() beans in the configuration. But for asyncRabbitTemplate to read the response from the reply queue, I had to declare the directExchange() and binding() beans in the configuration. Is there any way I can avoid this? It feels like code duplication, because I am declaring these beans twice.
- In a real-world application, there would be many such calls between microservices. As I understand it, I would need to declare a similar rpcReplyMessageListenerContainer() and asyncRabbitTemplate() for each request-reply call. Is that correct?
Code as follows. Link to Github
Config.java
@Configuration("asyncRPCConfig")
@Profile("async_rpc")
@EnableScheduling
@EnableRabbit
@ComponentScan(basePackages = {"in.rabbitmq.async_rpc"})
public class Config {

    @Value("${queue.reply}")
    private String replyQueue;

    @Value("${exchange.direct}")
    private String directExchange;

    @Value("${routingKey.reply}")
    private String replyRoutingKey;

    @Bean
    public Publisher publisher() {
        return new Publisher();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory simpleMessageListenerContainerFactory(ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Queue replyQueueRPC() {
        return new Queue(replyQueue);
    }

    @Bean
    public SimpleMessageListenerContainer rpcReplyMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.setQueues(replyQueueRPC());
        simpleMessageListenerContainer.setReceiveTimeout(2000);
        simpleMessageListenerContainer.setTaskExecutor(Executors.newCachedThreadPool());
        return simpleMessageListenerContainer;
    }

    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(ConnectionFactory connectionFactory) {
        return new AsyncRabbitTemplate(rabbitTemplate(connectionFactory),
                rpcReplyMessageListenerContainer(connectionFactory),
                directExchange + "/" + replyRoutingKey);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(directExchange);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(replyQueueRPC()).to(directExchange()).with(replyRoutingKey);
    }

    @Bean
    public Subscriber subscriber() {
        return new Subscriber();
    }
}
Publisher.java
public class Publisher {

    @Value("${routingKey.request}")
    private String requestRoutingKey;

    @Autowired
    private DirectExchange directExchange;

    private static SecureRandom SECURE_RANDOM;

    static {
        try {
            SECURE_RANDOM = SecureRandom.getInstanceStrong();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
    }

    @Autowired
    private AsyncRabbitTemplate asyncRabbitTemplate;

    @Scheduled(fixedDelay = 100 * 1)
    public void publishToDirectExchangeRPCStyle() {
        Integer integer = SECURE_RANDOM.nextInt();
        SampleRequestMessage sampleRequestMessage = new SampleRequestMessage(String.valueOf(integer));
        System.out.println("Sending out message on direct directExchange:" + sampleRequestMessage);
        AsyncRabbitTemplate.RabbitConverterFuture<SampleResponseMessage> sampleResponseMessageRabbitConverterFuture = asyncRabbitTemplate
                .convertSendAndReceive(directExchange.getName(), requestRoutingKey, sampleRequestMessage);
        sampleResponseMessageRabbitConverterFuture.addCallback(
                sampleResponseMessage ->
                        System.out.println("Response for request message:" + sampleRequestMessage + " is:" + sampleResponseMessage),
                failure ->
                        System.out.println(failure.getMessage())
        );
    }
}
Subscriber.java
public class Subscriber {

    @RabbitHandler
    @RabbitListener(
            bindings = {
                    @QueueBinding(value = @Queue("${queue.request}"),
                            key = "${routingKey.request}",
                            exchange = @Exchange(value = "${exchange.direct}", type = ExchangeTypes.DIRECT, durable = "true"))})
    public SampleResponseMessage subscribeToRequestQueue(@Payload SampleRequestMessage sampleRequestMessage, Message message) {
        System.out.println("Received message :" + message);
        return new SampleResponseMessage(sampleRequestMessage.getMessage());
    }
}
Your solution is fine.
- It is not clear what you are asking. @QueueBinding is simply a convenience on @RabbitListener and an alternative to declaring the queue, exchange and binding as @Beans. If you are using a common @Configuration class, you can simply omit the bindings attribute on the listener and use queues = "${queue.reply}" to avoid the duplication.
- Yes; although with the upcoming 2.0 release, you can use a DirectReplyToMessageListenerContainer, which avoids the need for a separate reply queue for each service when you send a message. See the documentation here and here. (The documentation's "instead of" should read "as an alternative to".) So you can use the same template to talk to multiple services.
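To make the direct reply-to option a little more concrete, here is a minimal configuration sketch. It assumes Spring AMQP 2.0 or later and a RabbitMQ broker that supports direct reply-to; the class name DirectReplyToConfig is made up for illustration. As far as I know, constructing an AsyncRabbitTemplate from just a RabbitTemplate makes it use direct reply-to for replies, so the reply queue, the reply listener container and the reply binding from the question should no longer be needed. Treat it as a starting point under those assumptions rather than a drop-in replacement.

```java
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class name; assumes Spring AMQP 2.0+ on the classpath.
@Configuration
public class DirectReplyToConfig {

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    // No reply queue, reply container or reply binding is declared here:
    // with this constructor the template is expected to use direct reply-to.
    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
        return new AsyncRabbitTemplate(rabbitTemplate);
    }
}
```

With a template like this, the Publisher from the question could keep calling convertSendAndReceive(exchange, routingKey, message) and only vary the exchange and routing key per target service.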