Java SQS client throughput doesn't scale up in parallel

I want to poll and process SQS messages with high throughput. I have a number of threads, each running an SQS poller in a Spring @Async method. At the moment the code simply prints each message. My hope is that I can raise the thread count and get a corresponding increase in throughput.

When I scale the number of parallel tasks I see more concurrent pollers, but throughput does not increase as I go from 1 to 10 pollers. It maxes out at about 30 messages per second, which seems very low.
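As a rough sanity check on that figure, the polling loop in the code further down makes one receiveMessage call and then one deleteMessage call per message, all sequentially. The sketch below is a back-of-envelope model of that loop. The function name, the batch sizes and the latencies are all assumptions for illustration, not measurements from this setup.

```kotlin
// Back-of-envelope model of one poller's loop: a single receiveMessage call
// followed by one deleteMessage call per received message, all sequential.
// Every number passed in is an assumption, not a measurement.
fun estimateMessagesPerSecond(
    pollers: Int,
    avgBatchSize: Double,
    receiveLatencyMs: Double,
    deleteLatencyMs: Double,
): Double {
    require(pollers > 0) { "pollers must be positive" }
    require(avgBatchSize > 0.0) { "avgBatchSize must be positive" }

    // Wall-clock time for one loop iteration of a single poller.
    val iterationMs = receiveLatencyMs + avgBatchSize * deleteLatencyMs

    // Messages handled per second by one poller.
    val perPoller = avgBatchSize / iterationMs * 1000.0

    // Assumes the pollers really run in parallel and share no bottleneck.
    return pollers * perPoller
}

fun main() {
    // Hypothetical case: full batches of 10, 20 ms per round trip, 1 poller.
    println(estimateMessagesPerSecond(1, 10.0, 20.0, 20.0))

    // Hypothetical case: batches of 2, 100 ms per round trip, 10 pollers.
    println(estimateMessagesPerSecond(10, 2.0, 100.0, 100.0))
}
```

With these made-up inputs the first call comes out at roughly 45 messages per second for a single poller and the second at roughly 67 for ten pollers. A total near 30 is therefore not impossible if batches are small and round trips are slow, but it would be surprising with full batches and fast calls. Logging the actual batch size and per-call latency would show which case applies.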

I have also configured the SQS client's HTTP connection pool explicitly (200 connections) in case the pollers were starving the default pool.

Am I doing something obviously wrong in the way I use the SDK?

This code isn't well structured, but it's enough to demonstrate my issue.

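import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest

// Named MyBackgroundService to match the type injected into Poller below.
// The client is passed to runInBackground, so no constructor parameter is needed.
@Component
class MyBackgroundService {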
    private val logger: Logger = LoggerFactory.getLogger(this::class.java)

    @Async
    fun runInBackground(pollerId: Int, client: SqsClient) {
        val queueName = "MY-QUEUE"

        val queueUrl = client.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()).queueUrl()

        val request = ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(10)
            .waitTimeSeconds(1).build()

        while (true) {
            val messages = client.receiveMessage(request).messages()

            for (m in messages) {
                logger.info("MESSAGE T$pollerId :  ${m.messageId()}")
            }

            for (m in messages) {
                client.deleteMessage(
                    DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(m.receiptHandle()).build()
                )
            }
        }
    }
}


@Component
class Poller(
    val runner : MyBackgroundService
) {

    private val logger: Logger = LoggerFactory.getLogger(this::class.java)

    @PostConstruct
    fun startBackgroundService() {
        val client = SqsClient.builder()
            .defaultsMode(DefaultsMode.AUTO)
            .httpClient(ApacheHttpClient.builder().maxConnections(200).build())
            .build()

        repeat(10) {
            runner.runInBackground(it, client)
        }
    }
}
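One variation that may be worth trying is replacing the per-message deleteMessage loop above with a single DeleteMessageBatch call per received batch, which removes up to nine round trips per iteration. The sketch below assumes the AWS SDK for Java v2 already used above. The helper name deleteBatch is hypothetical, and whether the delete calls are actually the bottleneck here is not established.

```kotlin
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry
import software.amazon.awssdk.services.sqs.model.Message

// Hypothetical helper (not part of the code above): deletes a received batch
// with one DeleteMessageBatch call instead of one deleteMessage call per message.
fun deleteBatch(client: SqsClient, queueUrl: String, messages: List<Message>) {
    // SQS rejects an empty batch request, so skip the call if nothing was received.
    if (messages.isEmpty()) return

    // Entry ids only need to be unique within one request, so the index is enough.
    // A batch holds at most 10 entries, which matches maxNumberOfMessages(10).
    val entries = messages.mapIndexed { index, m ->
        DeleteMessageBatchRequestEntry.builder()
            .id(index.toString())
            .receiptHandle(m.receiptHandle())
            .build()
    }

    val response = client.deleteMessageBatch(
        DeleteMessageBatchRequest.builder()
            .queueUrl(queueUrl)
            .entries(entries)
            .build()
    )

    // A batch call can partially fail; failed entries remain on the queue
    // and become visible again after the visibility timeout.
    for (failure in response.failed()) {
        println("Delete failed for entry ${failure.id()}: ${failure.message()}")
    }
}
```

Inside the polling loop this would be called as deleteBatch(client, queueUrl, messages) in place of the second for loop.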