Issues getting ActiveMQ Advisory messages for MessageConsumed

553 Views Asked by At

I need to be able to receive notification when a ActiveMQ client consumes a MQTT message.

activemq.xml

<destinationPolicy>
    <policyMap>
      <policyEntries>
            <policyEntry topic=">" advisoryForConsumed="true" />
      </policyEntries>
    </policyMap>
</destinationPolicy>

In the below code, I get MQTT messages on myTopic fine. I do not get advisory messages in processAdvisoryMessage / processAdvisoryBytesMessage.

@Component
public class MqttMessageListener {
    @JmsListener(destination = "mytopic")
    public void processMessage(BytesMessage message) {
    }

    @JmsListener(destination = "ActiveMQ.Advisory.MessageConsumed.Topic.>")
    public void processAdvisoryMessage(Message message) {
        System.out.println("processAdvisoryMessage Got a message");
    }

    @JmsListener(destination = "ActiveMQ.Advisory.MessageConsumed.Topic.>")
    public void processAdvisoryBytesMessage(BytesMessage message) {
        System.out.println("processAdvisoryBytesMessageGot a message");
    }
}

What am I doing wrong?

I have also attempted doing this with a ActiveMQ BrokerFilter:

public class AMQMessageBrokerFilter extends GenericBrokerFilter {
    @Override
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        super.acknowledge(consumerExchange, ack);
    }

@Override
public void postProcessDispatch(MessageDispatch messageDispatch) {
    Message message = messageDispatch.getMessage();
}

@Override
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
    log.debug("messageDelivered called.");
    super.messageDelivered(context, messageReference);
}

@Override
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
    log.debug("messageConsumed called.");
    super.messageConsumed(context, messageReference);
}   

In this second scenario I was unable to both have the message and a contect with which to send the consumed notification. acknowledge/messageDelivered/messageConsumed all have a connection context but only postProcessDispatch has the message which I need part of it (payload is JSON) in order to send my outgoing message. I could be eager and use send which has both but it is safer to wait until at least it was acknowledged.

I have tried:

@Override
public void postProcessDispatch(MessageDispatch messageDispatch) {
    super.postProcessDispatch(messageDispatch);
    String topic =  messageDispatch.getDestination().getPhysicalName();
    if( topic == null || topic.equals("delivered") )
        return;

    try {
        ActiveMQTopic responseTopic = new ActiveMQTopic("delivered");
        ActiveMQTextMessage responseMsg = new ActiveMQTextMessage();
        responseMsg.setPersistent(false);
        responseMsg.setResponseRequired(false);
        responseMsg.setProducerId(new ProducerId());
        responseMsg.setText("Delivered msg: "+msg);
        responseMsg.setDestination(responseTopic);
        String messageKey = ":"+rand.nextLong();
        MessageId msgId = new MessageId(messageKey);
        responseMsg.setMessageId(msgId);

        ProducerBrokerExchange producerExchange=new ProducerBrokerExchange();
        ConnectionContext context = getAdminConnectionContext();
        producerExchange.setConnectionContext(context);
        producerExchange.setMutable(true);
        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
        next.send(producerExchange, responseMsg); 
    } 
    catch (Exception e) {
        log.debug("Exception: "+e);
    }

However the above seems to lead to unstable server. I'm thinking this is related to using the getAdminConnectionContext which seems wrong.

1

There are 1 best solutions below

0
On

My factory was setting setPubSubDomain to false by default. This disables connections for advisory messages for topics. I set it to true and things started working. Note that queues will not work with this set. To get around that I created two factories and named their beans.

    @Bean(name="main")
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory =  new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
//        factory.setDestinationResolver(destinationResolver);
//        factory.setPubSubDomain(true);
        factory.setConcurrency("3-10");
        return factory;
    }