I have a Rust app which is basically an axum web server with some routes.
Now I also need to periodically check whether a database table contains new rows.
If it does, I need to run some HEAVY computations that can take minutes (especially in an undersized Docker container).
The code I'm using is below; its output is:
The main thread should not be blocked and should print every second: Instant { t: 6479.5889821s }
The main thread should not be blocked and should print every second: Instant { t: 6480.5996495s }
The main thread should not be blocked and should print every second: Instant { t: 6481.6152853s }
Starting an heavy CPU computation...
The main thread should not be blocked and should print every second: Instant { t: 6502.5748215s }
The main thread should not be blocked and should print every second: Instant { t: 6503.5917731s }
The main thread should not be blocked and should print every second: Instant { t: 6504.5990575s }
As you can see, once "Starting an heavy CPU computation..." is printed, the main thread is blocked: nothing else is printed for about 20 seconds.
Is there a way to avoid this? Perhaps by using tokio::task::spawn_blocking() for each heavy job?
Can I start the entire "worker" in a separate thread? I ask because I have many different jobs.
I mean this code in main():
let worker = Queue::new();
tokio::spawn(async move { worker.run().await }); // Is there a way to launch this in a separate thread?
The full code is on Rust Playground and Rust Explorer, and is reproduced here:
use futures::StreamExt;
use rand::Rng;
use std::time::{Duration, Instant};

const CONCURRENCY: usize = 5;

struct Job {
    id: u16,
}

struct Queue {
    // some needed fields like DB connection
}

impl Queue {
    fn new() -> Self {
        Self {}
    }

    async fn run(&self) {
        loop {
            // I'll get jobs from DB here; for this demo are random generated
            let mut jobs: Vec<Job> = Vec::new();
            for _ in 0..2 {
                jobs.push(Job {
                    id: get_random_id(),
                })
            }
            futures::stream::iter(jobs)
                .for_each_concurrent(CONCURRENCY, |job| async {
                    match self.handle_job(job).await {
                        Ok(_) => {
                            // I will remove the job from queue
                        }
                        Err(_) => {
                            // I will handle this error
                        }
                    };
                })
                .await;
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }

    async fn handle_job(&self, job: Job) -> Result<(), String> {
        if job.id % 2 == 0 {
            println!("Starting an heavy CPU computation...");
            // I'm simulating heavy CPU computations with this sleep thread blocking here
            std::thread::sleep(Duration::from_secs(10));
            // I think I can use spawn_blocking instead, right?
            // tokio::task::spawn_blocking(move || {
            //     std::thread::sleep(Duration::from_secs(8));
            // }).await.unwrap()
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let worker = Queue::new();
    // Can I start the below worker.run() in a separate thread?
    tokio::spawn(async move { worker.run().await });
    loop {
        println!(
            "The main thread should not be blocked and should print every second: {:?}",
            Instant::now()
        );
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

fn get_random_id() -> u16 {
    let mut rng = rand::thread_rng();
    rng.gen::<u16>()
}
Yes, you should use spawn_blocking. That will ensure that the async loop in main is not blocked by your CPU-heavy code. Tokio has a number of blocking threads dedicated to this purpose. Running CPU-heavy tasks unbounded could be a problem, but you are using for_each_concurrent to impose your own limit on parallelism, so it isn't one here.