Async Rust task is only polled once

53 Views Asked by At

I’ve been trying to develop a program using async Rust and async-std crate. Because of the environment in which the code will run, I’m using 2 threads: the main thread and a secondary IO thread, both of which run async tasks. I’m having an issue I haven’t encountered before, where a struct, which implements the Future trait, is not being properly polled.

The code itself will eventually interact with physical hardware by taking some readings and processing requests sent by the main thread, but for now, this is a simplified version of what I’m trying to do:

#![allow(dead_code)]

use async_std::{
    channel::{self, Sender, Receiver, TryRecvError},
    future,
    sync::{Arc, Mutex},
    task::{self, JoinHandle}
};
use futures::{
    Future,
    FutureExt,
    select,
    stream::{FuturesUnordered, StreamExt}
};
use rand::{Rng, thread_rng};
use std::{
    fmt::Debug,
    pin::Pin,
    task::{Context, Poll},
    thread,
    time::{Duration, Instant}
};


#[derive(Debug)]
struct ProcessInfo {
    state: ProcessInfoState,

    parameters: ProcessInfoParameters
}
impl ProcessInfo {
    pub fn new(command: Command, parameters: ProcessInfoParameters) -> Result<Self, String> {
        let state = {
            if let Command::ToggleOff = command {
                ProcessInfoState::State2
            } else {
                ProcessInfoState::State1
            }
        };

        Ok(ProcessInfo {
            state,
            parameters
        })
    }

    pub async fn poll_logic(&mut self) {
        // This variable is only to print a limited amount of debug messages
        let mut loop_iterations: u8 = 0;

        loop {
            // We wait to get access to the Mutex locks
            let self_input_data_guard = self.parameters.input_data.lock().await;
            if loop_iterations < 50 { println!("Poll_logic() got InputData guard"); }

            let mut self_output_data_guard = self.parameters.output_data.lock().await;
            if loop_iterations < 50 { println!("Poll_logic() got OutputData guard"); }

            let mut output_data_changed = false;

            // We perform an action based on the process state (may or may not change the OutputData)
            match &mut self.state {
                ProcessInfoState::State1 => {
                    if loop_iterations < 50 { println!("State machine is in State 1"); }

                    if self_input_data_guard[0]  == 1 &&
                       self_input_data_guard[31] == 1
                    {
                        self_output_data_guard[20..24].copy_from_slice(&[1, 2, 3, 4]);
                        output_data_changed = true;

                        self.state = ProcessInfoState::State2;
                    }
                },
                ProcessInfoState::State2 => {
                    if loop_iterations < 50 { println!("State machine is in State 2"); }

                    if &self_input_data_guard[20..24] == [1, 2, 3, 4].as_slice()
                    {
                        self_output_data_guard[20..24].copy_from_slice(&[5, 6, 7, 8]);
                        output_data_changed = true;

                        self.state = ProcessInfoState::State3
                    }
                },
                ProcessInfoState::State3 => {
                    if loop_iterations < 50 { println!("State machine is in State 3"); }

                    if &self_input_data_guard[20..24] == [5, 6, 7, 8].as_slice()
                    {
                        self.state = ProcessInfoState::State4;
                    }
                },
                ProcessInfoState::State4 => {
                    if loop_iterations < 50 { println!("State machine is in State 4"); }

                    break;
                }
            }

            // If we need to change the OutputData, we drop the Mutex locks before sending a signal
            // to the read_write loop. We immediately wait for a confirmation before proceeding to the
            // next loop iteration.
            if output_data_changed {
                drop(self_input_data_guard);
                drop(self_output_data_guard);
                println!("Dropped Input/OutputData guards");

                self.parameters.updates_channel.try_send(CompoundResponseContainer::new(None, true, false)).unwrap();

                if loop_iterations < 50 {
                    println!("TASK SENT WRITE SIGNAL!!");
                    println!("TASK WAITING FOR CONFIRMATION!!");
                }

                // We wait to receive the confirmation with a timeout of 3 s.
                match future::timeout(std::time::Duration::from_secs(3), self.parameters.confirmations_channel.recv()).await {
                    Ok(not_timeout_result) => match not_timeout_result {
                        Ok(_)    => {},
                        Err(err) => println!("Error on receiving NR confirmation on task: {}", err)
                    },
                    Err(_) => println!("TASK RECEIVING NR CONFIRMATION TIMED OUT!")
                }
                if loop_iterations < 50 { println!("RECEIVED CONFIRMATION!!"); }
            }

            if loop_iterations < 50 { loop_iterations += 1; }
        }
    }
}
impl Future for ProcessInfo {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("Task received a poll()");

        match Box::pin(self.poll_logic()).poll_unpin(cx) {
            Poll::Pending => {
                println!("Task poll() returned Pending");

                Poll::Pending
            },
            Poll::Ready(value) => {
                println!("Task poll() returned Ready");

                Poll::Ready(value)
            }
        }
    }
}

#[derive(Debug, PartialEq, Clone, Copy)]
enum ProcessInfoState {
    State1,
    State2,
    State3,
    State4
}

#[derive(Debug, Clone)]
struct ProcessInfoParameters {
    updates_channel: Sender<CompoundResponseContainer>,
    confirmations_channel: Arc<Receiver<()>>,

    input_data: Arc<Mutex<[u8; 32]>>,
    output_data: Arc<Mutex<[u8; 32]>>,
}
impl ProcessInfoParameters {
    pub fn new(channel: &IOChannel) -> Self {
        ProcessInfoParameters {
            updates_channel:        channel.update_sender.clone(),
            confirmations_channel:  channel.conf_receiver.clone(),

            input_data:             channel.input_data.clone(),
            output_data:            channel.output_data.clone(),
        }
    }
}

#[derive(Debug, PartialEq, Clone, Copy)]
struct CompoundResponseContainer {
    response: Option<f32>,
    output_data_changed: bool,
    finished: bool
}
impl CompoundResponseContainer {
    pub fn new(response: Option<f32>, output_data_changed: bool, finished: bool) -> Self {
        Self { response, output_data_changed, finished }
    }

    pub fn response(&mut self) -> Option<f32> {
        self.response
    }

    pub fn output_data_changed(&self) -> bool {
        self.output_data_changed
    }

    pub fn finished(&self) -> bool {
        self.finished
    }
}

#[derive(Debug)]
struct IOChannel {
    input_data:         Arc<Mutex<[u8; 32]>>,
    output_data:        Arc<Mutex<[u8; 32]>>,

    request_receiver:   Receiver<Requests>,
    request_sender:     Option<Sender<Requests>>,

    task_sender:        Sender<JoinHandle<()>>,

    conf_receiver:      Arc<Receiver<()>>,
    update_sender:      Sender<CompoundResponseContainer>
}
impl IOChannel {
    pub fn new(task_tx: Sender<JoinHandle<()>>, update_tx: Sender<CompoundResponseContainer>, conf_rx: Receiver<()>) -> Self {
        let (tx, rx) = channel::bounded::<Requests>(15);

        IOChannel {
            input_data:         Arc::new(Mutex::new([0; 32])),
            output_data:        Arc::new(Mutex::new([0; 32])),

            request_receiver:   rx,
            request_sender:     Some(tx),

            task_sender:        task_tx,

            conf_receiver:      Arc::new(conf_rx),
            update_sender:      update_tx,
        }
    }

    pub fn input_data(&self) -> &Arc<Mutex<[u8; 32]>> {
        &self.input_data
    } 

    pub fn output_data(&self) -> &Arc<Mutex<[u8; 32]>> {
        &self.output_data
    }

    pub fn request_channel(&mut self) -> Option<Sender<Requests>> {
        self.request_sender.take()
    }

    pub fn check_request(&mut self) {
        match self.request_receiver.try_recv() {
            Ok(request) => {
                println!("Received a Request!");

                self.identify_request(request)
            },
            Err(err)    => match err {
                TryRecvError::Empty  => {},
                TryRecvError::Closed => println!("IOChannel request channel closed or dropped")
            }
        }
    }

    pub async fn do_command(command: Command, parameters: ProcessInfoParameters) {
        let _process = ProcessInfo::new(command, parameters).unwrap().await;
    }

    pub fn info(&mut self) -> JoinHandle<()> {
        let parameters = ProcessInfoParameters::new(&self);

        task::spawn(async move {
            Self::do_command(Command::ToggleOn, parameters.clone()).await;

            Self::do_command(Command::ReadData, parameters.clone()).await;

            Self::do_command(Command::ToggleOff, parameters.clone()).await;

            parameters.updates_channel.try_send(CompoundResponseContainer::new(None, false, true)).unwrap();
        })
    }

    pub fn identify_request(&mut self, request: Requests) {
        match request {
            Requests::Info => {
                let handle = self.info();

                self.task_sender.try_send(handle).unwrap();
            },
            _ => {}
        }
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Requests {
    Configure,
    Info,
    Reset
}

enum Command {
    ToggleOn,
    ToggleOff,
    ReadData,
    WriteData
}


fn main() {
    // Necessary channel and variable initialization.
    let (task_tx, task_rx) = channel::bounded(50);
    let (update_tx, update_rx) = channel::bounded(50);
    let (conf_tx, conf_rx) = channel::bounded(50);

    let mut channel = IOChannel::new(task_tx, update_tx, conf_rx);
    let request_tx = channel.request_channel().unwrap();
    println!("IOChannel start\n");

    // Spawning of the secondary IO thread.
    let secondary_thread = thread::spawn(move || {
        // The read_write loop is responsible of checking the arrival of new requests and
        // responding accordingly to the tasks.
        let mut read_write_loop = Box::pin(async move {
            let update_rx = update_rx;
            let conf_tx = conf_tx;

            let timer = Instant::now();
            let mut counter = 0;

            loop {
                // To meet the conditions of the first state, we change the first
                // and last values of the InputData to 1.
                if counter == 0 {
                    let mut input_data_guard = channel.input_data().lock().await;

                    input_data_guard[0]  = 1;
                    input_data_guard[31] = 1;
                }

                // We check if a new process request has been sent. If there is,
                // the function will send the process's handle to the task_rx receiver.
                channel.check_request();

                // If we receive a signal from the process, we determine if the
                // OutputData needs to change and if the task has finished.
                if let Ok(container) = update_rx.try_recv() {
                    println!("RECEIVED MESSAGE ON UPDATE CHANNEL!");

                    // If the output data changed, copy it into the InputData
                    if container.output_data_changed() {    
                        let mut input_data_guard = channel.input_data().try_lock().unwrap();
                        let output_data_guard = channel.output_data().try_lock().unwrap();

                        let mut random_values = [0; 4];

                        for value in random_values.iter_mut() {
                            *value = thread_rng().gen_range(0..=255);
                        }

                        input_data_guard.copy_from_slice(&*output_data_guard);
                        input_data_guard[3..7].copy_from_slice(&random_values);
                    }

                    // Indicate if process has finished
                    if container.finished() {
                        println!("\nFINISHED PROCESS!");
                    } else {
                        // Send confirmation to allow the async task to continue
                        conf_tx.try_send(()).unwrap();
                        println!("SENT CONFIRMATION TO CHANNEL");

                        // Debugging messages
                        if conf_tx.is_empty() {
                            println!("Confirmations channel is empty");
                        } else {
                            println!("Confirmations channel is not empty. Number of messages: {}", conf_tx.len());
                        }
                    }
                }

                // This loop needs to run only for 25 s. We check if timer has finished.
                let elapsed = timer.elapsed();
                if elapsed >= Duration::from_secs(25) {
                    break;
                }

                // We wait 1 ms between iterations.
                task::sleep(Duration::from_millis(1)).await;
                counter += 1;
            }

            println!("\nIOChannel stopped\n");
        }).fuse();

        let mut task_reception_fut = Box::pin(task_rx.recv()).fuse();

        let mut task_collection = FuturesUnordered::<JoinHandle<()>>::new();

        // In this select loop, we simultaneously run the read_write_loop,
        // the reception and addition of new tasks, and the task group itself. 
        task::block_on(async move {
            loop {
                select! {
                    _           = read_write_loop                    => { break; },
                    recv_result = task_reception_fut                 => {
                                                                            match recv_result {
                                                                                Ok(task_handle) => { task_collection.push(task_handle); },
                                                                                Err(err)        => { println!("TASK_RECEPTION_FUT returned an error: {}", err); }
                                                                            }

                                                                            println!("Task collection has {} tasks", task_collection.len());
                                                                        },
                    _           = task_collection.select_next_some() => { println!("A task has exit successfully!"); }
                }
            }
        });
    });

    // From the main thread, we send a request.
    task::block_on(async move {
        task::sleep(Duration::from_secs(1)).await;

        request_tx.try_send(Requests::Info).unwrap();

        task::sleep(Duration::from_secs(30)).await;
    });

    // Wait for the secondary thread to wrap up.
    secondary_thread.join().unwrap();
}

As seen from the code above, the main thread only sends a request to the IOChannel and waits for it to be processed. In the secondary IO thread, I have a select loop that runs three futures simultaneously:

  • Waiting to receive a JoinHandle through a Receiver (which is then pushed to the FuturesUnordered collection).
  • A FuturesUnordered collection that attempts to run all the request operations simultaneously.
  • A read and write loop with a 25 s timeout, responsible for checking for the arrival of new requests from the main thread, as well as communicating with the task that these requests generate (receive updates and send confirmations).

This is what I expected to happen:

  1. The main thread sets up the required Rust channels for creating an instance of the IOChannel.
  2. The secondary IO thread spawns.
  3. The main thread waits a second before sending a Request to the IOChannel.
  4. In the secondary thread, read and write loop calls check_request() and finds a message in the channel.
  5. From it, an async task is spawned from the info() method and its handle is sent to task_rx.
  6. The task, consisting of 3 sequential calls to do_command(), starts with the first one.
  7. Do_command() creates a ProcessInfo instance and waits for it to complete.
  8. The Future implementation for ProcessInfo will start the poll_logic() method and poll it once.
  9. Inside poll_logic, the first iteration eventually reaches the output_data_changed condition, which results in waiting for the confirmation. Because the confirmation hasn’t arrived yet, poll_logic returns Pending.
  10. The read and write loop receives an update and sends a confirmation.
  11. ProcessInfo is further polled, allowing poll_logic() to continue from where it left off and onto the next iteration.
  12. ProcessInfo finishes.
  13. Repeat steps 7 – 12 two more times, as stated by the task.
  14. The task exits.

However, from the messages printed to the console, it seems like the poll_logic() function is only being polled once. Why is this?

Info printed to console

0

There are 0 best solutions below