We have an application that uses the Spring Cloud Azure Stream Binder to consume messages from the Azure Bus.
Our consumer looks like the one in the official documentation:
import com.azure.spring.messaging.checkpoint.Checkpointer;
[...]
import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
@SpringBootApplication
public class ServiceBusApplication {
[...]
@Bean
public Consumer<Message<String>> consume() {
return message -> {
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
.doOnError(e -> LOGGER.error("Error found", e))
.block();
};
}
}
We need to programmatically renew the message lock if the consumer needs more time to process the message.
This can be done using the API com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.renewMessageLock()
But I was unable to grab a reference to the ServiceBusReceiverAsyncClient object. Is this possible to do this operation using the Spring binder without reimplementing the code to us the azure java sdk directly?
My understanding of the behaviour is that the lock is automatically renewed up to
max-auto-lock-renew-duration
if the message is not consumed until then (or consuming finishes with exception). The lock is initially held for 1 minute, then it is renewed.Although this is not well documented in the spring cloud doc, this article may help: https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement