I want to poll and process SQS messages with high throughput. I have a number of threads, each with a SQS poller in a Spring @Async
. At the moment the code is simply printing the message. My hope is that I can adjust the threadcount and hopefully get throughput increases.
When I scale the number of parallel tasks I see more concurrent pollers. But I don't see an increase in throughput as I scale from 1 to 10. I see it maxing out at about 30 messages per second, which seems very low.
I have added a connection pool to the SQS client in case it's starving the connection pool.
Am I doing something obviously wrong in the way I use the SDK?
This code isn't well structured, but it's enough to demonstrate my issue.
@Component
class Poller
(val client : SqsClient)
{
private val logger: Logger = LoggerFactory.getLogger(this::class.java)
@Async
fun runInBackground(pollerId: Int, client: SqsClient) {
val queueName = "MY-QUEUE"
val queueUrl = client.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()).queueUrl()
val request = ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(10)
.waitTimeSeconds(1).build()
while (true) {
val messages = client.receiveMessage(request).messages()
for (m in messages) {
logger.info("MESSAGE T$pollerId : ${m.messageId()}")
}
for (m in messages) {
client.deleteMessage(
DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(m.receiptHandle()).build()
)
}
}
}
}
@Component
class Poller(
val runner : MyBackgroundService
) {
private val logger: Logger = LoggerFactory.getLogger(this::class.java)
@PostConstruct
fun startBackgroundService() {
val client = SqsClient.builder()
.defaultsMode(DefaultsMode.AUTO)
.httpClient(ApacheHttpClient.builder().maxConnections(200).build())
.build()
repeat(10) {
runner.runInBackground(it, client)
}
}
}