ActiveMQ + NMS. Prefetch limit not respected

831 Views Asked by At

I have a synchronous consumer connecting to a queue via this URL which also sets the Prefetch limit:

tcp://localhost:61616?nms.prefetchPolicy.all=5

I check the limit programmatically and output it to my logs, so I know the syntax is right and it's being set.

My understanding of the prefetch is that a synchronous consumer will be sent x messages and then once it has acknowledged 50% of them, it'll be sent another batch. This is not the behaviour that I am seeing.

  • I have set prefetch to 5 in the URL
  • I have pre-populated a queue with 20 messages
  • I log every message that comes in
  • I have commented out the message.Acknowledge(); line in my code

So I think that I should see 5 messages appear in my log and no more, because I don't acknowledge any of them. However, I'm seeing all 20 messages appear in the logs.

Have I misunderstood the way prefetch should work, or is there a bug in my code?

Code (c#, .Net Core):

public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
{
    log = logger;
    configuration = config;
    this.endpointClient = endpointClient;

    connection = connectionFactory.CreateConnection();
    connection.RedeliveryPolicy = GetRedeliveryPolicy();
    connection.ExceptionListener += new ExceptionListener(OnException);
    connection.Start();
    session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
    queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
    consumer = session.CreateConsumer(queue);
    log.InfoFormat("Prefetch set to:{0}. Maximum configurable:{1}", ((Connection)connection).PrefetchPolicy.QueuePrefetch, PrefetchPolicy.MAX_PREFETCH_SIZE);

    while (true)
    {
        var message = consumer.Receive(TimeSpan.FromSeconds(5));
        if (!Equals(message, null))
        {
            OnMessage(message);
        }
    }
}

public void OnMessage(IMessage message)
{
    log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
    //message.Acknowledge();
}

Logs:

[18:07:37 INF] Prefetch set to:5. Maximum configurable:32766
[18:07:38 DBG] Message 0 Received. Attempt:2
[18:07:38 DBG] Message 1 Received. Attempt:2
[18:07:38 DBG] Message 2 Received. Attempt:2
[18:07:38 DBG] Message 3 Received. Attempt:2
[18:07:40 DBG] Message 4 Received. Attempt:2
[18:07:40 DBG] Message 5 Received. Attempt:2
[18:07:40 DBG] Message 6 Received. Attempt:2
[18:07:40 DBG] Message 7 Received. Attempt:2
[18:07:40 DBG] Message 8 Received. Attempt:2
[18:07:40 DBG] Message 9 Received. Attempt:2
[18:07:40 DBG] Message 10 Received. Attempt:2
[18:07:40 DBG] Message 11 Received. Attempt:2
[18:07:40 DBG] Message 12 Received. Attempt:2
[18:07:40 DBG] Message 13 Received. Attempt:2
[18:07:40 DBG] Message 14 Received. Attempt:2
[18:07:40 DBG] Message 15 Received. Attempt:2
[18:07:41 DBG] Message 16 Received. Attempt:2
[18:07:41 DBG] Message 17 Received. Attempt:2
[18:07:41 DBG] Message 18 Received. Attempt:2
[18:07:41 DBG] Message 19 Received. Attempt:2

Thanks in advance :)

P.S. I'm using:

  • ActiveMQ v5.15.12
  • Apache.NMS.ActiveMQ.NetCore v1.7.2 (NuGet)
2

There are 2 best solutions below

1
On BEST ANSWER

The prefetch value doesn't mean that you can control queue dispatch it just controls how many will be buffered waiting to deliver to you if your application isn't actually doing any receive processing or is consuming slowly. This allows the broker to provide some messages to the client and then move on to offering messages to other clients that arrive while the first is processing its prefetch.

Once you start consuming messages regardless of whether you acknowledge them the client will extend the prefetch credit to allow there to always be a backlog the size of the prefetch amount. This is because as a consumer you may want to read many messages in large batches and acknowledge them all at once which is what client acknowledge does as does transactional consumption. The NMS client tops up prefetch credit at 70% dispatched if I recall correctly not 50% but regardless this happens as soon as you've pulled that number of messages from the prefetch buffer.

If you want control of messages dispatched from the broker then you'd need to use a zero prefetch setting and a synchronous receiving consumer to ensure that only one message can be inflight to the client at any given time.

0
On

Disable usePrefetchExtension in broker configuration. By default broker will dispatch message without an ack response.

According to the documentation:

The prefetch extension is used when a message is delivered but not ACK’ed, such that the broker can dispatch another message, e.g., prefetch == 0, the idea being that there will always be prefetch number of messages pending. It also allows a transaction batch to exceed the prefetch value.