Tornado embedded coroutines and saving data to db

274 Views Asked by At

Stack i'm using includes tornado(async) and mongodb(motor) I have a following algo for processing requests data:

  1. data from incoming request is saved to a event-specific collection
  2. data is saving to a unified events collection

Here is the request handlers code:

class EventHandler(BaseHandler):
    """ Handles all event requests
    """

    @gen.coroutine
    def post(self):
        """ Posts an event data
        """
        yield gen.Task(self.check_auth)
        self.validate_data()
        yield self._save_user()

        status_msg = yield self.save_entity()
        yield self.save_event()

        self.set_status(200, reason="OK, {}".format(status_msg))

And here is the code for methods, called from request handler

@gen.coroutine
def save_entity(self):
    """ Saves event entity data for proper collection. Entities: orders, pageviews, users etc
    """
    event = self.data.get("event_type")
    if event not in self._event_schema_map.keys():
        raise Return("No specific entity, just event")
    try:
        if event == "cart_add":
            msg = yield gen.Task(self._save_product)
        elif event == "cart_delete":
            msg = yield gen.Task(self._delete_product)
        elif event == "pageview":
            msg = yield gen.Task(self._save_pageview)
        elif event == "order_complete":
            msg = yield gen.Task(self._save_order)
        elif event in ["email_known", "email_form"]:
            msg = yield gen.Task(self._save_email)
    except Exception as e:
        raise HTTPError(500, log_message=str(e))
    raise Return(msg)

@gen.coroutine
def save_event(self, event=None, event_type=None, event_data=None):
    """ Saves event data to db. Works both as standalone method and as plug-in method
    :param event: event name
    :param event_type: event type
    :param event_data: dict with event-specific infoelements data
    """
    yield self.motor.events.insert(
        {
            "client_id": self.data.get("client_id"),
            "user_id": self.data.get("user_id"),
            "timestamp": datetime.now(),
            "event": self.data.get("event", event),
            "event_type": self.data.get("event_type", event_type),
            "event_data": self.data.get("event_data", event_data),
            "event_url": self.data.get("event_url"),
            "utms": self.data.get("utms"),
            "analytics_short": self.data.get("analytics_short"),
            "analytics_long": self.data.get("analytics_long")
        }
    )

All _save_%smth% is just simple motor CRUD actions, encapsulated in function calls and wrapped in @engine decorators, like the following:

@gen.engine
def _save_product(self, callback=None):
    """ Adds product to user's cart
    """
    cart_data = self.data.get("event_data")[0]
    try:
        yield self.motor.users.update(
            {"_id": self.data["user_id"], "client_id": self.data["client_id"]},
            {
                '$set': {
                    "cart_updated_at": datetime.now(),
                    "reminder": False,
                },
                '$push': {
                    "items": {
                        "product_id": cart_data.get("product_id"),
                        "image": cart_data.get("image"),
                        "title": cart_data.get("title"),
                        "price": int(cart_data.get("price"))
                    }
                }
            },
            upsert=True
        )
    except Exception as e:
        raise HTTPError(500, log_message=str(e))
    callback("New product in cart record added")

@gen.engine
def _save_order(self, callback=None):
    """ Saves order data to user's orders
    """
    order_data = self.data.get("event_data")
    try:
        yield self.motor.orders.update(
            {"user_id": self.data["user_id"], "client_id": self.data["client_id"]},
            {
                '$push': {
                    "orders": {
                        "completed_at": datetime.now(),
                        "analytics_short": self.data["analytics_short"],
                        "analytics_long": self.data["analytics_long"],
                        "utms": self.data["utms"],
                        "items": [
                            {
                                "product_id": i["product_id"],
                                "price": int(i["price"]),
                                "quantity": int(i["quantity"])
                            }
                            for i in order_data
                        ]
                    }
                }
            },
            upsert=True,
        )
    except Exception as e:
        raise HTTPError(500, log_message="Error in order updating: {}".format(e))
    try:
        yield self.motor.users.update(
            {"_id": self.data["user_id"], "client_id": self.data["client_id"]},
            {
                "$unset": {
                    "cart_created_at": '',
                    "cart_updated_at": '',
                    "reminder": '',
                    "items": ''
                }
            }
        )
    except Exception as e:
        raise HTTPError(500, log_message="Error in cart updating: {}".format(e))
    callback("Order record added")

So request data is saved twice in different collections: "specific one" in a save_entity function and "universal one" in a save_event func. But actually i see, that often (kinda 50% of cases) is missed (data isn't saved) and second save is performed.

All data processing and validation is made before, so assume, that data, thrown to mongo is suitable and valid.

So i'm trying to figure out, how such situation could happen. My guess is that save_entity function is bad-engineered and due to several embedded functions the request itself finishes and data just doesn't saved to the db. Could it be?

UPD added production code, so now the situation would be clearer. I hope:) UPD 2 added several CRUD methods

1

There are 1 best solutions below

5
A. Jesse Jiryu Davis On

"update" requires two parameters: a query that specifies which documents to update, and an update document. The query follows the same syntax as for find() or find_one(). The update document has two modes: it can replace the whole document, or it can update some fields of a document. "update" also takes some optional parameters including "multi" and "upsert". For more info on the "update" method, see the tutorial:

http://motor.readthedocs.org/en/stable/tutorial.html#updating-documents

In your code you call "update" with the first parameter but not the second. I expect your code is throwing "TypeError: update() missing 1 required positional argument: 'document'", and that the exception is being swallowed, or lost in a logfile, somewhere higher up your call chain.