Async: how to keep using the same future in a loop with select (no_std environment)?


I have two async functions: get_message and get_event. I'd like to perform an action whenever a message arrives or an event comes and do that forever in an infinite loop.

The simplified setup looks like this:

use futures::{future::select, future::Either, pin_mut};

impl MsgReceiver {
    async fn get_message(&mut self) -> Message { /* ... */ }
}

impl EventListener {
    async fn get_event(&mut self) -> Event { /* ... */ }
}

async fn eternal_task(mut receiver: MsgReceiver, mut listener: EventListener) -> ! {
    let get_msg_fut = receiver.get_message();
    pin_mut!(get_msg_fut);
    loop {
        let get_event_fut = listener.get_event();

        pin_mut!(get_event_fut);

        match select(get_event_fut, get_msg_fut).await {
            Either::Left((ev, r_get_msg_fut)) => {
                /* react to the event */

                // r_get_msg_fut is not done, how to reuse it in the next iteration?
            }
            Either::Right((msg, r_get_event_fut)) => {
                /* react to the message */

                // it's fine to drop get_event_fut here

                // the following line causes a double-mut-borrow error on receiver,
                // even though receiver should no longer be borrowed (the old future has completed and been dropped)
                let new_future = receiver.get_message();
            }
        };
    }
}

I have three major questions here:

  1. When an event comes first, how do I tell Rust that I want to reuse the incomplete get_message future on the next loop iteration?
  2. When a message comes first, how do I construct a new future without a borrow error?
  3. Once (2) is solved, how do I put the new future into the same pinned memory location and use it on the next loop iteration?

There are 3 best solutions below


I think this is tricky to get right, even with unsafe, which would probably be needed to accomplish it. Persisting and reusing the same variables isn't too hard; it's actually #2 that's the hardest (at least with the current borrow checker).

I found a solution that totally circumvents the problem by using the async-stream crate to provide an intermediary:

use async_stream::stream;
use futures::{future::Either, pin_mut, StreamExt};

async fn eternal_task(mut receiver: MsgReceiver, mut listener: EventListener) -> ! {
    let combined = futures::stream::select(
        stream! { loop { yield Either::Left(receiver.get_message().await); } },
        stream! { loop { yield Either::Right(listener.get_event().await); } },
    );

    pin_mut!(combined);
    while let Some(msg_or_evt) = combined.next().await {
        match msg_or_evt {
            Either::Left(msg) => {
                // do something with msg
            }
            Either::Right(evt) => {
                // do something with evt
            }
        };
    }

    unreachable!()
}

It uses the stream! macro to generate a type that continuously calls and yields values from .get_message() and .get_event(). It then uses futures::stream::select and Either to combine them. After that, it's just a matter of looping over the results. It works in #![no_std].


I had success using the following, but could not get rid of the Box::pin:

use futures::{future::select, future::Either, pin_mut};
use std::sync::Mutex;
#[derive(Debug)]
struct MsgReceiver;
#[derive(Debug)]
struct EventListener;
#[derive(Debug)]
struct Message;
#[derive(Debug)]
struct Event;
impl MsgReceiver {
    async fn get_message(&mut self) -> Message {
        Message
    }
}

impl EventListener {
    async fn get_event(&mut self) -> Event { 
        Event
    }

}

async fn eternal_task(receiver: MsgReceiver, mut listener: EventListener) -> ! {
    let receiver = Mutex::new(receiver);
    let mut f = None;
    loop {
        let get_msg_fut = match f.take() {
            None => {
                // std's Mutex::lock returns a LockResult, so unwrap it to get the guard
                let mut l = receiver.lock().unwrap();
                Box::pin(async move {
                    l.get_message().await
                })
            }
            Some(f) => f,

        };
        let get_event_fut = listener.get_event();
        pin_mut!(get_event_fut);

        match select(get_event_fut, get_msg_fut).await {
            Either::Left((ev, r_get_msg_fut)) => {
                /* react to the event */
                // store the future for next iteration
                f = Some(r_get_msg_fut);
            }
            Either::Right((msg, r_get_event_fut)) => {
                /* react to the message */
            }
        };
    }
}

#[tokio::main]
async fn main() {
    eternal_task(MsgReceiver, EventListener).await;
}

Tokio has documentation on how to reuse the same future in a loop with select: https://tokio.rs/tokio/tutorial/select#resuming-an-async-operation

TL;DR:

async fn action() {
    // Some asynchronous logic
}

#[tokio::main]
async fn main() {
    // Nothing is sent in this snippet, so the element type is annotated explicitly
    let (_tx, mut rx) = tokio::sync::mpsc::channel::<i32>(128);

    let operation = action();
    tokio::pin!(operation);
    
    loop {
        tokio::select! {
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}
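
The same pin-once, poll-by-&mut pattern can be sketched without Tokio, which may matter for the no_std case in the question. The sketch below is an assumption-laden illustration, not the asker's code: the receiver and listener are hypothetical counters, YieldOnce, next_message, run and block_on are made-up helper names, and the busy-polling executor exists only to drive the demo. To sidestep the borrow error from question 2, next_message takes the receiver by value and hands it back with the message, so Pin::set can store a fresh future in the same pinned slot. Only the demo harness (Vec, String, Arc) needs std; poll_fn and pin! also exist in core.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-ins for the question's types: they just count.
struct MsgReceiver { count: u32 }
struct EventListener { count: u32 }

// Pending on the first poll, Ready on the second; simulates waiting.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 { return Poll::Ready(()); }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

impl MsgReceiver {
    // Messages are "slow": two suspension points before completing.
    async fn get_message(&mut self) -> u32 {
        YieldOnce(false).await;
        YieldOnce(false).await;
        self.count += 1;
        self.count
    }
}
impl EventListener {
    async fn get_event(&mut self) -> u32 {
        YieldOnce(false).await;
        self.count += 1;
        self.count
    }
}

// Owning wrapper: takes the receiver by value and returns it with the message.
async fn next_message(mut r: MsgReceiver) -> (u32, MsgReceiver) {
    let msg = r.get_message().await;
    (msg, r)
}

enum Either<A, B> { Left(A), Right(B) }

async fn run(receiver: MsgReceiver, mut listener: EventListener, rounds: usize) -> Vec<String> {
    let mut log = Vec::new();
    // Pinned once, outside the loop, and only ever polled through `as_mut()`.
    let mut msg_fut = pin!(next_message(receiver));
    for _ in 0..rounds {
        let mut ev_fut = pin!(listener.get_event());
        // Hand-rolled select: poll both futures, return whichever is ready first.
        let first = poll_fn(|cx| {
            if let Poll::Ready(ev) = ev_fut.as_mut().poll(cx) {
                return Poll::Ready(Either::Left(ev));
            }
            if let Poll::Ready(out) = msg_fut.as_mut().poll(cx) {
                return Poll::Ready(Either::Right(out));
            }
            Poll::Pending
        })
        .await;
        match first {
            // The message future is untouched and continues in the next round.
            Either::Left(ev) => log.push(format!("event {ev}")),
            Either::Right((msg, r)) => {
                log.push(format!("message {msg}"));
                // Replace the completed future in the same pinned location.
                msg_fut.set(next_message(r));
            }
        }
    }
    log
}

// Minimal busy-polling executor, for the demo only.
struct NoopWaker;
impl Wake for NoopWaker { fn wake(self: Arc<Self>) {} }

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) { return out; }
    }
}

fn main() {
    let log = block_on(run(MsgReceiver { count: 0 }, EventListener { count: 0 }, 4));
    println!("{log:?}"); // prints ["event 1", "event 2", "message 1", "event 3"]
}
```

The message only completes in the third round because its future keeps the progress made during the first two, which is the reuse asked for in question 1. The event future, by contrast, is recreated every round and is simply dropped when a message wins.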