I want to create a simple IntegrationFlow with Spring integration, and I am having difficulties.
I want to create an integration flow that takes messages from multiple queues in Rabbit Mq and posts the messages to different Rest endpoints.
i want to know if i can parallelize this.
i have two scenarios that i want to check the feasibility :
- the first one i want to create a thread for every RabbitMq Queue that would listen and execute the flow after receiving a message :
- the second scenario : in this scenario i want to create a dynamic number of threads for every queue , the number of threads goes up or down depending on the number of messages.
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
.handle(msg ->
{
String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
System.out.println(msgString);
})
.get();
}
For the first scenario, simply configure a inbound adapter for each and set the output channel to a common channel for the downstream flow.
For the second scenario, simply set the
concurrentConsumers
andmaxConcurrentConsumers
on the listener container and it will scale up/down the threads as needed.See the Spring AMQP Documentation.