Routing to virtual destinations inside ActiveMQ broker

4.1k Views Asked by At

I have an activemq configuration wherein I have a virtual destination and a normal topic

I want to route all the JMS messages to the destination(VirtualTopic.Notifications) to 2 queues(VirtualTopic.SMS, VirtualTopic.EMAIL) based on their JMSType in the message header.

And I want the normal Topic(VirtualTopic.gps) to work as usual.

This is my configuration of activemq.xml. Here Consumer.SMS.VirtualTopic and Consumer.EMAIL.VirtualTopic is created.

    <destinations>
        <queue physicalName="Consumer.SMS.VirtualTopic" />
        <queue physicalName="Consumer.EMAIL.VirtualTopic" />
    </destinations>

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
            <forwardTo>
              <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
              <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
            </forwardTo>
          </compositeQueue>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>

While the consumer and topic (VirtualTopic.gps) is created from the server side code.

    private static MessageProducer getTopicProducer(String topicName) throws JMSException {
    MessageProducer producer = topicProducers.get(topicName);

    if (producer == null) {
        logger.info("Creating message producer for Topic : {}", topicName);
        Destination destination = session.createTopic(topicName);

        List<String> queueNames = PropertyReader
                .getPropertyStringList("jms.topic.consumer.list", JMSProducer.properties);
        if (queueNames != null) {
            for (String queueName : queueNames) { 
                Queue virtualQueue = session.createQueue(queueName);
                MessageConsumer con = session.createConsumer(virtualQueue);
                con.close();
            }
        }

        producer = session.createProducer(destination);
        topicProducers.put(topicName, producer);
    }

    return producer;
    }

All the messages to the VirtualTopic.Notifications are routed to 2 different queues and consumers can pick up messages from respective queues

But the issue is all the messages which are being sent to the VirtualTopic.gps are filtered and the consumers cant consume the gps messages.

2

There are 2 best solutions below

0
On BEST ANSWER

Thank you so much Hassen..

Adding this line <virtualTopic name=">" selectorAware="false" /> to the activemq.xml did the trick.

    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <compositeQueue name="VirtualTopic.Notifications"
                    forwardOnly="false">
                    <forwardTo>
                        <filteredDestination selector="JMSType = 'SMS'"
                            queue="Consumer.SMS.VirtualTopic" />
                        <filteredDestination selector="JMSType ='EMAIL'"
                            queue="Consumer.EMAIL.VirtualTopic" />
                    </forwardTo>
                </compositeQueue>
                <virtualTopic name=">" selectorAware="false" />
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>
3
On

The following example shows how to set up a element in the XML configuration so that when a message is sent to MY.QUEUE then it is really forwarded to the physical queue FOO and the topic BAR.

<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <compositeQueue name="MY.QUEUE">
        <forwardTo>
          <queue physicalName="FOO" />
          <topic physicalName="BAR" />
        </forwardTo>
      </compositeQueue>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>

By default, subscribers cannot consume messages directly from a composite queue or topic - it is a logical construct only. Given the configuration above, subscribers can only consume messages from FOO and BAR; but not MY.QUEUE. This behaviour can be altered to implement use cases such as watching a queue by sending the same messages to a notification topic (wire tapping), by setting the optionally set forwardOnly attribute to false.

<compositeQueue name="IncomingOrders" forwardOnly="false">
    <forwardTo>
        <topic physicalName="Notifications" />
    </forwardTo>
</compositeQueue>

Messages sent to IncomingOrders will all be copied and forwarded to Notifications, before being placed on the physical IncomingOrders queue for consumption by subscribers.

take a look here http://activemq.apache.org/virtual-destinations.html

with your actual config you can consume only from queue's SMS & EMAIL, if you want to consume from Notifications you need to set forwardOnly="false"

UPDATE : Try this code :

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQTextMessage;

public class SimpleSenderConsumerVirtualTopic {

    public static void main(String[] args) throws JMSException {
        Connection conn = null;
        try {
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
            conn = cf.createConnection( );
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.AUTO_ACKNOWLEDGE);
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
                    .createConsumer(session.createQueue("Consumer.A.VirtualTopic.gps"));
            MessageProducer producer = session.createProducer(session.createTopic("VirtualTopic.gps"));
            conn.start();
            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage("VirtualTopic.gps test");
            producer.send(msg);
            msg = null;
            while ((msg = (ActiveMQTextMessage) consumer.receive(5000)) != null) {
                System.out.println("Received message is: " + msg.getText());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                }
            }
        }
    }
}

AND add this :

<destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
            <forwardTo>
              <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
              <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
            </forwardTo>
          </compositeQueue>
          <virtualTopic name=">"  selectorAware="false" />
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>