Using try_join! to send MongoDB transactional writes at the same time


I'm new to Rust and I'm using the official MongoDB driver: https://docs.rs/mongodb/2.0.0/mongodb/

I remember that when coding with Node.js, it was possible to send several writes inside a transaction with Promise.all() so that they all execute at the same time for optimization purposes, and then, if there were no errors, to commit the transaction. (Node.js example here: https://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74)

I'm trying to implement the same logic in Rust now, using try_join!, but I keep running into this problem:

error: cannot borrow `session` as mutable more than once at a time

use mongodb::{
    bson::{doc, oid::ObjectId, Document},
    options, Client, Database,
};
use async_graphql::{
    validators::{Email, StringMaxLength, StringMinLength},
    Context, ErrorExtensions, Object, Result,
};
use futures::try_join;
use std::time::Duration;
//use tokio::try_join; -> same thing

#[derive(Default)]
pub struct UserMutations;

#[Object]
impl UserMutations {


async fn user_followed<'ctx>(
        &self,
        ctx: &Context<'ctx>,
        other_user_id: ObjectId,
        current_user_id: ObjectId,
    ) -> Result<bool> {

    // `session_options` was undefined in the original snippet; a default
    // SessionOptions value keeps the example compilable
    let session_options = options::SessionOptions::builder().build();
    let mut session = Client::with_uri_str(dotenv!("URI"))
        .await
        .expect("DB not accessible!")
        .start_session(Some(session_options))
        .await?;

    session
        .start_transaction(Some(
            options::TransactionOptions::builder()
                .read_concern(Some(options::ReadConcern::majority()))
                .write_concern(Some(
                    options::WriteConcern::builder()
                        .w(Some(options::Acknowledgment::Majority))
                        .w_timeout(Some(Duration::new(3, 0)))
                        .journal(Some(false))
                        .build(),
                ))
                .selection_criteria(Some(options::SelectionCriteria::ReadPreference(
                    options::ReadPreference::Primary,
                )))
                .max_commit_time(Some(Duration::new(3, 0)))
                .build(),
        ))
        .await?;
    
   
    let db = Client::with_uri_str(dotenv!("URI"))
        .await
        .expect("DB not accessible!")
        .database("database")
        .collection::<Document>("collection");

    try_join!(
        db.update_one_with_session(
            doc! { "_id": other_user_id },
            doc! { "$inc": { "following_number": -1 } },
            None,
            &mut session,
        ),
        db.update_one_with_session(
            doc! { "_id": current_user_id },
            doc! { "$inc": { "followers_number": -1 } },
            None,
            &mut session,
        )
    )?;
    
    Ok(true)
  }
}

849 | |                     &mut session,
    | |                     ------------ first mutable borrow occurs here
...   |
859 | |                     &mut session,
    | |                     ^^^^^^^^^^^^ second mutable borrow occurs here
860 | |                 )
861 | |             )?;
    | |_____________- first borrow later captured here by closure

Is there any way to send these transactional writes concurrently, so as not to lose any time on independent mutations? Does anyone have any ideas? Thanks in advance!

There are 2 answers below.

BEST ANSWER

Thanks, Patrick and Zeppi, for your answers. I did some more research on this topic and also did my own testing. So, let's start.

First, my goal was to optimize transactional writes as much as possible, since my code logic requires the ability to fully roll back.

In case you missed my comments to Patrick, I'll restate them here to better reflect my way of thinking about this:

I understand why this would be a limitation for multiple reads, but if all actions are on separate collections (or are independent atomic writes to multiple documents with different payloads), I don't see why it's impossible to retain causal consistency while executing them concurrently. This kind of transaction should never create race conditions / conflicts / weird lock behaviour, and in case of error the entire transaction is rolled back before being committed anyway.

To make an analogy with Git (which might be wrong): no merge conflicts are created when separate files/folders are updated. Sorry for being meticulous; this just sounds like a major speed-boost opportunity.

But after some digging, I came across this documentation: https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#why-does-a-network-error-cause-the-serversession-to-be-discarded-from-the-pool

An otherwise unrelated operation that just happens to use that same server session will potentially block waiting for the previous operation to complete. For example, a transactional write will block a subsequent transactional write.

Basically, this means that even if you send transactional writes concurrently, you won't gain much efficiency, because MongoDB itself is the blocker. I decided to check whether this was true, and since the Node.js driver setup allows sending transactions concurrently (as per: https://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74), I did a quick Node.js setup pointing to the same database, hosted by Atlas on the free tier.

Second, statistics and code. This is the Node.js mutation I used for the tests (each test performs 4 transactional writes). I enabled GraphQL tracing to benchmark it, and the results of my tests follow the code...

export const testMutFollowUser = async (_parent, _args, _context, _info) => {
  try {

    const { user, dbClient } = _context;
    isLoggedIn(user);
    const { _id } = _args;


    const session = dbClient.startSession();
    const db = dbClient.db("DB");

    await verifyObjectId().required().validateAsync(_id);

    //making sure asked user exists
    const otherUser = await db.collection("users").findOne(
      { _id: _id },
      {
        projection: { _id: 1 }
      });


    if (!otherUser)
      throw new Error("User was not found");
    

    const transactionResult = await session.withTransaction(async () => {
        
        //-----using this part when doing concurrency test------

        // note: pass the promises directly (no `await` inside the array),
        // otherwise the writes run one after the other
        await Promise.all([
          createObjectIdLink({ db_name: 'links', from: user._id, to: _id, db }),
          db.collection('users').updateOne(
            { _id: user._id },
            { $inc: { following_number: 1 } },
          ),
          db.collection('users').updateOne(
            { _id },
            {
              $inc: { followers_number: 1, unread_notifications_number: 1 }
            },
          ),
          createNotification({
            action: 'USER_FOLLOWED',
            to: _id
          }, _context)
        ]);
        //-----------end of concurrency part--------------------
        

        
        //------using this part when doing sync test--------

        //this as a helper for db.insertOne(...)
        const insertedId = await createObjectIdLink({ db_name: 'links', from: user._id, to: _id, db });


        const updDocMe = await db.collection('users').updateOne(
          { _id: user._id },
          { $inc: { following_number: 1 } },

        );

        const updDocOther = await db.collection('users').updateOne(
          { _id },
          {
            $inc: { followers_number: 1, unread_notifications_number: 1 }
          },

        );
        
        //this as another helper for db.insertOne(...)
        await createNotification({
          action: 'USER_FOLLOWED',
          to: _id
        }, _context);
        //-----------end of sync part---------------------------


        return true;


      }, transactionOptions); // transactionOptions is defined elsewhere in the project

      if (transactionResult) {
        console.log("The reservation was successfully created.");
      } else {
        console.log("The transaction was intentionally aborted.");
      }

      await session.endSession();

      return true;

  } catch (error) {
    // the original snippet left the try block unclosed; minimal handler added
    throw error;
  }
};

And related performance results:

format: 
Request/Mutation/Response = Total (all in ms)

1) For sync writes in the transaction:

4/91/32 = 127
4/77/30 = 111
7/71/7 = 85
6/66/8 = 80
2/74/9 = 85
4/70/8 = 82
4/70/11 = 85
--waiting more time (~10secs)
9/73/34 = 116

totals/8 = **96.375 ms on average**

//---------------------------------

2) For concurrent writes in transaction:

3/85/7 = 95
2/81/14 = 97
2/70/10 = 82
5/81/11 = 97
5/73/15 = 93
2/82/27 = 111
5/69/7 = 81
--waiting more time (~10secs)
6/80/32 = 118

totals/8 = **96.75 ms on average**

Conclusion: the difference between the two is within the margin of error (and, if anything, slightly favors the sync side).

My assumption is that with the sync approach you spend the time waiting on each DB request/response, while with the concurrent approach you wait for MongoDB to order the requests and then execute them all, which at the end of the day costs the same amount of time.

So with current MongoDB policies, I guess the answer to my question is: there is no need for concurrency, because it won't affect performance anyway. However, it would be great if a future MongoDB release allowed parallelizing writes within a transaction, with locks at the document level (at least for the WiredTiger engine) instead of at the database level as is currently the case for transactions (where you wait for one write to finish before the next one starts).

Feel free to correct me if I missed/misinterpreted something. Thanks!

SECOND ANSWER

This limitation is actually by design. In MongoDB, client sessions cannot be used concurrently (see here and here), and so the Rust driver accepts them as &mut to prevent this from happening at compile time. The Node example is only working by chance and is definitely not recommended or supported behavior. If you would like to perform both updates as part of a transaction, you'll have to run one update after the other. If you'd like to run them concurrently, you'll need to execute them without a session or transaction.
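To illustrate the sequential approach, here is a minimal sketch (reusing the db, session, and ObjectId variables from the question, and assuming both handles come from the same client, per the side note below): each write awaits before the next one starts, and the commit only runs if both succeed.

    // one write at a time: each call releases the &mut session borrow
    // before the next one starts
    db.update_one_with_session(
        doc! { "_id": other_user_id },
        doc! { "$inc": { "following_number": -1 } },
        None,
        &mut session,
    )
    .await?;

    db.update_one_with_session(
        doc! { "_id": current_user_id },
        doc! { "$inc": { "followers_number": -1 } },
        None,
        &mut session,
    )
    .await?;

    // commit only if neither update returned an error
    session.commit_transaction().await?;

And if the two counters don't actually need transactional atomicity, the original try_join! compiles as soon as the &mut session arguments are removed, because plain update_one calls don't borrow anything mutably.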

As a side note, a client session can only be used with the client it was created from. In the provided example, the session is used with a different client (a second one created from the same URI), which will cause an error.
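For completeness, a minimal sketch of the correct wiring (hypothetical URI, database, and collection names): one Client is created, and both the session and the collection handle are derived from it.

use mongodb::{
    bson::{doc, Document},
    error::Result,
    Client,
};

async fn follow_user_transaction() -> Result<()> {
    // a single client; the session and the collection handle below
    // must both come from it
    let client = Client::with_uri_str("mongodb://localhost:27017").await?;
    let mut session = client.start_session(None).await?;
    let users = client.database("database").collection::<Document>("collection");

    session.start_transaction(None).await?;
    // sequential writes with the one &mut session, as in the sketch above
    users
        .update_one_with_session(
            doc! { "_id": 1 },
            doc! { "$inc": { "followers_number": -1 } },
            None,
            &mut session,
        )
        .await?;
    session.commit_transaction().await?;
    Ok(())
}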