I'm new to Amazon SQS, but I need to implement code that reads many queues at the same time (over 1000 queues, created dynamically).
I'm using NET 7, and for that, I created a class that reads the queue, using the ReceiveMessageAsync function and triggering an event with the obtained message. This code is used by several tasks, after getting the SQS queues with the following code:
public class SQSQueueConsumer : IQueueConsumer
{
public const int MIN_TIMEOUT = 0;
public const int MAX_TIMEOUT = 20;
private const string QUEUE_NOT_EXISTS_CODE = "AWS.SimpleQueueService.NonExistentQueue";
private readonly IAmazonSQS _sqs;
private int _messageTimeout = MAX_TIMEOUT;
public event EventHandler<MessageReceivedEventArgs> MessageReceived;
public event EventHandler<QueueErrorEventArgs> ErrorOccurred;
public SQSQueueConsumer(IAmazonSQS sqs)
{
ArgumentNullException.ThrowIfNull(sqs);
_sqs = sqs;
}
public void SetMessageTimeout(int timeout)
{
if(timeout < MIN_TIMEOUT || timeout > MAX_TIMEOUT)
{
throw new ArgumentException($"Timeout should be between {MIN_TIMEOUT} and {MAX_TIMEOUT}.");
}
_messageTimeout = timeout;
}
public async Task ConsumeAsync(string queueUrl, CancellationToken ct)
{
var receivedRequest = new ReceiveMessageRequest()
{
QueueUrl = queueUrl,
MaxNumberOfMessages = 10,
WaitTimeSeconds = _messageTimeout
};
while(!ct.IsCancellationRequested)
{
try
{
var messageResponse = await _sqs.ReceiveMessageAsync(receivedRequest, ct);
if (messageResponse.HttpStatusCode != HttpStatusCode.OK)
{
OnError(new QueueErrorEventArgs(queueUrl, "HTTP Error."));
continue;
}
foreach (var message in messageResponse.Messages)
{
OnMessageReceived(new MessageReceivedEventArgs(queueUrl, message.Body));
await _sqs.DeleteMessageAsync(queueUrl, message.ReceiptHandle, ct);
}
}
catch(AmazonSQSException ex)
{
if (ex.ErrorCode == QUEUE_NOT_EXISTS_CODE)
{
throw new Exceptions.QueueDoesNotExistException(queueUrl);
}
}
}
}
protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
=> MessageReceived?.Invoke(this, e);
protected virtual void OnError(QueueErrorEventArgs e)
=> ErrorOccurred?.Invoke(this, e);
}
My problem is that if I have, for example, 1500 queues being consumed, the Amazon object throws several exceptions like System.Net.Http.HttpRequestException and System.Net.Sockets.SocketException. With fewer queues, the software works correctly.
Could this error be linked to the http handler socket exhaustion?
I also tried to create a very simple custom HTTP handler and got the same error.
public class SqsHttpClientFactory : Amazon.Runtime.HttpClientFactory
{
private readonly IHttpClientFactory _httpClientFactory;
public SqsHttpClientFactory(IHttpClientFactory httpClientFactory)
{
_httpClientFactory = httpClientFactory;
}
public override HttpClient CreateHttpClient(IClientConfig clientConfig)
{
return _httpClientFactory.CreateClient();
}
}
And using like that:
services.AddSingleton<IAmazonSQS>(s =>
{
var settings = s.GetRequiredService<IOptions<SQSSettings>>().Value;
AWSOptions awsOptions = new AWSOptions
{
Credentials = new Amazon.Runtime.BasicAWSCredentials(settings.AccessKeyId, settings.SecretAccessKey),
Region = RegionEndpoint.GetBySystemName(settings.Region),
};
var httpFactory = s.GetRequiredService<IHttpClientFactory>();
var config = new AmazonSQSConfig()
{
RegionEndpoint = awsOptions.Region,
HttpClientFactory = new SqsHttpClientFactory(httpFactory),
};
var client = new AmazonSQSClient(awsOptions.Credentials, config);
return client;
});
Thanks for all the help.
I performed the tests with about 1000 queues and it worked correctly without exceptions, but with 1500 queues or more I always get exceptions, and queue consumption is extremely slow.