I have retry to set to true. My understanding is that the message should be continuously delivered to the consumer over and over again. Instead it just sits there not consuming the requeued message or any new messages. I turned the logging up for com.budjb and com.rabbitmq and org.springframework.amqp all the way to TRACE and don't see any disconnect going on... Heeelppp

application.groovy

rabbitmq {
uri = new URI(System.env.CLOUDAMQP_URL ?: "amqp://test:test@localhost/test")
username = uri.userInfo.split(":")[0]
password = uri.userInfo.split(":")[1]

connections = [
        [name              : 'main',
         host              : uri.host,
         port              : 5672,
         username          : username,
         requestedHeartbeat: 10,
         automaticReconnect: true,
         virtualHost       : uri.path.substring(1),   //remove leading slash
         password          : password]
]

queues = [[name: com.coco.jms.RabbitQueues.INDEX_TRANSACTION.destinationName, autoDelete: false, durable: true, exclusive: false]]

Consumer:

class IndexTransactionConsumer implements MessageConsumerEventHandler {

static rabbitConfig = [
        connection: 'main',
        consumers : 1,
        queue     : Boolean.valueOf((String) System.getProperty("is_amqp_consumer")) ? RabbitQueues.INDEX_TRANSACTION.destinationName : null,
        transacted: true,
        autoAck   : AutoAck.POST,
        retry     : true
]

def handleMessage(Map body, MessageContext messageContext) {
    log.info("RABBITMQ - *CONSUME* Received event to index transaction (Map). " + body)

    throw new Exception("Force fail")
}
....
}

UPDATE it appears that the txRollback() that fires inside AbstractConsumerContext.groovy when transacted=true and autoAck = AutoAck.POST is stopping the basicReject nack from reaching the RabbitMQ server..

if (configuration.getTransacted()) {
    context.getChannel().txRollback()
}

if (configuration.getAutoAck() == AutoAck.POST) {
            context.getChannel().basicReject(context.getEnvelope().deliveryTag, configuration.getRetry())
}
1

There are 1 best solutions below

0
On

I solved my problem by not allowing exceptions to escape the listener and managing ack/nack's myself. I think there's a big in the rabbitmq-native plugin where transacted=true. It looks to me like it's rolling back the nack that's suppose to fire when an exception is caught.

def handleMessage(Map body, MessageContext context) {

    log.info("RABBITMQ - *CONSUME* Received event. " + body)

    try {
        //ensure casting by JMS to Integer is reverted
        body.conflictIDList = body.conflictIDList.collect { ((Number) it).toLong() }

        //do work

        context.channel.basicAck(context.envelope.deliveryTag, false)
    } catch (Exception ex) {

        ConsumerUtility.handleMessageException(rabbitMessagePublisher, body, context, ex)
    }
}

From ConsumerUtility

def
static handleMessageException(RabbitMessagePublisher rabbitMessagePublisher, Map body, MessageContext context, Throwable ex) {

    log.warn("E_ception caught attempting digest message sent to " + context.envelope.routingKey + ". body=" + body + ", reason=" + ex.message)
    if (body.retryCount < 3) {

        //pull current message off queue, sleep thread and republish onto back of queue
        context.channel.basicAck(context.envelope.deliveryTag, false)

        body.retryCount = body.retryCount + 1

        //upon failure sleep for 3, 6, then 9 seconds
        sleep(3000 * (Integer) body.retryCount)

        rabbitMessagePublisher.send {
            channel = context.channel
            routingKey = context.envelope.routingKey
            setBody(body)
        }
    } else {

        log.error("Rejecting message after three failed tries onto DLQ. body=" + body, ex)
        context.channel.basicReject(context.envelope.deliveryTag, false)
    }
}