I know that similar questions have already been asked here, but none of the answers I found worked for me. I hope I get lucky this time.
So, I have a Django application that uses Postgres, RabbitMQ, and Celery. When I run the components individually, everything works fine; the problem only occurs when running through docker-compose.
For some reason, Celery tasks cannot find the object in the database. The object definitely exists; moreover, it even stores the Celery task id (for example, for possible later cancellation of the task).
Here are the parts of the code relevant to the problem:
# docker-compose.yml
version: "3.7"

services:
  db:
    image: postgres:12.4
    container_name: "fbrq_db"
    volumes:
      - postgres_data:/var/lib/postgresql/data/
    env_file:
      - .env
    environment:
      - POSTGRES_HOST_AUTH_METHOD=trust
    networks:
      - default

  rabbitmq:
    restart: always
    container_name: "fbrq_rabbit"
    image: rabbitmq:3-management-alpine
    ports:
      - 5672:5672
      - 15672:15672
    networks:
      - default

  app:
    restart: always
    container_name: "fbrq_app"
    build: .
    volumes:
      - .:/code
      - ./static:/code/static
    command: gunicorn --bind 0.0.0.0:8000 fbrq_api.wsgi
    ports:
      - "8000:8000"
    networks:
      - default

  celery:
    restart: always
    container_name: "fbrq_celery"
    build: .
    command: celery -A fbrq_api worker -l info
    env_file:
      - ./.env
    depends_on:
      - app
      - rabbitmq
    environment:
      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672/
      - DJANGO_SETTINGS_MODULE=fbrq_api.settings
    networks:
      - default

  celery-beat:
    restart: always
    container_name: "fbrq_celery-beat"
    build: .
    command: celery -A fbrq_api beat -l info
    env_file:
      - ./.env
    depends_on:
      - app
      - rabbitmq
    environment:
      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672/
    networks:
      - default

volumes:
  postgres_data:
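One thing worth noting about the compose file: the celery service only overrides CELERY_BROKER_URL and DJANGO_SETTINGS_MODULE, so the worker's database connection comes entirely from .env. For the worker and the app to hit the same Postgres, the settings have to resolve the host by the compose service name db from inside every container. A minimal sketch of such env-driven settings (the POSTGRES_* variable names here are my assumption, not taken from the project's .env):

```python
# Hypothetical env-driven database settings (settings.py); the
# POSTGRES_* variable names are assumptions, not from the project above.
import os

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": os.environ.get("POSTGRES_DB", "fbrq"),
        "USER": os.environ.get("POSTGRES_USER", "postgres"),
        "PASSWORD": os.environ.get("POSTGRES_PASSWORD", ""),
        # Inside docker-compose the host must be the service name "db",
        # never "localhost" -- each container has its own localhost.
        "HOST": os.environ.get("POSTGRES_HOST", "db"),
        "PORT": os.environ.get("POSTGRES_PORT", "5432"),
    }
}
```

If the worker container resolved a different host here, it would see a different (empty) database while the app's writes land in the real one.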
# tasks.py
class MessageTaskManager:
    @staticmethod
    def create_message_celery_task(message):
        message_send_time = get_message_send_time(
            message.mailing, message.client
        )
        task = send_mailing.apply_async(
            args=[message.id], eta=message_send_time
        )
        message.celery_task_id = task.id
        message.save()


@shared_task(bind=True, acks_late=True, name="Mailing")
def send_mailing(self, message_id):
    try:
        message = Message.objects.get(id=message_id)
        message.status = Message.Status.NOT_DELIVERED
        try:
            api_data = {
                "id": message_id,
                "phone": message.client.phone,
                "text": message.mailing.text,
            }
            headers = {
                "Authorization": api_token,
            }
            response = requests.post(
                api_url + str(message_id), json=api_data, headers=headers
            )
            response.raise_for_status()
            message.status = Message.Status.DELIVERED
        except (Timeout, HTTPError, RequestException):
            MessageTaskManager.create_new_message_and_send(message)
        except Exception:
            MessageTaskManager.create_new_message_and_send(message)
        finally:
            message.save()
    except Message.DoesNotExist as e:
        logger.error(f"Message_{message_id} doesn't exist yet")
        self.retry(exc=e, countdown=10)
# views.py
def get_mailing_messages(mailing, created=False):
    if created:
        clients = [
            client
            for client in mailing.get_filtered_clients()
            if get_message_send_time(mailing, client)
        ]
        mailing.set_clients_count(clients)
        return [
            Message.objects.create(mailing=mailing, client=client)
            for client in clients
        ]
    return list(Message.objects.filter(mailing=mailing))


class MailingViewSet(viewsets.ModelViewSet):
    queryset = Mailing.objects.all()
    serializer_class = MailingSerializer

    def create(self, request, *args, **kwargs):
        logger.info(f"Request from API to create mailing: {request.data}")
        serializer = self.get_serializer(data=request.data)
        try:
            serializer.is_valid(raise_exception=True)
            self.perform_create(serializer)
            headers = self.get_success_headers(serializer.data)
            mailing_id = serializer.data["id"]
            mailing = Mailing.objects.get(id=mailing_id)
            messages = get_mailing_messages(mailing, created=True)
            for message in messages:
                transaction.on_commit(
                    partial(
                        message_task_manager.create_message_celery_task,
                        message=message,
                    )
                )
                # message_task_manager.create_message_celery_task(message)
            return Response(
                serializer.data,
                status=status.HTTP_201_CREATED,
                headers=headers,
            )
        except serializers.ValidationError as e:
            logger.error(f"Failed to validate data. Errors: {e.detail}")
            return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)
You can see that initially I just called the Celery task creation method directly:

for message in messages:
    message_task_manager.create_message_celery_task(message)

But after it failed and I studied the problem a little, I changed this part of the code:

for message in messages:
    transaction.on_commit(
        partial(message_task_manager.create_message_celery_task, message=message)
    )
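For context on the on_commit variant: Django only defers the callback while a transaction is actually open; if no atomic block is active (plain autocommit), the callback runs immediately. A toy model of that behavior, with no Django involved:

```python
# Toy model of Django's transaction.on_commit semantics (pure Python):
# inside an open transaction, callbacks are queued and run only at
# commit; with no transaction open, they run immediately.
class FakeConnection:
    def __init__(self):
        self.in_atomic_block = False
        self._callbacks = []

    def on_commit(self, func):
        if self.in_atomic_block:
            self._callbacks.append(func)   # deferred until commit
        else:
            func()                         # autocommit: runs right away

    def commit(self):
        callbacks, self._callbacks = self._callbacks, []
        self.in_atomic_block = False
        for func in callbacks:
            func()


conn = FakeConnection()
fired = []

conn.in_atomic_block = True
conn.on_commit(lambda: fired.append("after-commit"))
assert fired == []                 # still queued: not committed yet
conn.commit()
assert fired == ["after-commit"]   # callback ran at commit time
```

So in the view above, the deferral only changes anything if the request actually runs inside an atomic block (e.g. with ATOMIC_REQUESTS enabled); otherwise on_commit degenerates to a direct call.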
The messages I receive look like this:
[2024-02-10 08:34:02,930: ERROR/ForkPoolWorker-8] Message_1207 doesn't exist yet
[2024-02-10 08:34:02,932: INFO/MainProcess] Task Mailing[08226fa1-aa4a-4ba3-adb3-da5125295cc7] received
[2024-02-10 08:34:02,935: INFO/ForkPoolWorker-8] Task Mailing[08226fa1-aa4a-4ba3-adb3-da5125295cc7] retry: Retry in 10s: DoesNotExist('Message matching query does not exist.')
But, on the other hand, from the Django shell:
>>> from api.models import Message
>>> msg = Message.objects.get(id=1207)
>>> msg.celery_task_id
'08226fa1-aa4a-4ba3-adb3-da5125295cc7'
Hacks like adding time.sleep() don't help either.
P.S. At the same time, database access from the Celery Beat task works fine:

def send_daily_statistics_email():
    yesterday = timezone.now() - timezone.timedelta(days=1)
    messages = Message.objects.filter(datetime_send__date=yesterday)
    ...
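One thing I tried to rule out is the worker container talking to a different database than the app. A hypothetical diagnostic script for that, run inside each container (the POSTGRES_* names are assumptions based on the postgres image's conventions, not my actual .env):

```python
# check_db.py -- hypothetical diagnostic. Run it in each container, e.g.
#   docker compose exec app python check_db.py
#   docker compose exec celery python check_db.py
# and compare: both should print the same host/database/user.
import os


def db_identity(environ=os.environ):
    """Collect the DB-related environment this process actually sees."""
    keys = ("POSTGRES_HOST", "POSTGRES_DB", "POSTGRES_USER")
    return {key: environ.get(key, "<unset>") for key in keys}


if __name__ == "__main__":
    print(db_identity())
```

If the two containers print different values here, the "missing" row simply lives in a database the worker never looks at.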