Spring AMQP retry cache is not cleared when AmqpRejectAndDontRequeueException is thrown

I have a simple Rabbit listener that always throws AmqpRejectAndDontRequeueException, to test whether the listener can handle multiple invalid messages.

The listener and its configuration (TestConfig) are:

@RabbitListener(
    id = TestConfig.LISTENER_ID,
    queues = TestConfig.DATA_QUEUE,
    containerFactory = TestConfig.LISTENER_FACTORY
)
public void consume(String data, Message message) {
    if (true) {
        throw new AmqpRejectAndDontRequeueException("don't requeue");
    }
}

static final String LISTENER_ID = "listenerId";

static final String DATA_QUEUE = "data.queue";

static final String LISTENER_FACTORY = "listenerFactory";

private final AmqpAdmin amqpAdmin;

private final ConnectionFactory connectionFactory;

TestConfig(AmqpAdmin amqpAdmin, ConnectionFactory connectionFactory) {
    this.amqpAdmin = amqpAdmin;
    this.connectionFactory = connectionFactory;
}

@Bean
RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

@Bean
public Queue consumedDataQueue() {
    Queue queue = new Queue(DATA_QUEUE);
    queue.setAdminsThatShouldDeclare(amqpAdmin);
    return queue;
}

@Bean(name = LISTENER_FACTORY)
public SimpleRabbitListenerContainerFactory listenerFactory(RabbitTransactionManager rabbitTransactionManager) {

    StatefulRetryOperationsInterceptor backOffRetryInterceptor = statefulRetryOperationsInterceptor();
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(1);
    factory.setAutoStartup(true);
    factory.setTransactionManager(rabbitTransactionManager);
    factory.setAdviceChain(backOffRetryInterceptor);

    return factory;
}

private StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {

    RejectAndDontRequeueRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer(); // to be changed

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryContextCache(new MapRetryContextCache(3));
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));

    return RetryInterceptorBuilder.stateful()
      .retryOperations(retryTemplate)
      .recoverer(messageRecoverer)
      .build();
}

When AmqpRejectAndDontRequeueException is thrown in the listener, the MapRetryContextCache fills up but is never cleared. The application eventually fails with RetryCacheCapacityExceededException.
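
A toy model may help picture the symptom. The sketch below is not Spring Retry's code, and ToyRetryCache and its methods are hypothetical names. It assumes that a stateful retry cache stores one entry per failed message and drops it only when that message is seen again and either succeeds or is recovered, so messages rejected without requeue leave their entries behind.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: mimics a capacity-bounded retry cache, not Spring Retry's real classes.
class ToyRetryCache {
    private final Map<String, Integer> failures = new HashMap<>();
    private final int capacity;

    ToyRetryCache(int capacity) {
        this.capacity = capacity;
    }

    // Called when a delivery fails: remember the message so a redelivery can be recognised.
    void registerFailure(String messageId) {
        if (!failures.containsKey(messageId) && failures.size() >= capacity) {
            throw new IllegalStateException("retry cache capacity exceeded: " + capacity);
        }
        failures.merge(messageId, 1, Integer::sum);
    }

    // Called when the same message comes back and is finally recovered or succeeds.
    void clear(String messageId) {
        failures.remove(messageId);
    }

    int size() {
        return failures.size();
    }

    public static void main(String[] args) {
        ToyRetryCache cache = new ToyRetryCache(3);
        // Each message fails once and is rejected without requeue,
        // so it never comes back and clear() is never called for it.
        for (int i = 1; i <= 4; i++) {
            try {
                cache.registerFailure("msg-" + i);
                System.out.println("msg-" + i + " cached, size=" + cache.size());
            } catch (IllegalStateException e) {
                System.out.println("msg-" + i + " -> " + e.getMessage());
            }
        }
    }
}
```

With a capacity of 3, as in the configuration above, the fourth rejected message is the one that overflows the toy cache.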

I managed to get it working by throwing a custom exception from the listener and handling it in a custom message recoverer. However, the message is still requeued once in that case, in line with the retry policy. What am I doing wrong? Should AmqpRejectAndDontRequeueException not be used with a stateful interceptor? How can I reject a message without requeueing it when using a stateful interceptor?

There is 1 best solution below

Spring Retry knows nothing about the semantics of Spring AMQP exceptions.

To avoid this problem, configure the interceptor so that AmqpRejectAndDontRequeueException is treated as a non-retryable exception.
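
The answer does not include the configuration itself. Here is a minimal sketch of one way to mark the exception as non-retryable, assuming Spring Retry's four-argument SimpleRetryPolicy constructor and reusing the values from the question (one attempt, cache capacity 3). The class name RetryInterceptorSketch is hypothetical. traverseCauses is set to true on the assumption that the listener adapter wraps the thrown exception in another exception before the interceptor sees it.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
import org.springframework.retry.policy.MapRetryContextCache;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

// Hypothetical holder class; in the question this method lives in TestConfig.
class RetryInterceptorSketch {

    static StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
        // Classify AmqpRejectAndDontRequeueException as not retryable.
        Map<Class<? extends Throwable>, Boolean> retryable = new HashMap<>();
        retryable.put(AmqpRejectAndDontRequeueException.class, false);

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(
                1,          // maxAttempts, same value as in the question
                retryable,
                true,       // traverseCauses: also inspect wrapped causes (assumption, see above)
                true);      // defaultValue: exceptions not listed remain retryable

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryContextCache(new MapRetryContextCache(3));
        retryTemplate.setRetryPolicy(retryPolicy);

        return RetryInterceptorBuilder.stateful()
                .retryOperations(retryTemplate)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }
}
```

The returned interceptor could then be passed to factory.setAdviceChain(...) in place of the one built in the question.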