Django, multiple databases (writer, read-replicas) and a sync issue


So... in response to an API call I do:

i = CertainObject(paramA=1, paramB=2)
i.save()

now my writer database has a new record.

Processing can take a while and I do not wish to hold up my response to the API caller, so on the next line I hand the object ID off to an async job using Celery:

run_async_job.delay(i.id)

Right away, or a few seconds later depending on the queue, run_async_job tries to load the record from the database using the ID provided. It's a gamble: sometimes it works, sometimes it doesn't, depending on whether the read replicas have caught up yet.

Is there a pattern that guarantees success, without having to "sleep" for a few seconds before reading or hoping for good luck?

Thanks.


There are 4 best solutions below

On BEST ANSWER

The simplest way seems to be to use retries, as mentioned by Greg and Elrond in their answers. If you're using the shared_task or @app.task decorators, you can use the following code snippet.

from celery import shared_task

@shared_task(bind=True)
def your_task(self, certain_object_id):
    try:
        certain_obj = CertainObject.objects.get(id=certain_object_id)
        # Do your stuff
    except CertainObject.DoesNotExist as e:
        # The replica has not caught up yet; retry with an exponential backoff
        self.retry(exc=e, countdown=2 ** self.request.retries, max_retries=20)

I used an exponential countdown between retries (1, 2, 4, 8, ... seconds); you can modify it according to your needs.

You can find the documentation for custom retry delays here. There is also another document explaining exponential backoff in this link.

When you call retry it’ll send a new message, using the same task-id, and it’ll take care to make sure the message is delivered to the same queue as the originating task. You can read more about this in the documentation here

On

This article goes pretty deep into how you can handle read-after-write issues with replicated databases: https://medium.com/box-tech-blog/how-we-learned-to-stop-worrying-and-read-from-replicas-58cc43973638.

Like the author, I know of no foolproof catch-all way to handle read-after-write inconsistency.

The main strategy I've used before is to have some kind of expect_and_get(pk, max_attempts=10, delay_seconds=5) method that tries to fetch the record up to max_attempts times, waiting delay_seconds seconds between attempts. The idea is that it "expects" the record to exist, so it treats a certain number of failures as just transient DB issues. It's a little more reliable than just sleeping for some time, since it will pick up records sooner and hopefully delay the job execution much less often.
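A minimal sketch of what such a helper could look like, assuming the CertainObject model from the question (the function itself is illustrative, not an existing API):

import time

def expect_and_get(pk, max_attempts=10, delay_seconds=5):
    """Fetch a record we expect to exist, tolerating replica lag."""
    for attempt in range(1, max_attempts + 1):
        try:
            return CertainObject.objects.get(pk=pk)
        except CertainObject.DoesNotExist:
            if attempt == max_attempts:
                raise  # give up and treat it as a genuinely missing record
            time.sleep(delay_seconds)  # assume transient replica lag and try again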

Another strategy would be to delay returning from a special save_to_read method until read replicas have the value, either by synchronously pushing the new value to the read replicas somehow or just polling them all until they return the record. This way seems a little hackier IMO.
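A rough sketch of the polling variant, assuming DATABASES defines replica aliases (the names below are made up) and using QuerySet.using() to check each one:

import time

REPLICA_ALIASES = ["replica1", "replica2"]  # hypothetical aliases from settings.DATABASES

def save_to_read(instance, timeout_seconds=30, poll_interval=0.5):
    """Save on the writer, then block until every replica can see the row."""
    instance.save()  # goes to the default (writer) database
    pending = set(REPLICA_ALIASES)
    deadline = time.monotonic() + timeout_seconds
    while pending and time.monotonic() < deadline:
        for alias in list(pending):
            if type(instance).objects.using(alias).filter(pk=instance.pk).exists():
                pending.discard(alias)
        if pending:
            time.sleep(poll_interval)
    if pending:
        raise TimeoutError(f"Replicas still missing the record: {pending}")
    return instance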

For a lot of your reads, you probably don't have to worry about read-after-write consistency:

If we’re rendering the name of the enterprise a user is part of, it’s really not that big a deal if in the incredibly rare occasion that an admin changes it, it takes a minute to have the change propagate to the enterprise’s users.

On

The simplest solution would be to catch any DoesNotExist errors thrown at the start of the task, then schedule a retry. This can be done by converting run_async_job into a Bound Task:

@app.task(bind=True)
def run_async_job(self, object_id):
    try:
        instance = CertainObject.objects.get(id=object_id)
    except CertainObject.DoesNotExist as e:
        # The record has not replicated yet; retry the task with its original arguments
        raise self.retry(exc=e)
    # ... continue processing `instance` ...
On

Since writing and then immediately loading the record is a high priority, why not store it in an in-memory store like Memcached or Redis first? Then, after some time, you can write it to the database using a periodic Celery job that runs, let's say, every minute or so. Once that job has finished writing to the DB, it deletes the keys from Redis/Memcached.

You can keep the data in the in-memory store for a certain time, let's say 1 hour, while the data is needed most. You can also create a service method that checks whether the data is in memory or not.

django-redis is a great package for connecting to Redis (if you are using it as the broker in Celery).

I am providing an example based on the Django cache:

# service method

from django.core.cache import cache

def get_object(obj_id, model_cls):
    obj_dict = cache.get(obj_id, None)  # check whether the object is in the cache, O(1)
    if obj_dict:
        return model_cls(**obj_dict)  # rebuild an (unsaved) instance from the cached fields
    else:
        return model_cls.objects.get(id=obj_id)  # fall back to a regular DB lookup


# celery job

@app.task
def store_objects():
    logger.info("-" * 25)
    # Note: cache.keys() with a pattern is a django-redis extension, not part of
    # the core Django cache API. You can use .bulk_create() to reduce DB hits
    # and make the inserts faster.
    for obj_id in cache.keys("foo_*"):
        CertainObject.objects.create(**cache.get(obj_id))
        cache.delete(obj_id)
    logger.info("-" * 25)
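To tie this back to the question, run_async_job could go through the service method instead of hitting a read replica directly. This is only a sketch, and it assumes the code that created the object also put its field dict into the cache under the id it passes to the task:

@app.task(bind=True)
def run_async_job(self, obj_id):
    # Served from the cache when the key is present (no replica lag to worry about),
    # otherwise falls back to a regular database lookup.
    instance = get_object(obj_id, CertainObject)
    # ... do the actual processing with `instance` ...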