ActiveMQ + NMS cannot synchronously receive


I'm trying to use ActiveMQ with an NMS (C#) consumer to get messages, do some processing, and then send the contents to a web service via HttpClient.PostAsync(), all running within a Windows service (via Topshelf).

The downstream system I'm communicating with is extremely touchy, so I'm using individual acknowledgement: I check the response and act accordingly, either acknowledging the message or triggering a custom retry (i.e. not session.Recover()).

Since the downstream system is unreliable, I've been trying a few different ways to reduce the throughput of my consumer. I thought I'd be able to accomplish this by converting to be synchronous and using prefetch, but it doesn't appear to have worked.

My understanding is that with an asynchronous consumer the prefetch limit will never be hit, whereas with the synchronous method the prefetch buffer is only drained as messages are acknowledged, meaning that I can tune my listener to pass messages at a rate the downstream component can handle.

With a queue loaded with 100 messages, if I kick off my code using a listener (i.e. asynchronously), I can successfully log that all 100 messages have been through. When I change it to use consumer.Receive() (or ReceiveNoWait()), I never get a message.

Here is a snippet of what I'm trying for the synchronous consumer, with the async option included but commented out:

    public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
    {
        log = logger;
        configuration = config;
        this.endpointClient = endpointClient;

        connection = connectionFactory.CreateConnection();
        connection.RedeliveryPolicy = GetRedeliveryPolicy();
        connection.ExceptionListener += new ExceptionListener(OnException);
        session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
        queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
        consumer = session.CreateConsumer(queue);

        // Asynchronous
        //consumer.Listener += new MessageListener(OnMessage);

        // Synchronous
        var message = consumer.Receive(TimeSpan.FromSeconds(5));
        while (true)
        {
            if (!Equals(message, null))
            {
                OnMessage(message);
            }
        }
    }

    public void OnMessage(IMessage message)
    {
        log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
        message.Acknowledge();
    }

Best answer

I believe you need to call Start() on your connection, e.g.:

    connection.Start();

Calling Start() indicates that you want messages to flow. Until the connection is started, nothing is delivered to the consumer.

It's also worth noting that there's no way to break out of your while (true) loop other than throwing an exception from OnMessage.
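Putting both points together, a synchronous consumer might look roughly like the sketch below. This is only an illustration: the SyncWorker class, the Run method and the CancellationToken are hypothetical names that are not in the original code, and the redelivery policy, logging and HTTP call are left out. It also moves Receive() inside the loop, since the snippet in the question calls it once before the loop and would keep handling the same message.

```csharp
using System;
using System.Threading;
using Apache.NMS;

// Hypothetical worker, assuming the Apache.NMS API used in the question.
public sealed class SyncWorker
{
    private readonly IConnection connection;
    private readonly ISession session;
    private readonly IMessageConsumer consumer;

    public SyncWorker(IConnectionFactory connectionFactory, string queueName)
    {
        connection = connectionFactory.CreateConnection();
        session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
        consumer = session.CreateConsumer(session.GetQueue(queueName));

        // Message delivery does not begin until the connection is started.
        connection.Start();
    }

    // Intended to run on a background thread; cancel the token to stop the loop.
    public void Run(CancellationToken stop)
    {
        while (!stop.IsCancellationRequested)
        {
            // Receive inside the loop; a null result means the timeout elapsed.
            IMessage message = consumer.Receive(TimeSpan.FromSeconds(5));
            if (message != null)
            {
                OnMessage(message);
            }
        }
    }

    private void OnMessage(IMessage message)
    {
        // Placeholder for the real processing; acknowledge only on success.
        message.Acknowledge();
    }
}
```

Keeping the loop out of the constructor is a design choice in this sketch, not something the answer requires. It lets the service's start method return promptly, and the stop handler can cancel the token to shut the consumer down cleanly.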