Transactional consumption in RabbitMQ

441 Views Asked by At

I have been new in RabbitMQ yet. I need a MOM system for these purposes:

  1. A published message is consumed until my logic executed successfully.
  2. The broker doesn't have to remove the published message from the queue until my logic executed successfully.

For those goals, I wrote the following code at the first try:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    boolean autoAck = false;
    channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
            new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)throws IOException
                {
                    try
                    {
                        channel.txSelect();
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        long deliveryTag = envelope.getDeliveryTag();
                        System.out.println("Recieve Message is :" + new String(body));
                        int reslt = //execute my logic   
                        if(result == 0)
                            channel.txCommit();
                        else
                            channel.txRollback();
                    }
                    catch(Throwable t)
                    {
                        t.printStackTrace();
                    }
                }
            });

By this approach, I achieve the second purpose, in others words, the broker doesn't delete my message but one time all of the message in the queue consumed and all of them rollbacked, the broker doesn't send messages to my consumer again.

At the second try, I wrote the following code:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    boolean autoAck = false;
    channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
            new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)throws IOException
                {
                    try
                    {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        long deliveryTag = envelope.getDeliveryTag();
                        System.out.println("Recieve Message is :" + new String(body));
                        int reslt = //execute my logic   
                        if(result == 0)
                            channel.basicAck(deliveryTag, false);
                        else
                            channel.basicNack(deliveryTag,false,true);
                    }
                    catch(Throwable t)
                    {
                        t.printStackTrace();
                    }
                }
            });

By this solution, I achieved to both goals, but I don't know my code is correct or not? Does the approach cause problems in the production environment with high TPS? I don't know the requeue flag of basicNack method is heavy or light?

1

There are 1 best solutions below

0
On

I had same requirement few month back. I went through many solutions and here is what work for me.

I stored delivery tag somewhere in memory, if my logic went well i acknowledge message manually or i reject that message. I used below methods for this purpose.

if (success)
connectionModel.BasicAck(Convert.ToUInt64(uTag), false);
else
connectionModel.BasicReject(Convert.ToUInt64(uTag), true);

Above code is working fine in my production with 5000msg/sec. Well, basicNack method was causing issue for me.