Multi-threading in async Rust - why is my code failing to parallelize?

I'm trying to intentionally exhaust an API limit (900 calls) by running the following function:

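// Imports assumed (not shown in my original snippet);
// Settings and get_single_tweet are defined elsewhere in my project.
use std::{ops::Deref, sync::Arc, thread};

use actix_web::{get, web, HttpResponse, Responder};
use sqlx::PgPool;

#[get("/exhaust")]
pub async fn exhaust(_pool: web::Data<PgPool>, config: web::Data<Arc<Settings>>) -> impl Responder {
    let mut handles = vec![];

    for i in 1..=900 {
        let inner_config = config.clone();
        let handle = thread::spawn(move || async move {
            println!("running thread {}", i);
            get_single_tweet(inner_config.as_ref().deref(), "1401287393228038149")
                .await
                .unwrap();
        });
        handles.push(handle);
    }

    for h in handles {
        h.join().unwrap().await;
    }

    HttpResponse::Ok()
}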

My machine has 16 cores, so I expected the above to run 16x faster than a single-threaded function, but it doesn't. In fact, it runs just as slowly as the single-threaded version.

Why is that? What am I missing?

Note: the move || async move part looks a little weird to me, but I got there by following suggestions from the compiler. It wouldn't let me put async next to the first move due to async closures being unstable. Could that be the issue?

Best answer:

This code will indeed run your async blocks sequentially. An async block creates a value whose type implements Future, but Futures don't start running on their own: they have to be either await-ed or handed to an executor to run.

Calling thread::spawn with a closure that returns a Future, as you've done, will not execute that future; each thread simply creates the async block and returns it. So the async blocks aren't actually executed until you await them in the loop over handles, which processes the futures one at a time, in order.
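
This laziness can be observed with nothing but the standard library. The sketch below mirrors the question's thread::spawn(move || async move { ... }) shape. Because no async runtime is assumed, block_on is a hypothetical, minimal single-future executor standing in for the runtime actix-web runs the handler on, and lazy_future_demo is likewise just an illustrative name.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Hypothetical minimal executor (std only): polls `fut` on the calling thread
// and parks the thread until the future's waker unparks it.
fn block_on<F: Future>(fut: F) -> F::Output {
    struct Unparker(thread::Thread);
    impl Wake for Unparker {
        fn wake(self: Arc<Self>) {
            self.0.unpark();
        }
    }
    let waker = Waker::from(Arc::new(Unparker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

// Same shape as the question: the spawned thread only *builds* the async block
// and hands it back. Returns whether the body had run before and after polling.
fn lazy_future_demo() -> (bool, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let ran_in_block = Arc::clone(&ran);
    let handle = thread::spawn(move || async move {
        ran_in_block.store(true, Ordering::SeqCst);
    });
    // The thread has already exited; what comes back is an un-polled future.
    let fut = handle.join().unwrap();
    let before = ran.load(Ordering::SeqCst);
    // The body runs only now, on this thread, while block_on polls it.
    block_on(fut);
    let after = ran.load(Ordering::SeqCst);
    (before, after)
}

fn main() {
    let (before, after) = lazy_future_demo();
    println!("body ran before polling: {}, after: {}", before, after);
}
```

The spawned thread exits before the async body has done anything; the flag only flips once block_on polls the future on the calling thread, which is what happens to each get_single_tweet call in the loop over handles.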

One way to fix this is to use join_all from the futures crate to run them all concurrently.

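// Replaces the thread::spawn loop inside exhaust(); needs the `futures` crate
// and `use std::ops::Deref;` for the .deref() call.
let mut futs = vec![];

for i in 1..=900 {
    let inner_config = config.clone();
    // Only build the future here; nothing runs until it is polled.
    futs.push(async move {
        println!("running request {}", i);
        get_single_tweet(inner_config.as_ref().deref(), "1401287393228038149")
            .await
            .unwrap();
    });
}

// Poll all the futures together, so their network waits overlap.
futures::future::join_all(futs).await;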
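
Why does join_all help even without extra threads? It polls every future it holds, so the pending waits overlap instead of running back to back. The std-only sketch below compares the two. Delay is a hypothetical stand-in for get_single_tweet (a helper thread sleeps to simulate network latency), JoinAll is a simplified imitation of futures::future::join_all that discards outputs, and block_on is a minimal executor; none of these are library APIs.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

type BoxFut = Pin<Box<dyn Future<Output = ()>>>;
const N: usize = 10; // number of simulated requests
const WAIT: Duration = Duration::from_millis(100); // simulated latency of each

// Hypothetical stand-in for an I/O call such as get_single_tweet. A helper
// thread plays the network; like any Rust future, it does nothing until polled.
struct Delay {
    dur: Duration,
    started: bool,
    state: Arc<Mutex<(bool, Option<Waker>)>>, // (done, waker to notify)
}

fn delay(dur: Duration) -> Delay {
    Delay { dur, started: false, state: Arc::new(Mutex::new((false, None))) }
}

impl Future for Delay {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if !self.started {
            // First poll: only now does the simulated request begin.
            self.started = true;
            let (state, dur) = (Arc::clone(&self.state), self.dur);
            thread::spawn(move || {
                thread::sleep(dur);
                let mut st = state.lock().unwrap();
                st.0 = true;
                if let Some(w) = st.1.take() { w.wake(); }
            });
        }
        let mut st = self.state.lock().unwrap();
        if st.0 { return Poll::Ready(()); }
        st.1 = Some(cx.waker().clone()); // remember who to wake on completion
        Poll::Pending
    }
}

// Simplified imitation of futures::future::join_all (outputs are discarded):
// every poll drives all unfinished futures, so their waits overlap.
struct JoinAll(Vec<BoxFut>);

impl Future for JoinAll {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Keep only the futures that are still pending after this poll.
        self.0.retain_mut(|f| f.as_mut().poll(cx).is_pending());
        if self.0.is_empty() { Poll::Ready(()) } else { Poll::Pending }
    }
}

// Minimal executor: poll, then park until a waker unparks this thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    struct Unparker(thread::Thread);
    impl Wake for Unparker {
        fn wake(self: Arc<Self>) {
            self.0.unpark();
        }
    }
    let waker = Waker::from(Arc::new(Unparker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

// Like the question's loop over handles: each await completes before the next starts.
fn sequential() -> Duration {
    let start = Instant::now();
    block_on(async {
        for _ in 0..N {
            delay(WAIT).await;
        }
    });
    start.elapsed()
}

// Like the join_all answer: all N futures are in flight at once, on one thread.
fn concurrent() -> Duration {
    let start = Instant::now();
    let futs: Vec<BoxFut> = (0..N).map(|_| -> BoxFut { Box::pin(delay(WAIT)) }).collect();
    block_on(JoinAll(futs));
    start.elapsed()
}

fn main() {
    println!("sequential: {:?}", sequential());
    println!("concurrent: {:?}", concurrent());
}
```

Everything in this sketch is polled on a single thread, so any speedup comes from overlapping waits (concurrency) rather than from extra cores: sequential() takes at least N × WAIT, while concurrent() should finish in roughly one WAIT plus overhead.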

Second answer:

The issue is that you're mixing multithreading and async in a way that makes all the work sequential: all your threads do is call get_single_tweet, which is apparently an async function.

Now, in a language like JavaScript, calling get_single_tweet would create a task that runs as soon as possible, and return a promise symbolising the task's eventual completion.

That's not how Rust works (nor lots of other languages, incidentally: Python behaves much more like Rust than like JavaScript). In Rust, get_single_tweet just creates a future; it doesn't actually do anything, and the future has to be polled for things to happen: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=b26b47e62e46b66b60844aabc2ea7be1

When does this polling happen? When the dynamic chain of await-ing leads all the way up to a task that the event loop is running.

Here, the future is created in the thread, returned from the thread, then await-ed once it's fetched via join, so your fetches are not being run in the threads at all; they're being run here:

    for h in handles {
        h.join().unwrap().await;
    }

which is completely sequential.

To fix this, you basically have two options, but the important bit is to stay within one realm:

  • if you want to use threads, then the calls within them should be blocking
  • otherwise, instead of thread::spawn you probably want task::spawn (or whatever the equivalent is in your runtime of choice). Making sure you're using a multithreaded runtime is probably a good idea, but since I'd assume get_single_tweet is performing IO work (e.g. fetching stuff from Twitter), a single-threaded runtime would probably yield most of the benefit already

task::spawn will (as its name indicates) create a task and return a handle to it; the task starts running without having to be await-ed first, which is much closer to the behaviour of JavaScript's async functions.
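
The first option in the list above, threads with blocking calls, can be sketched with only the standard library. blocking_fetch is a hypothetical placeholder for a synchronous request, with thread::sleep simulating network latency. The key difference from the question's code is that each closure returns the finished result rather than a future, so the work really happens on the spawned threads.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical blocking stand-in for a synchronous HTTP request.
fn blocking_fetch(i: usize) -> usize {
    thread::sleep(Duration::from_millis(100)); // simulate waiting on the network
    i
}

// Spawn one OS thread per call and do *blocking* work inside each,
// so the threads spend their waiting time in parallel.
fn fetch_all_with_threads(n: usize) -> (Vec<usize>, Duration) {
    let start = Instant::now();
    let handles: Vec<_> = (0..n)
        .map(|i| thread::spawn(move || blocking_fetch(i)))
        .collect();
    // join() yields the closure's plain result, not a future to await later.
    let results: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (results, start.elapsed())
}

fn main() {
    let (results, elapsed) = fetch_all_with_threads(16);
    println!("{} calls finished in {:?}", results.len(), elapsed);
}
```

One OS thread per call is fine for a demo; for hundreds of calls you may prefer to cap the number of threads, or use the task-based approach from the second bullet.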