I configured a route in Spring Integration 5.4.4 that reads from an AMQP queue and writes to an HTTP outbound adapter. I'm not able to control retries when, for example, I programmatically declare a wrong hostname for the HTTP outbound adapter (cause: java.net.UnknownHostException).
This seems to generate an infinite retry (the message is never acked on the RabbitMQ container), even though I configured the RetryTemplate logic in the amqpInboundAdapter.
My goal is: requeue the message N times if the HTTP outbound adapter fails; otherwise discard the message and don't requeue it again.
Code here:
Spring Integration route
public IntegrationFlow route(AmqpInboundChannelAdapterSMLCSpec amqpInboundChannelAdapterSMLCSpec) {
    return IntegrationFlows
            .from(amqpInboundChannelAdapterSMLCSpec)
            .filter(validJsonFilter())
            .enrichHeaders(h -> h.header("X-Insert-Key", outboundHttpConfig.outboundHttpToken))
            .enrichHeaders(h -> h.header("Content-Encoding", "gzip"))
            .enrichHeaders(h -> h.header("Content-Type", "application/json"))
            .handle(Http.outboundChannelAdapter(outboundHttpConfig.outboundHttpUrl)
                    .mappedRequestHeaders("X-Insert-Key")
                    .httpMethod(HttpMethod.POST))
            .get();
}
AmqpInboundChannelAdapterSMLCSpec
public AmqpInboundChannelAdapterSMLCSpec gatewayEventInboundAmqpAdapter(ConnectionFactory connectionFactory) {
    RetryTemplate retryTemplate = new RetryTemplate();
    exceptionClassifierRetryPolicy.setPolicyMap(exceptionPolicy);
    retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));
    retryTemplate.setThrowLastExceptionOnExhausted(true);
    return Amqp
            .inboundAdapter(connectionFactory, rabbitConfig.inboundQueue())
            .configureContainer(c -> c
                    .concurrentConsumers(3)
                    .maxConcurrentConsumers(5)
                    .receiveTimeout(2000)
                    .alwaysRequeueWithTxManagerRollback(false)
            )
            .retryTemplate(retryTemplate);
}
Any ideas?
Thanks a lot
When you use a retry on the AMQP MessageListenerContainer, there is no requeue: the retry is done in memory, without round trips to the broker.
Anyway, what you do so far is OK. The only thing you are missing is a RejectAndDontRequeueRecoverer configured for that Amqp.inboundAdapter() to decide what to do with an AMQP message when all the retry attempts are exhausted.
Unfortunately, the direct MessageRecoverer configuration for the channel adapter was only added in version 5.5: https://docs.spring.io/spring-integration/docs/5.5.0-M3/reference/html/whats-new.html#x5.5-amqp.
For the current version it has to be done via the recoveryCallback(RecoveryCallback<?> recoveryCallback) option and respective delegation:
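A minimal sketch of that delegation, reusing the bean method from the question. It is a configuration fragment rather than a standalone program, and it assumes the adapter stores the raw AMQP message in the retry context under AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, which is worth verifying against the exact 5.4.x version in use. The rabbitConfig field and the elided policy setup come from the question.

```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.dsl.AmqpInboundChannelAdapterSMLCSpec;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.retry.support.RetryTemplate;

// Fragment of the existing @Configuration class; rabbitConfig is the field used in the question.
public AmqpInboundChannelAdapterSMLCSpec gatewayEventInboundAmqpAdapter(ConnectionFactory connectionFactory) {
    RetryTemplate retryTemplate = new RetryTemplate();
    // ... retry and back-off policies configured as in the question ...

    // Rejects the delivery with requeue=false once the retries are exhausted.
    RejectAndDontRequeueRecoverer recoverer = new RejectAndDontRequeueRecoverer();

    return Amqp
            .inboundAdapter(connectionFactory, rabbitConfig.inboundQueue())
            .retryTemplate(retryTemplate)
            .recoveryCallback(context -> {
                // Assumption: the adapter put the raw AMQP message into the retry context.
                Message rawMessage = (Message) context.getAttribute(
                        AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
                // Throws an exception wrapping AmqpRejectAndDontRequeueException,
                // so the container rejects the message instead of requeueing it.
                recoverer.recover(rawMessage, context.getLastThrowable());
                return null; // not reached; required by the lambda's return type
            });
}
```

With this in place, the number of in-memory attempts is governed by the retry policy: SimpleRetryPolicy(3), for instance, allows three attempts in total (the first delivery included) before the recoverer rejects the message. Note that the SimpleRetryPolicy(1) from the question permits only a single attempt.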