Handling PENDING messages from Redis Stream with Spring Data Redis

1.1k Views Asked by At

When using StreamMessageListenerContainer a subscription for a consumer group can be created by calling:

receive(consumer, readOffset, streamListener)

Is there a way to configure the container/subscription so that it will always attempt to re-process any PENDING messages before moving on to polling for new messages?

The goal would be to keep retrying any message that wasn't acknowledged until it succeeds, to ensure that the stream of events is always processed in exactly the order it was produced.

My understanding is if we specify the readOffset as '>' then on every poll it will use '>' and it will never see any messages from the PENDING list.

If we provide a specific message id, then it can see messages from the PENDING list, but the way the subscription updates the lastMessageId is like this:

pollState.updateReadOffset(raw.getId().getValue());
V record = convertRecord(raw);
listener.onMessage(record);

So even if the listener throws an exception, or just doesn't acknowledge the message id, the lastMessageId in pollState is still updated to this message id and won't be seen again on the next poll.

0

There are 0 best solutions below