Handle MongoDB duplicate key errors in Celery


I am writing messages into an AWS SQS queue, which are then processed by Celery. In the Celery task, the JSON file named in the message is downloaded from S3. I then take batches of 20 events from the JSON and insert them into MongoDB. If an event already exists in MongoDB, a duplicate key error is raised automatically, which we handle the following way:

    import logging

    from pymongo import errors

    logger = logging.getLogger(__name__)

    try:
        collection.insert_many(event_jsons, ordered=False)
    except errors.BulkWriteError as e:
        # 11000 is MongoDB's duplicate key error code
        write_errors = e.details["writeErrors"]
        panic_list = [err for err in write_errors if err["code"] != 11000]
        duplicates = [err for err in write_errors if err["code"] == 11000]
        if panic_list:
            logger.error(f"these are not duplicate errors: {panic_list}")
            raise Exception(f"Bulk error: {e}") from e
        elif duplicates:
            logger.info(f"{len(duplicates)} duplicate errors suppressed")
    except Exception as ex:
        logger.error("problem with inserting into the mongo database")
        raise Exception("problem with inserting into the mongo database") from ex

Now all messages are processed at least once, but there are still messages in the queue that get processed more than once. From the logging I assume that messages whose JSON contained duplicates were sent back to the queue; those are processed again and the queue slowly empties. However, this consumes a lot of time, since reprocessing them is no longer necessary. Is there a way to make duplicate key errors not count as failures, so that those messages are not sent back to the queue?
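For reference, my understanding is that with the SQS broker a message reappears in the queue either when the task raises an exception (with late acknowledgement) or when the task runs longer than the visibility timeout, so I also checked that setting. A minimal sketch of raising it (the value is a placeholder):

    app.conf.broker_transport_options = {
        "visibility_timeout": 3600,  # seconds; should exceed the longest task run
    }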

I would like a solution that inserts only the non-duplicate JSON events of a batch but does not force the message back to the SQS queue when the only errors are duplicates. One approach I tried was to check each JSON event for duplicates before inserting, so that only non-duplicate events reached MongoDB. This worked, but it also consumes a lot of time, and I was wondering whether the problem can be handled in the exception handling instead. Any help is appreciated.
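In case it helps, this is the upsert-based variant I am considering as an alternative: instead of letting MongoDB raise duplicate key errors, each event is written with an upsert that only takes effect if the document does not exist yet. This assumes every event carries a unique `_id`; the collection names are placeholders:

    from pymongo import MongoClient, UpdateOne

    collection = MongoClient()["events_db"]["events"]  # placeholder names

    def insert_batch_idempotently(event_jsons):
        """Insert new events and silently skip already existing _ids."""
        ops = [
            UpdateOne(
                {"_id": event["_id"]},  # match on the unique key
                {"$setOnInsert": {k: v for k, v in event.items() if k != "_id"}},
                upsert=True,
            )
            for event in event_jsons
        ]
        result = collection.bulk_write(ops, ordered=False)
        # upserted_count = newly inserted, matched_count = duplicates skipped
        return result.upserted_count, result.matched_count

With this approach no BulkWriteError is raised for duplicates, so the task returns normally and the message is acknowledged.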
