Error trying to connect Celery through SQS using STS

1.9k Views Asked by At

I'm trying to use Celery with SQS as broker. In order to use the SQS from my container I need to assume a role and for that I'm using STS. My code looks like this:

role_info = {
    'RoleArn': 'arn:aws:iam::xxxxxxx:role/my-role-execution',
    'RoleSessionName': 'roleExecution'
}
sts_client = boto3.client('sts', region_name='eu-central-1')
credentials = sts_client.assume_role(**role_info)

aws_access_key_id = credentials["Credentials"]['AccessKeyId']
aws_secret_access_key = credentials["Credentials"]['SecretAccessKey']
aws_session_token = credentials["Credentials"]["SessionToken"]

os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key
os.environ["AWS_DEFAULT_REGION"] = 'eu-central-1'
os.environ["AWS_SESSION_TOKEN"] = aws_session_token

broker = "sqs://"
backend = 'redis://redis-service:6379/0'

celery = Celery('tasks', broker=broker, backend=backend)

celery.conf["task_default_queue"] = 'my-queue'
celery.conf["broker_transport_options"] = {
    'region': 'eu-central-1',
    'predefined_queues': {
       'my-queue': {
            'url': 'https://sqs.eu-central-1.amazonaws.com/xxxxxxx/my-queue'
        }
    }
}

In the same file I have the following task:

@celery.task(name='my-queue.my_task')
def my_task(content) -> int:
    print("hello")
    return 0

When I execute the following code I get an error:

[2020-09-24 10:38:03,602: CRITICAL/MainProcess] Unrecoverable error: ClientError('An error occurred (AccessDenied) when calling the ListQueues operation: Access to the resource https://eu-central-1.queue.amazonaws.com/ is denied.',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 921, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/worker/worker.py", line 208, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/connection.py", line 23, in start
    c.connection = c.connect()
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 405, in connect
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 412, in connection_for_read
    self.app.connection_for_read(heartbeat=heartbeat))
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 439, in ensure_connected
    callback=maybe_shutdown,
  File "/usr/local/lib/python3.6/site-packages/kombu/connection.py", line 422, in ensure_connection
    callback, timeout=timeout)
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/functional.py", line 341, in retry_over_time
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/kombu/connection.py", line 275, in connect
    return self.connection
  File "/usr/local/lib/python3.6/site-packages/kombu/connection.py", line 823, in connection
    self._connection = self._establish_connection()
  File "/usr/local/lib/python3.6/site-packages/kombu/connection.py", line 778, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 941, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 923, in create_channel
    channel = self.Channel(connection)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/SQS.py", line 100, in __init__
    self._update_queue_cache(self.queue_name_prefix)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/SQS.py", line 105, in _update_queue_cache
    resp = self.sqs.list_queues(QueueNamePrefix=queue_name_prefix)
  File "/usr/local/lib/python3.6/site-packages/botocore/client.py", line 337, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.6/site-packages/botocore/client.py", line 656, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the ListQueues operation: Access to the resource https://eu-central-1.queue.amazonaws.com/ is denied.

If I use boto3 directly without Celery, I'm able to connect to the queue and retrieve data without this error. I don't know why Celery/Kombu try to list queues when I specify the predefined_queues configuration, tha is used to avoid these behavior (from docs):

If you want Celery to use a set of predefined queues in AWS, and to never attempt to list SQS queues, nor attempt to create or delete them, pass a map of queue names to URLs using the predefined_queue_urls setting

Source here

Anyone know what happens? How I should modify my code in order to make it work?. Seems that Celery is not using the credentials at all.

The versions I'm using:

celery==4.4.7
boto3==1.14.54
kombu==4.5.0

Thanks!

PS: I created and issue in Github to track if this can be a library error or not...

2

There are 2 best solutions below

0
On

I solved the problem updating dependencies to the latest versions:

celery==5.0.0
boto3==1.14.54
kombu==5.0.2
pycurl==7.43.0.6
0
On

I was able to get celery==4.4.7 and kombu==4.6.11 working by setting the following configuration option:

celery.conf["task_create_missing_queues"] = False