Task execution pause/resume in Rust async? (tokio)


How can I pause an async task in Rust?

Swift has withCheckedContinuation(function:_:), which suspends the current task and hands you a continuation that can be resumed at whatever time you choose (a.k.a. call/cc).

tokio has tokio::task::yield_now, but a task that yields this way is rescheduled automatically, so that's not what I'm looking for. By "pause" I mean a suspension that never resumes without an explicit command.

I've been reading the tokio documentation. It defines several synchronization primitives in the tokio::sync module, but I couldn't find a function that pauses a task directly. Am I supposed to simulate suspension using only the synchronization primitives? Or am I missing something here?


There are 2 best solutions below


I don't know of anything built-in, but you can build your own.

You need access to the Waker to unpark a future. You also need to keep track of whether the future has been manually unparked, because the runtime can poll a future again even if nobody asked it to.

There are various ways to write this code, here is one:

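use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

// Handle given to the callback. A minimal definition is assumed here:
// `unpark` sets the flag first, then wakes the task so it gets polled again.
pub struct Parker {
    waker: Waker,
    unparked: Arc<AtomicBool>,
}

impl Parker {
    pub fn unpark(self) {
        self.unparked.store(true, Ordering::SeqCst);
        self.waker.wake();
    }
}

// You can get rid of this `Unpin` bound, if you really want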
pub async fn park(callback: impl FnOnce(Parker) + Unpin) {
    enum Park<F> {
        FirstTime { callback: F },
        SecondTime { unparked: Arc<AtomicBool> },
    }

    impl<F: FnOnce(Parker) + Unpin> Future for Park<F> {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if let Self::SecondTime { unparked } = &*self {
                return if unparked.load(Ordering::SeqCst) {
                    Poll::Ready(())
                } else {
                    Poll::Pending
                };
            }

            let unparked = Arc::new(AtomicBool::new(false));
            let callback = match std::mem::replace(
                &mut *self,
                Self::SecondTime {
                    unparked: Arc::clone(&unparked),
                },
            ) {
                Self::FirstTime { callback } => callback,
                Self::SecondTime { .. } => unreachable!(),
            };
            callback(Parker {
                waker: cx.waker().clone(),
                unparked,
            });
            Poll::Pending
        }
    }

    Park::FirstTime { callback }.await
}

Then you call it like park(|p| { ... }).await.
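
If you want to try the waker-plus-flag idea without pulling in tokio, here is a self-contained sketch that uses only the standard library. It is a compact variant rather than the code above verbatim: `park` is rewritten on top of std::future::poll_fn (which also removes the `Unpin` bound), and `Parker::unpark`, `ThreadWaker`, `block_on`, and `demo` are names I made up for illustration. The tiny `block_on` stands in for a real runtime and is only meant for this demo.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Handle that resumes the parked future (hypothetical minimal API).
pub struct Parker {
    waker: Waker,
    unparked: Arc<AtomicBool>,
}

impl Parker {
    pub fn unpark(self) {
        // Set the flag before waking, so the next poll sees it.
        self.unparked.store(true, Ordering::SeqCst);
        self.waker.wake();
    }
}

/// Suspends the caller until the `Parker` given to `callback` is unparked.
pub async fn park(callback: impl FnOnce(Parker)) {
    let unparked = Arc::new(AtomicBool::new(false));
    let mut callback = Some(callback);
    poll_fn(|cx| {
        // First poll only: hand out the handle.
        if let Some(cb) = callback.take() {
            cb(Parker { waker: cx.waker().clone(), unparked: Arc::clone(&unparked) });
        }
        // Later polls that were not triggered by `unpark` stay pending.
        if unparked.load(Ordering::SeqCst) { Poll::Ready(()) } else { Poll::Pending }
    })
    .await
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Demo-only executor: polls one future, sleeping the thread while pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Parks a future, resumes it from another thread, and records the order.
fn demo() -> Vec<&'static str> {
    let events = Arc::new(Mutex::new(Vec::new()));
    let ev = Arc::clone(&events);
    block_on(async move {
        ev.lock().unwrap().push("parking");
        let ev2 = Arc::clone(&ev);
        park(move |parker| {
            // Give the handle to whoever should resume us later.
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                ev2.lock().unwrap().push("unparking");
                parker.unpark();
            });
        })
        .await;
        ev.lock().unwrap().push("resumed");
    });
    let recorded = events.lock().unwrap().clone();
    recorded
}

fn main() {
    println!("{:?}", demo());
}
```

Nothing after the `.await` on `park` runs until some other code calls `unpark` on the handle, which is the "explicit resume" behaviour the question asks for.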



tokio::sync::oneshot can be used for this purpose. It gives you two objects: one a future, and the other a handle you can use to cause the future to resolve. It also conveys a value, but that value can be just () if we don't need it.

In the following runnable example, main() is the task being paused and the task which resumes it is spawned, because that's the simplest thing to do; but in a real application you'd presumably pass on the sender to something else that already exists.

use std::time::Duration;
use tokio::spawn;
use tokio::time::sleep;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    println!("one");
    
    let (sender, receiver) = oneshot::channel::<()>();
    spawn(async move {
        // Without `.await` the sleep future is never polled and no delay happens.
        sleep(Duration::from_millis(400)).await;
        println!("two");
        if sender.send(()).is_err() {
            println!("oops, the receiver dropped");
        }
    });
    
    println!("...wait...");
    match receiver.await {
        Ok(()) => println!("three"),
        Err(_) => println!("oops, the sender dropped"),
    }
}

Note that this is not a special feature of the oneshot channel: any future whose resolution you control can be used for this purpose. oneshot is appropriate when you want to hand out a specific handle to this one paused task. If you wanted many tasks to wake up on a single notification, you could use tokio::sync::watch instead.