Multiple queues receiving same message from virtual topic creates a deadletter entry for one queue only


I am using Virtual Destinations to implement a publish-subscribe model in ActiveMQ 5.15.13.

I have a virtual topic, VirtualTopic, with two queues bound to it. Each queue has its own redelivery policy. Say Queue 1 retries a message 2 times if an exception occurs while processing it, and Queue 2 retries 3 times. Once the retries are exhausted, the message is sent to a dead letter queue. I'm also using the individual dead letter queue strategy so that each queue has its own dead letter queue.

I've observed that when a message is sent to VirtualTopic, a message with the same message ID is delivered to both queues. The problem occurs when the consumers of both queues fail to process the message. The message destined for Queue 1 is moved to its dead letter queue after 2 retries, but no dead letter queue is created for Queue 2, even though the message in Queue 2 is retried 3 times.

Is this the expected behavior?

Code:

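import static javax.jms.DeliveryMode.PERSISTENT;
import static javax.jms.Message.DEFAULT_PRIORITY;
import static javax.jms.Message.DEFAULT_TIME_TO_LIVE;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static javax.jms.Session.CLIENT_ACKNOWLEDGE;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;

public class ActiveMQRedelivery {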

    private final ActiveMQConnectionFactory factory;

    public ActiveMQRedelivery(String brokerUrl) {
        factory = new ActiveMQConnectionFactory(brokerUrl);
        factory.setUserName("admin");
        factory.setPassword("password");
        factory.setAlwaysSyncSend(false);
    }

    public void publish(String topicAddress, String message) {
        final String topicName = "VirtualTopic." + topicAddress;
        Connection producerConnection = null;
        try {
            producerConnection = factory.createConnection();
            producerConnection.start();
            final Session producerSession = producerConnection.createSession(false, AUTO_ACKNOWLEDGE);
            final MessageProducer producer = producerSession.createProducer(null);
            final TextMessage textMessage = producerSession.createTextMessage(message);
            final Topic topic = producerSession.createTopic(topicName);
            producer.send(topic, textMessage, PERSISTENT, DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE);
        } catch (JMSException e) {
            throw new RuntimeException("Message could not be published", e);
        } finally {
            // Close the producer connection so it is not leaked after each publish.
            if (producerConnection != null) {
                try {
                    producerConnection.close();
                } catch (JMSException ignored) {
                    // Nothing useful can be done if close fails.
                }
            }
        }
    }

    public void initializeConsumer(String queueName, String topicAddress, int numOfRetry) throws JMSException {
        factory.getRedeliveryPolicyMap().put(new ActiveMQQueue("*." + queueName + ".>"),
                getRedeliveryPolicy(numOfRetry));
        Connection connection = factory.createConnection();
        connection.start();
        final Session consumerSession = connection.createSession(false, CLIENT_ACKNOWLEDGE);
        final Queue queue = consumerSession.createQueue("Consumer." + queueName +
                ".VirtualTopic." + topicAddress);
        final MessageConsumer consumer = consumerSession.createConsumer(queue);
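        consumer.setMessageListener(message -> {
            try {
                System.out.println("in listener --- " + ((ActiveMQDestination) message.getJMSDestination()).getPhysicalName());
                // Simulate a processing failure: recover() on a CLIENT_ACKNOWLEDGE
                // session makes the unacknowledged message eligible for redelivery.
                consumerSession.recover();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });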
    }

    private RedeliveryPolicy getRedeliveryPolicy(int numOfRetry) {
        final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0);
        redeliveryPolicy.setMaximumRedeliveries(numOfRetry);
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        redeliveryPolicy.setRedeliveryDelay(0);
        return redeliveryPolicy;
    }

}

Test:

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ActiveMQRedeliveryTest {

    private static final String brokerUrl = "tcp://0.0.0.0:61616";
    private ActiveMQRedelivery activeMQRedelivery;

    @Before
    public void setUp() throws Exception {
        activeMQRedelivery = new ActiveMQRedelivery(brokerUrl);
    }

    @Test
    public void testMessageRedeliveries() throws Exception {
        String topicAddress = "testTopic";
        activeMQRedelivery.initializeConsumer("queue1", topicAddress, 2);
        activeMQRedelivery.initializeConsumer("queue2", topicAddress, 3);
        activeMQRedelivery.publish(topicAddress, "TestMessage");
        Thread.sleep(3000);
    }

    @After
    public void tearDown() throws Exception {
    }
}

Answer:

I recently came across this problem. To fix it, two attributes (destinationPerDurableSubscriber and enableAudit) need to be set on individualDeadLetterStrategy, as below:

<deadLetterStrategy>
        <individualDeadLetterStrategy destinationPerDurableSubscriber="true" enableAudit="false" queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>

Explanation of attributes:

destinationPerDurableSubscriber - Enables a separate dead letter destination for each durable subscriber.

enableAudit - The dead letter strategy has a message audit that is enabled by default. The audit prevents duplicate messages from being added to the configured DLQ. While it is enabled, a message that fails delivery for multiple subscribers of a topic is placed on only one of the subscriber DLQs, even when destinationPerDurableSubscriber is set to true. For example, if two consumers fail to acknowledge the same message for the topic, that message is placed on the DLQ for one consumer and not the other. Setting enableAudit to false, as in the configuration above, turns this duplicate check off so that each queue's DLQ can receive the message.
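
To make the effect of the audit concrete, here is a small plain-Java toy model of the behavior described above. It is only a sketch and not ActiveMQ's implementation: the class ToyDeadLetterStrategy, its methods, and the "ID:1" message ID are hypothetical names made up for illustration. The one assumption it encodes is that a single audit, keyed by message ID, is shared by all the individual DLQs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DlqAuditSketch {

    /** Toy stand-in for an individual dead letter strategy (hypothetical, not the ActiveMQ class). */
    static class ToyDeadLetterStrategy {
        private final boolean enableAudit;
        // Assumption: one audit of message IDs shared by every per-queue DLQ.
        private final Set<String> auditedMessageIds = new HashSet<>();
        private final Map<String, List<String>> deadLetterQueues = new LinkedHashMap<>();

        ToyDeadLetterStrategy(boolean enableAudit) {
            this.enableAudit = enableAudit;
        }

        /** Returns true if the message was added to the DLQ of the given source queue. */
        boolean sendToDeadLetterQueue(String sourceQueue, String messageId) {
            // With the audit on, a message ID that was already dead-lettered is
            // treated as a duplicate and dropped, whichever queue it came from.
            if (enableAudit && !auditedMessageIds.add(messageId)) {
                return false;
            }
            deadLetterQueues
                    .computeIfAbsent("DLQ." + sourceQueue, name -> new ArrayList<>())
                    .add(messageId);
            return true;
        }

        Map<String, List<String>> deadLetterQueues() {
            return deadLetterQueues;
        }
    }

    public static void main(String[] args) {
        for (boolean audit : new boolean[] {true, false}) {
            ToyDeadLetterStrategy strategy = new ToyDeadLetterStrategy(audit);
            // Both consumer queues exhaust their retries for the same message ID.
            strategy.sendToDeadLetterQueue("Consumer.queue1.VirtualTopic.testTopic", "ID:1");
            strategy.sendToDeadLetterQueue("Consumer.queue2.VirtualTopic.testTopic", "ID:1");
            System.out.println("enableAudit=" + audit + " -> " + strategy.deadLetterQueues().keySet());
        }
    }
}
```

In this model, running with the audit on creates only the DLQ for queue1, which matches what the question reports, while running with it off creates a DLQ for each consumer queue.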