I have an API running at http://localhost:9999/greet/v3/api (API A) that responds to each request after a simulated 2-second delay. I have created the following integration flow to proxy all requests to that API.
How can I mitigate backpressure on API A?
This is what I have done so far:
@Bean
public IntegrationFlow httpProxyFlowPin2(ConnectionFactory connectionFactory) throws Exception {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/gw2")
                    .requestChannel(Amqp.channel(connectionFactory).queueName("request").getObject())
                    .replyChannel(Amqp.channel(connectionFactory).queueName("response").getObject())
                    .mappedRequestHeaders("activityid")
                    .requestMapping(m -> m.methods(HttpMethod.GET)))
            .handle(WebFlux.outboundGateway("http://localhost:9999/greet/v3/api")
                    .httpMethod(HttpMethod.GET)
                    .charset("utf-8")
                    .expectedResponseType(String.class))
            .channel(Amqp.channel(connectionFactory).queueName("response2").getObject())
            .get();
}
When I test the proxy API, I receive the following error:
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1777) ~[spring-rabbit-3.0.9.jar:3.0.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1723) ~[spring-rabbit-3.0.9.jar:3.0.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593) ~[spring-rabbit-3.0.9.jar:3.0.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1570) ~[spring-rabbit-3.0.9.jar:3.0.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-3.0.9.jar:3.0.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.0.9.jar:3.0.9]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1484) ~[spring-rabbit-3.0.9.jar:3.0.9]
at io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493) ~[micrometer-observation-1.11.4.jar:1.11.4]
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.4.jar:1.11.4]
at io.micrometer.observation.Observation.observe(Observation.java:492) ~[micrometer-observation-1.11.4.jar:1.11.4]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1484) ~[spring-rabbit-3.0.9.jar:3.0.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:994) ~[spring-rabbit-3.0.9.jar:3.0.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:941) ~[spring-rabbit-3.0.9.jar:3.0.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1323) ~[spring-rabbit-3.0.9.jar:3.0.9]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1225) ~[spring-rabbit-3.0.9.jar:3.0.9]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for amqp-channel 'application.response2.channel'.
at org.springframework.integration.amqp.channel.AbstractSubscribableAmqpChannel$DispatchingMessageListener.onMessage(AbstractSubscribableAmqpChannel.java:306) ~[spring-integration-amqp-6.1.3.jar:6.1.3]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1719) ~[spring-rabbit-3.0.9.jar:3.0.9]
... 14 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-6.1.3.jar:6.1.3]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.1.3.jar:6.1.3]
at org.springframework.integration.amqp.channel.AbstractSubscribableAmqpChannel$DispatchingMessageListener.onMessage(AbstractSubscribableAmqpChannel.java:294) ~[spring-integration-amqp-6.1.3.jar:6.1.3]
... 15 common frames omitted
I'm not entirely sure what you are trying to achieve with all those AMQP-based channels in between, so perhaps you can elaborate on what is going on and why.
Regarding the error:
That just means that your .channel(Amqp.channel(connectionFactory).queueName("response2").getObject()) does not have a subscriber to consume messages from it. For your current configuration it would be enough to add a bridge() at the end of the flow: the message consumed from that AMQP channel is then returned as a reply to the WebFlux.inboundGateway() at the beginning.
You don't need a replyChannel() configuration on that gateway, though. It makes sense only in scenarios where we need to publish a reply to several consumers.
You might need to use an .enrichHeaders(HeaderEnricherSpec::headerChannelsToString) before that .channel(Amqp.channel()) to serialize the TemporaryReplyChannel into its string representation, so that this information can be transferred over RabbitMQ. Otherwise you'll fail with a "no replyChannel header" error when the mentioned bridge() is applied.
Either way, I'm curious what you are really trying to do, and what "to mitigate backpressure" means as a task.
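For illustration, the suggestions above could be combined roughly as follows. This is only a sketch under a few assumptions, not a verified fix: the class name ProxyFlowConfig is hypothetical, the replyChannel() on the inbound gateway is dropped as suggested, and the AMQP-backed requestChannel() from the question is left out to keep the example focused on the "response2" hop. It assumes the Spring Integration 6.1.x WebFlux and AMQP modules from the stack trace are on the classpath.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.webflux.dsl.WebFlux;

// Hypothetical configuration class; only the flow definition matters here.
@Configuration
public class ProxyFlowConfig {

    @Bean
    public IntegrationFlow httpProxyFlowPin2(ConnectionFactory connectionFactory) throws Exception {
        return IntegrationFlow
                // No replyChannel() on the gateway: the reply is routed back
                // through the replyChannel header of the request message.
                .from(WebFlux.inboundGateway("/gw2")
                        .mappedRequestHeaders("activityid")
                        .requestMapping(m -> m.methods(HttpMethod.GET)))
                .handle(WebFlux.outboundGateway("http://localhost:9999/greet/v3/api")
                        .httpMethod(HttpMethod.GET)
                        .charset("utf-8")
                        .expectedResponseType(String.class))
                // Turn the TemporaryReplyChannel header into a string so it
                // can be carried across the RabbitMQ hop.
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(Amqp.channel(connectionFactory).queueName("response2").getObject())
                // Gives the AMQP channel a subscriber and forwards the message
                // to the replyChannel header, i.e. back to the inbound gateway.
                .bridge()
                .get();
    }
}
```

If the AMQP-backed request channel is kept, the reply channel header would presumably have to survive that first hop as well, so this sketch should be adapted rather than copied as-is.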