Apache Pulsar - ACK exception


We have been running Pulsar for several weeks. A new persistent queue without partitions has now been added.

Suddenly we are seeing the exception below, and only on the new subscription; none of the other queues are affected. All consumers are implemented the same way, as in the consumer method shown after the stack trace. The exception is thrown when the message is acknowledged:

consumer.acknowledgeAsync(msg);

Exception:

java.lang.IllegalStateException: recycled already
at io.netty.util.Recycler$WeakOrderQueue.transfer(Recycler.java:442)
at io.netty.util.Recycler$Stack.scavengeSome(Recycler.java:587)
at io.netty.util.Recycler$Stack.scavenge(Recycler.java:562)
at io.netty.util.Recycler$Stack.pop(Recycler.java:535)
at io.netty.util.Recycler.get(Recycler.java:162)
at org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable.create(ConcurrentBitSetRecyclable.java:51)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.lambda$doIndividualBatchAckAsync$8(PersistentAcknowledgmentsGroupingTracker.java:328)
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doIndividualBatchAckAsync(PersistentAcknowledgmentsGroupingTracker.java:323)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doIndividualBatchAck(PersistentAcknowledgmentsGroupingTracker.java:294)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.doIndividualBatchAck(PersistentAcknowledgmentsGroupingTracker.java:287)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.lambda$addAcknowledgment$4(PersistentAcknowledgmentsGroupingTracker.java:235)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addIndividualAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:220)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:232)
at org.apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker.addAcknowledgment(PersistentAcknowledgmentsGroupingTracker.java:196)
at org.apache.pulsar.client.impl.ConsumerImpl.doAcknowledge(ConsumerImpl.java:560)
at org.apache.pulsar.client.impl.ConsumerBase.doAcknowledgeWithTxn(ConsumerBase.java:688)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:634)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:619)
at org.apache.pulsar.client.impl.ConsumerBase.acknowledgeAsync(ConsumerBase.java:521)
at de.seepex.service.lastactive.PulsarLastActiveConsumer.messageListener(PulsarLastActiveConsumer.java:82)
at org.apache.pulsar.client.impl.ConsumerBase.callMessageListener(ConsumerBase.java:1157)

Complete consumer method:

private void messageListener(Consumer<byte[]> consumer, Message<byte[]> msg) {
    try {
        byte[] payload = msg.getData();

        if (payload == null || payload.length == 0) {
            consumer.acknowledgeAsync(msg);
            return;
        }

        final LastActiveUpdate lastActiveUpdate = objectMapper.readValue(payload, LastActiveUpdate.class);
        lastActiveProcessor.process(lastActiveUpdate);

        // Acknowledge only after processing has succeeded.
        consumer.acknowledgeAsync(msg);

    } catch (Exception e) {
        LOG.error("failed to process", e);
        consumer.negativeAcknowledge(msg);
    }
}

Any clues what might be causing the error?

Answer by Alex Tbk:

It turned out that this bug was caused by a wrong Netty version:

https://github.com/apache/pulsar/discussions/21929
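
To confirm which Netty jar is actually being picked up at runtime, it can help to print where the classes from the stack trace are loaded from. The following is only a minimal sketch using standard JDK reflection; the class name `ClassOrigin` and the method `locate` are made up for illustration, and the two class names passed in `main` are simply taken from the stack trace in the question. The reported version comes from the jar manifest and may be absent.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Pulsar or Netty): reports which jar or
// directory a class is loaded from, so a version mismatch becomes visible.
final class ClassOrigin {

    /** Returns the code source location of the named class, or a marker string. */
    static String locate(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "(bootstrap or unknown location)";
            }
            // The implementation version is read from the jar manifest, if present.
            Package pkg = cls.getPackage();
            String version = (pkg != null) ? pkg.getImplementationVersion() : null;
            return src.getLocation() + (version != null ? " (version " + version + ")" : "");
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Class names copied from the stack trace in the question.
        String[] names = {
            "io.netty.util.Recycler",
            "org.apache.pulsar.client.impl.ConsumerImpl"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run from inside the affected service (or with the same classpath), the printed jar path shows which Netty artifact wins. If it is not the one the Pulsar client was built against, the build tool's dependency tree is the next place to look.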