Python Version: 3.7 Celery Version: 5.2
Below is the code being used:
# app.py (Flask application)
from flask import Flask, jsonify
from celery import Celery
import db_connectors
import os
from urllib import parse
# Flask application object; the routes defined in this module are registered on it.
app = Flask(__name__)
def build_mongo_url(host, port, db, user="", password=""):
    """Assemble a ``mongodb://`` connection URL from its parts.

    Args:
        host: Server host name.
        port: Server port; omitted from the URL when empty.
        db: Database name placed in the URL path (may be empty).
        user: Optional user name; credentials are left out when empty.
        password: Optional password, only used when ``user`` is set.

    Returns:
        The connection URL as a string.
    """
    credentials = ""
    if user:
        # Percent-escape credentials so characters such as '@', ':' or '/'
        # cannot be mistaken for URL delimiters.
        credentials = parse.quote_plus(user)
        if password:
            credentials += ":" + parse.quote_plus(password)
        credentials += "@"
    netloc = f"{host}:{port}" if port else host
    return f"mongodb://{credentials}{netloc}/{db}"


# MongoDB connection settings, read from the environment (empty string when unset).
mongo_host = os.environ.get("MONGO_HOST", "")
mongo_port = os.environ.get("MONGO_PORT", "")
mongo_db = ""
# NOTE(review): the replica-set and SSL/DocumentDB settings below are read but
# not applied to the URL, because the values they are expected to hold are not
# visible in this file -- confirm and extend build_mongo_url() if needed.
mongo_replica = os.environ.get("MONGO_USE_REPLICA", "")
mongo_replicaset = os.environ.get("MONGO_REPLICASET", "")
mongo_user = os.environ.get("MONGO_USER", "")
mongo_password = os.environ.get("MONGO_PASS", "")
doc_db_ssl = os.environ.get("MONGO_DOCUMENTDB_SSL", "")
doc_db_enabled = os.environ.get("DOCUMENTDB_ENABLE", "")

# Connection URL consumed by the Celery configuration further down; without
# this definition the module fails with NameError on import.
mongo_url = build_mongo_url(
    mongo_host, mongo_port, mongo_db, mongo_user, mongo_password
)
# Configure Flask app to use Celery.
#
# NOTE: with a MongoDB broker URL, kombu's MongoDB transport creates a capped
# collection for broadcast messages when the worker connects. A server that
# rejects capped collections (Amazon DocumentDB is documented as not supporting
# them -- confirm for your deployment) fails at that point with
# "Field 'size' is currently not supported", so it cannot act as the broker.
# The broker can therefore be pointed at a different service through the
# CELERY_BROKER_URL environment variable; when unset, the MongoDB URL is used
# as before. The result backend still uses the MongoDB URL.
connection_string = mongo_url
app.config['CELERY_BROKER_URL'] = (
    os.environ.get("CELERY_BROKER_URL") or connection_string
)
app.config['CELERY_RESULT_BACKEND'] = connection_string

# Options handed to the broker transport.
# NOTE(review): "region" and "visibility_timeout" look like SQS-style options;
# confirm they match the broker actually in use.
CELERY_BROKER_TRANSPORT_OPTIONS = {
    "region": "us-east-1",
    'visibility_timeout': 360,
    'polling_interval': 1,
}

# Initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
# Previously the options dict was only a module-level variable defined after
# the configuration was copied, so Celery never saw it; apply it explicitly.
celery.conf.broker_transport_options = CELERY_BROKER_TRANSPORT_OPTIONS
# MongoDB configuration
# Connection object used by the task below; it is created once, when this
# module is imported.
# NOTE(review): if the Celery worker forks child processes after importing this
# module, confirm that the object returned by
# db_connectors.get_mongo_connection() is safe to share across forks.
mongo_client = db_connectors.get_mongo_connection()
# Celery task to fetch data from MongoDB
@celery.task
def fetch_data_from_mongo():
    """Return all documents in ``someDb.someTable`` matching the fixed query.

    The query selects documents whose ``find_key`` equals ``'6523713'``; the
    matching documents are collected into a list and returned unchanged.

    NOTE(review): when run through ``.delay()`` the returned list has to be
    encoded by the configured result serializer -- confirm the documents
    contain only values that serializer can handle.
    """
    query = {'find_key': '6523713'}  # Modify this query as needed
    cursor = mongo_client.connection.someDb.someTable.find(query)
    # Drain the cursor into a concrete list before returning it.
    return list(cursor)
@app.route('/fetch_data', methods=['GET'])
def fetch_data():
    """Queue ``fetch_data_from_mongo`` via ``.delay()`` and report the handle.

    Returns:
        A JSON response whose ``task_id`` field is the string form of the
        object returned by ``.delay()``.
    """
    async_result = fetch_data_from_mongo.delay()
    payload = {'task_id': str(async_result)}
    return jsonify(payload)
@app.route('/fetch_data_without_delay', methods=['GET'])
def fetch_data_without_delay():
    """Call ``fetch_data_from_mongo`` directly, without ``.delay()``.

    Returns:
        A JSON response whose ``task_id`` field holds the string form of the
        value returned by the direct call (not a task identifier).
    """
    documents = fetch_data_from_mongo()
    payload = {'task_id': str(documents)}
    return jsonify(payload)
if __name__ == '__main__':
    # When executed as a script, serve the app on port 5000 with debug off.
    app.run(port=5000, debug=False)
There are 2 API calls, "/fetch_data_without_delay" and "/fetch_data", which essentially call the same method to fetch data from MongoDB.
The "/fetch_data_without_delay" endpoint works fine and fetches the data from MongoDB, but "/fetch_data", which uses the .delay() method, throws the error below because the worker fails to connect to MongoDB to set up the queue.
Please find the error below:
[2023-11-27 14:56:13,684: ERROR/MainProcess] consumer: Cannot connect to mongodb://user_here:**@rhost_here:port_here/db_here: Field 'size' is currently not supported.
Trying again in 32.00 seconds... (16/100)
[2023-11-27 14:56:45,838: CRITICAL/MainProcess] Unrecoverable error: OperationalError("Field 'size' is currently not supported")
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 925, in create_channel
return self._avail_channels.pop()
IndexError: pop from empty list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 70, in __get__
return obj_dict[name]
KeyError: 'client'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 446, in _reraise_as_library_errors
yield
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 437, in _ensure_connection
callback, timeout=timeout
File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 312, in retry_over_time
return fun(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 877, in _connection_factory
self._connection = self._establish_connection()
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 812, in _establish_connection
conn = self.transport.establish_connection()
File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 949, in establish_connection
self._avail_channels.append(self.create_channel(self))
File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 927, in create_channel
channel = self.Channel(connection)
File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 146, in __init__
self.client
File "/usr/local/lib/python3.7/site-packages/kombu/utils/objects.py", line 30, in __get__
return super().__get__(instance, owner)
File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 74, in __get__
return obj_dict.setdefault(name, self.func(obj))
File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 385, in client
return self._create_client()
File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 378, in _create_client
self._create_broadcast(database)
File "/usr/local/lib/python3.7/site-packages/kombu/transport/mongodb.py", line 354, in _create_broadcast
capped=True)
File "/usr/local/lib/python3.7/site-packages/pymongo/database.py", line 369, in create_collection
read_concern, session=s, **kwargs)
File "/usr/local/lib/python3.7/site-packages/pymongo/collection.py", line 180, in __init__
self.__create(kwargs, collation, session)
File "/usr/local/lib/python3.7/site-packages/pymongo/collection.py", line 258, in __create
collation=collation, session=session)
File "/usr/local/lib/python3.7/site-packages/pymongo/collection.py", line 244, in _command
retryable_write=retryable_write)
File "/usr/local/lib/python3.7/site-packages/pymongo/pool.py", line 579, in command
unacknowledged=unacknowledged)
File "/usr/local/lib/python3.7/site-packages/pymongo/network.py", line 150, in command
parse_write_concern_error=parse_write_concern_error)
File "/usr/local/lib/python3.7/site-packages/pymongo/helpers.py", line 155, in _check_command_response
raise OperationFailure(msg % errmsg, code, response)
pymongo.errors.OperationFailure: Field 'size' is currently not supported
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/celery/worker/worker.py", line 203, in start
self.blueprint.start(self)
File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 365, in start
return self.obj.start()
File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 326, in start
blueprint.start(self)
File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/connection.py", line 21, in start
c.connection = c.connect()
File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 422, in connect
conn = self.connection_for_read(heartbeat=self.amqheartbeat)
File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 429, in connection_for_read
self.app.connection_for_read(heartbeat=heartbeat))
File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 456, in ensure_connected
callback=maybe_shutdown,
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 381, in ensure_connection
self._ensure_connection(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 437, in _ensure_connection
callback, timeout=timeout
File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 450, in _reraise_as_library_errors
raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: Field 'size' is currently not supported
Please help me figure out why the .delay() method is failing.