Using tokio::time::timeout does not trap a delayed response from a TcpStream read

I'm writing a PoC P2P node handshake to a Bitcoin server. I send a 'version' message to the target node and it responds with its corresponding version message. So far so good.

However, every now and again, the version message from the target node can take over two minutes to arrive. The server does eventually give a successful response, but since I have applied a 5-second timeout to other network activity, I thought that for consistency I should do the same here.

Here's the code without any check for a timeout. This works, albeit sometimes very slowly:

use bitcoin::{consensus::Decodable, p2p::message};
use std::{
    io::BufReader,
    net::{IpAddr, SocketAddr, TcpStream},
    time::Duration,
};

pub async fn handshake(to_address: IpAddr, port: u16, timeout: u64) -> Result<()> {
    let target_node = SocketAddr::new(to_address, port);

    // Blocking std::net connect, bounded by the same timeout
    let mut write_stream = TcpStream::connect_timeout(&target_node, Duration::from_secs(timeout))?;

    // Build and send version message
    send_msg_version(&target_node, &mut write_stream)?;

    // Read the peer's reply through a cloned handle to the same socket
    let read_stream = write_stream.try_clone()?;
    let mut stream_reader = BufReader::new(read_stream);

    // Blocks until a complete message has arrived (sometimes over two minutes)
    let response1 = message::RawNetworkMessage::consensus_decode(&mut stream_reader)?;

    match response1.payload() {
        // Do stuff here
        _ => {}
    }

    // More stuff here...
    Ok(())
}

The call to consensus_decode() is what sometimes takes a very long time, so I've tried wrapping it in a future::ready(), then placing that inside a call to tokio::time::timeout() like this:

    let response1 = if let Ok(net_msg) = tokio::time::timeout(
        Duration::from_secs(timeout),
        future::ready(message::RawNetworkMessage::consensus_decode(&mut stream_reader)?),
    )
    .await
    {
        net_msg
    } else {
        return Err(CustomError(format!(
            "TIMEOUT: {} failed to respond with VERSION message within {} seconds",
            to_address, timeout
        )));
    };

The ? after consensus_decode(&mut stream_reader) successfully traps any decoding errors, yet if the message arrives successfully but too slowly, tokio::time::timeout isn't able to trap it.

What am I missing here?

Answer by cafce25 (best answer)

You can't use future::ready to make a synchronous function call asynchronous; use tokio::task::spawn_blocking instead:

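use std::time::Duration;

// Synchronous stand-in for a slow call such as consensus_decode()
fn takes_long() {
    std::thread::sleep(Duration::from_millis(100));
}

#[tokio::main]
async fn main() {
    // spawn_blocking runs the closure on tokio's blocking thread pool and returns
    // a JoinHandle, which is a real future, so the timeout can fire while it runs
    let x = tokio::time::timeout(
        Duration::from_millis(10),
        tokio::task::spawn_blocking(|| {
            takes_long();
        }),
    )
    .await;
    eprintln!("{x:?}");
}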

produces this output:

Err(Elapsed(()))

The problem with your approach is that Rust evaluates function arguments before the call itself: consensus_decode must already have returned and produced a value before future::ready is even called (let alone tokio::time::timeout, which is called later still). At that point future::ready simply wraps a finished value and completes immediately, and the timeout's countdown only starts once the timeout future is created, so it never triggers.

Answer by Chayim Friedman

An alternative to the solution by @cafce25, which is especially useful if you want cancelled calculations to stop occupying CPU time (with @cafce25's solution they keep running, because a spawn_blocking task cannot be aborted once it has started), is to make consensus_decode() asynchronous and sprinkle tokio::task::yield_now().await calls throughout it (for example, once per loop iteration).

A better (more performant) but unstable solution is to use tokio::task::consume_budget().await, but that requires the tokio_unstable flag.