I am currently running two larger servers in AWS which, among other things, handle messages from an SQS queue. CPU usage on those servers is low (9% at peak), but I see latency when processing my SQS messages (70 ms or more) once there are 2,000–3,000 messages in the queue. I was wondering whether you have any tips for improving the following Kotlin code, which uses RxJava 3 to retrieve and process the messages:
// SQS client for eu-west-1, authenticated through the default AWS credentials provider chain.
val sqs = AmazonSQSClientBuilder.standard()
    .withRegion(Regions.EU_WEST_1)
    .withCredentials(DefaultAWSCredentialsProviderChain())
    .build()

// Resolve the queue URL once from the configured queue name.
val queueUrl = sqs.getQueueUrl(Configuration.getSqsNotificationConfig().queueName).queueUrl
LOGGER.info("Connecting to queue $queueUrl")

// Long-poll request: wait up to 20 seconds for at most 8 messages per receive call.
// NOTE(review): SQS accepts up to 10 messages per receive call; raising this would reduce round trips.
val request = ReceiveMessageRequest(queueUrl)
    .withMaxNumberOfMessages(8)
    .withWaitTimeSeconds(20)

// Wire receiver -> poller and start the polling loop on the I/O scheduler.
val receiver = RxSqsReceiver(sqs, request)
val notificationPoller = RxSqsPoller(receiver, sqsItineraryNotificationMessageHandler)
notificationPoller.startSqsPolling(scheduler = Schedulers.io())
/**
 * Thin reactive wrapper around the blocking [AmazonSQS] receive and delete calls.
 *
 * Both operations are cold: the SQS call is made on subscription, on whatever
 * thread the subscriber chooses.
 *
 * @param sqs client used for all calls
 * @param receiveMessageRequest request reused for every receive; its queue URL is also used for deletes
 */
class RxSqsReceiver(
    private val sqs: AmazonSQS,
    private val receiveMessageRequest: ReceiveMessageRequest
) {
    /**
     * Performs a single receive call per subscription.
     *
     * @return an observable that emits the list of messages returned by that call and then completes
     */
    fun receive(): Observable<List<Message>> {
        return Observable.fromCallable {
            // Parameterized logging avoids building the string when debug is disabled.
            LOGGER.debug("Start long-polling for messages, url={}", receiveMessageRequest.queueUrl)
            val result = sqs.receiveMessage(receiveMessageRequest)
            result.messages
        }
    }

    /**
     * Deletes [message] from the queue.
     *
     * Deletion is best-effort: a failure is logged and swallowed, and the returned
     * observable then completes without emitting.
     *
     * @return an observable that emits [message] after a successful delete
     */
    fun deleteMessage(message: Message): Observable<Message> {
        return Observable.fromCallable {
            sqs.deleteMessage(receiveMessageRequest.queueUrl, message.receiptHandle)
            message
        }
            // Log at warn with the stack trace so failed deletes are visible;
            // the exception message alone may be null.
            .doOnError { t: Throwable -> LOGGER.warn("Failed to delete message ${message.messageId}", t) }
            .onErrorResumeNext { Observable.empty() }
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(RxSqsReceiver::class.java)
    }
}
/**
 * Runs a receive -> handle -> delete loop on top of [RxSqsReceiver].
 *
 * Each loop iteration makes one receive call, handles the returned messages
 * concurrently, deletes every message whose handler emitted it, and only then
 * issues the next receive call. A single polling loop therefore has at most one
 * batch in flight at a time.
 *
 * @param sqsReceiver source of message batches and target of deletes
 * @param sqsMessageHandler processes one message; a message is deleted only after the handler emits it
 */
class RxSqsPoller(
    private val sqsReceiver: RxSqsReceiver,
    private val sqsMessageHandler: RxSqsMessageHandler
) {
    private val compositeDisposable = CompositeDisposable()

    /**
     * Starts one polling loop and registers it for [shutDown].
     *
     * Every call adds an independent loop to the internal composite, so calling
     * this more than once runs several loops side by side.
     *
     * @param scheduler scheduler used for the receive call and for each message handler
     * @param repeat number of times the receive/handle cycle is subscribed before the loop completes
     */
    fun startSqsPolling(scheduler: Scheduler, repeat: Long = Long.MAX_VALUE) {
        val polling = sqsReceiver
            .receive()
            .subscribeOn(scheduler)
            .concatMap { messages -> handleMessages(messages, scheduler) }
            // Pass the throwable so the stack trace is logged, not just the message.
            .doOnError { throwable -> LOGGER.error(throwable.message, throwable) }
            .repeat(repeat)
            // One resubscription after a failure; a second failure terminates the loop.
            .retry(1)
            .subscribe(
                { },
                // Explicit terminal handler: without it the final error is routed to the
                // global RxJava error hook instead of this class's logger.
                { throwable -> LOGGER.error("SQS polling terminated", throwable) }
            )
        compositeDisposable.add(polling)
    }

    /**
     * Handles and deletes every message of one batch.
     *
     * @return an observable that emits each message that was handled and deleted; it never errors
     */
    private fun handleMessages(messages: List<Message>, scheduler: Scheduler): Observable<Message> {
        return Observable.fromIterable(messages)
            .flatMap { message -> processMessage(message, scheduler) }
            // Safety net for anything not raised by an individual message.
            .doOnError { throwable -> LOGGER.warn(throwable.message, throwable) }
            .onErrorResumeNext { Observable.empty() }
    }

    /**
     * Handles one message and deletes it on success.
     *
     * Errors are logged and swallowed per message, so a failing message does not
     * cancel the other messages of the same batch.
     */
    private fun processMessage(message: Message, scheduler: Scheduler): Observable<Message> {
        // defer turns a handler that throws synchronously into a per-message onError.
        return Observable.defer { getHandleMessageObservable(message, scheduler) }
            .flatMap { handled -> sqsReceiver.deleteMessage(handled) }
            .doOnError { throwable -> LOGGER.warn(throwable.message, throwable) }
            .onErrorResumeNext { Observable.empty() }
    }

    /** Invokes the handler for [message], subscribing it on [scheduler]. */
    private fun getHandleMessageObservable(message: Message, scheduler: Scheduler): Observable<Message> {
        // Parameterized logging avoids building the string when debug is disabled.
        LOGGER.debug("Received message: {}", message.body)
        return sqsMessageHandler.handleMessage(message).subscribeOn(scheduler)
    }

    /** Disposes every polling loop started so far; new loops can be started afterwards. */
    fun shutDown() {
        compositeDisposable.clear()
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(RxSqsPoller::class.java)
    }
}
So what could I do differently or better to increase the number of messages my code can process?