How to create backpressure mitigation using RabbitMQ as a buffer in Spring Integration?

I have an API running at http://localhost:9999/greet/v3/api (API A) that responds to each request after a simulated 2-second delay. I have created the following integration flow to proxy all requests to that API.

How can I mitigate backpressure on API A?

This is what I have done so far:

@Bean
public IntegrationFlow httpProxyFlowPin2(ConnectionFactory connectionFactory) throws Exception {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/gw2")
                    .requestChannel(Amqp.channel(connectionFactory).queueName("request").getObject())
                    .replyChannel(Amqp.channel(connectionFactory).queueName("response").getObject())
                    .mappedRequestHeaders("activityid")
                    .requestMapping(m -> m.methods(HttpMethod.GET)))
            .handle(WebFlux.outboundGateway("http://localhost:9999/greet/v3/api").httpMethod(HttpMethod.GET).charset("utf-8")
                    .expectedResponseType(String.class))
            .channel(Amqp.channel(connectionFactory).queueName("response2").getObject())
            .get();
}

When I test the proxy API, I receive the following error:

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1777) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1723) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1570) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1484) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493) ~[micrometer-observation-1.11.4.jar:1.11.4]
    at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.4.jar:1.11.4]
    at io.micrometer.observation.Observation.observe(Observation.java:492) ~[micrometer-observation-1.11.4.jar:1.11.4]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1484) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:994) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:941) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1323) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1225) ~[spring-rabbit-3.0.9.jar:3.0.9]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for amqp-channel 'application.response2.channel'.
    at org.springframework.integration.amqp.channel.AbstractSubscribableAmqpChannel$DispatchingMessageListener.onMessage(AbstractSubscribableAmqpChannel.java:306) ~[spring-integration-amqp-6.1.3.jar:6.1.3]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1719) ~[spring-rabbit-3.0.9.jar:3.0.9]
    ... 14 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-6.1.3.jar:6.1.3]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.1.3.jar:6.1.3]
    at org.springframework.integration.amqp.channel.AbstractSubscribableAmqpChannel$DispatchingMessageListener.onMessage(AbstractSubscribableAmqpChannel.java:294) ~[spring-integration-amqp-6.1.3.jar:6.1.3]
    ... 15 common frames omitted

Answer by Artem Bilan:

I'm not sure what you are trying to achieve with all those AMQP-based channels in between. Could you elaborate on what is going on and why?

Regarding the error:

Dispatcher has no subscribers for amqp-channel 'application.response2.channel'.

That just means that your .channel(Amqp.channel(connectionFactory).queueName("response2").getObject()) has no subscriber to consume messages from it. For your current configuration, it is enough to add a bridge() at the end of the flow: the message consumed from that AMQP channel is then returned as a reply to the WebFlux.inboundGateway() at the beginning.

You don't need the replyChannel() configuration on that gateway, though. It makes sense only in scenarios where a reply has to be published to several consumers. You might need an .enrichHeaders(HeaderEnricherSpec::headerChannelsToString) before that .channel(Amqp.channel()) to serialize the TemporaryReplyChannel into its string representation, so that this information can be transferred over RabbitMQ. Otherwise the mentioned bridge() will fail because there is no replyChannel header.
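
As a rough sketch only, the two suggestions above (a trailing bridge() and the header enricher) might be combined like this. The class name ProxyFlowConfig is hypothetical, and moving the "request" queue out of the gateway's requestChannel() and into the flow body is an assumption made here so that the header enricher runs before the first AMQP hop; the endpoint URL, queue names and mapped header are taken from the question. It has not been verified against the exact versions in the stack trace.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;

@Configuration
public class ProxyFlowConfig { // hypothetical class name

    @Bean
    public IntegrationFlow httpProxyFlowPin2(ConnectionFactory connectionFactory) {
        return IntegrationFlow
                // No replyChannel() here: the reply goes back through the replyChannel header.
                .from(WebFlux.inboundGateway("/gw2")
                        .mappedRequestHeaders("activityid")
                        .requestMapping(m -> m.methods(HttpMethod.GET)))
                // Replace the TemporaryReplyChannel header with a string key,
                // so the reply address can travel through RabbitMQ.
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                // Assumption: the request queue sits inside the flow, after the enricher.
                .channel(Amqp.channel(connectionFactory).queueName("request"))
                .handle(WebFlux.outboundGateway("http://localhost:9999/greet/v3/api")
                        .httpMethod(HttpMethod.GET)
                        .charset("utf-8")
                        .expectedResponseType(String.class))
                .channel(Amqp.channel(connectionFactory).queueName("response2"))
                // Gives the 'response2' channel a subscriber and routes the message
                // to the replyChannel header, i.e. back to the inbound gateway.
                .bridge()
                .get();
    }
}
```

Note that the string key created by headerChannelsToString is held in a local registry for a limited time, so this arrangement presumably only works when the reply is consumed by the same application instance that received the HTTP request.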

Either way, I'm curious what you are really trying to do, and what mitigating backpressure means as a task here.