Retrieving sqs messages with rxjava 3 in kotlin in a better way

61 Views Asked by At

Currently I am running 2 larger servers within AWS which besides other things handle SQS messages from the queue. The CPU usage of those servers is not that high (9% at peak) but I do have some latency on processing my sqs messages (70 ms+) when there are 2/3k messages in the queue. So was wondering if you guys may have some tips on improving the following kotlin code with RxJava 3 to retrieve and process the messages:

val sqs = AmazonSQSClientBuilder
    .standard()
    .withRegion(Regions.EU_WEST_1)
    .withCredentials(DefaultAWSCredentialsProviderChain())
    .build()

val queueUrl = sqs.getQueueUrl(Configuration.getSqsNotificationConfig().queueName).queueUrl
LOGGER.info("Connecting to queue $queueUrl")
val request = ReceiveMessageRequest(queueUrl).apply {
    maxNumberOfMessages = 8
    waitTimeSeconds = 20
}
val receiver = RxSqsReceiver(sqs, request)

val notificationPoller = RxSqsPoller(receiver, sqsItineraryNotificationMessageHandler)
notificationPoller.startSqsPolling(Schedulers.io())

class RxSqsReceiver(
    private val sqs: AmazonSQS,
    private val receiveMessageRequest: ReceiveMessageRequest
) {
    fun receive(): Observable<List<Message>> {
        return Observable.fromCallable {
            LOGGER.debug("Start long-polling for messages, url=${receiveMessageRequest.queueUrl}")
            val result = sqs.receiveMessage(receiveMessageRequest)
            result.messages
        }
    }

    fun deleteMessage(message: Message): Observable<Message> {
        return Observable.fromCallable {
            sqs.deleteMessage(receiveMessageRequest.queueUrl, message.receiptHandle)
            message
        }
            .doOnError { t: Throwable -> LOGGER.debug(t.message) }
            .onErrorResumeNext { Observable.empty() }
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(RxSqsReceiver::class.java)
    }
}

class RxSqsPoller(
    private val sqsReceiver: RxSqsReceiver,
    private val sqsMessageHandler: RxSqsMessageHandler
) {
    private val compositeDisposable = CompositeDisposable()

    fun startSqsPolling(scheduler: Scheduler, repeat: Long = Long.MAX_VALUE) {
        compositeDisposable.add(sqsReceiver
            .receive()
            .subscribeOn(scheduler)
            .concatMap { messages -> handleMessages(messages, scheduler) }
            .doOnError { throwable -> LOGGER.error(throwable.message) }
            .repeat(repeat)
            .retry(1)
            .subscribe())
    }

    private fun handleMessages(messages: List<Message>, scheduler: Scheduler): Observable<Message> {
        return Observable.fromIterable(messages)
            .flatMap { message -> getHandleMessageObservable(message, scheduler) }
            .flatMap { message -> sqsReceiver.deleteMessage(message) }
            .doOnError { throwable -> LOGGER.warn(throwable.message, throwable) }
            .onErrorResumeNext { Observable.empty() }
    }

    private fun getHandleMessageObservable(message: Message, scheduler: Scheduler): Observable<Message> {
        LOGGER.debug("Received message: ${message.body}")
        return sqsMessageHandler.handleMessage(message).subscribeOn(scheduler)
    }

    fun shutDown() {
        compositeDisposable.clear()
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(RxSqsPoller::class.java)
    }
}

So what could I maybe do different/ better to improve the amount of messages my code can process?

0

There are 0 best solutions below