Rust Generic Function Attempt for Async Reads


My goal is to reduce the following read_stream_***() functions into a single generic function that can be passed different streams.

use async_std::net::TcpStream;
use async_std::{ task };
use async_std::io::{ stdin, BufReader, Stdin };
use async_std:: { prelude::* };
use futures::{select, FutureExt, AsyncRead };

pub async fn read_stream_stdin(streem:Stdin) -> Result<(), std::io::Error> 
{
  let mut lines_from_stream = BufReader::new(streem).lines().fuse();
  loop {
    select! {
      line = lines_from_stream.next().fuse() => match line {
        Some(line) => {
           println!("{:?}",line?);
        }
        None => break,
      }
    }
  }
  Ok(())
}

pub async fn read_stream_tcp(streem:TcpStream) -> Result<(), std::io::Error> 
{
  let mut lines_from_stream = BufReader::new(streem).lines().fuse();
  loop {
    select! {
      line = lines_from_stream.next().fuse() => match line {
        Some(line) => {
           println!("{:?}",line?);
        }
        None => break,
      }
    }
  }
  Ok(())
}

pub async fn connect_tcp_server(host_port:&str) -> Result<(), std::io::Error>
{
  let streem = TcpStream::connect(host_port).await;
  let _result = task::block_on(read_stream_tcp(streem?));

  Ok(())
}

fn main() -> Result<(), std::io::Error> {

  task::spawn( connect_tcp_server("127.0.0.1:8081") );
  task::block_on(read_stream_stdin(stdin()))

}

The Generic Attempt:

pub async fn read_stream<T>(streem:T) -> Result<(), std::io::Error>
{
  let mut lines_from_stream = BufReader::new(streem).lines().fuse();
  loop {
    select! {
      line = lines_from_stream.next().fuse() => match line {
        Some(line) => {
           println!("{:?}",line?);
        }
        None => break,
      }
    }
  }
  Ok(())
}

The Cargo.toml

[package]
name = "gen_func"
version = "0.1.0"
edition = "2021"

[dependencies]
async-std = "1.9.0"
futures = "0.3.21"

I attempted <T: async_std::io::Read> but fuse() and lines() are not implemented, and AsyncRead is not found in async_std::io. I found AsyncRead in the futures crate, but again fuse() and lines() were not implemented. I am not set on this read pattern. I am new to Rust and am trying to build up a source library for future programming tasks.

Best answer:

First, as pointed out by kmdreko, the logic of your function(s) can be greatly simplified (at least based on the information given):

pub async fn read_stream_tcp(stream: TcpStream) -> Result<(), std::io::Error> {
    let mut lines = BufReader::new(stream).lines();
    while let Some(line) = lines.next().await {
        println!("{line:?}");
    }
    Ok(())
}

Then, to figure out how to make this generic, you can just let the compiler tell you what it needs:

pub async fn read_stream<T>(stream: T) -> Result<(), std::io::Error>
{
    let mut lines = BufReader::new(stream).lines();
    while let Some(line) = lines.next().await {
        println!("{line:?}");
    }
    Ok(())
}

Notice the lack of where clauses or other constraints on T. The compiler will now complain:

error[E0277]: the trait bound `T: async_std::io::Read` is not satisfied
  --> src/main.rs:15:36
   |
15 |     let mut lines = BufReader::new(stream).lines();
   |                     -------------- ^^^^^^ the trait `async_std::io::Read` is not implemented for `T`
   |                     |
   |                     required by a bound introduced by this call
   |
note: required by a bound in `async_std::io::BufReader::<R>::new`
  --> /home/lucas/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/io/buf_reader.rs:55:9
   |
55 | impl<R: io::Read> BufReader<R> {
   |         ^^^^^^^^ required by this bound in `async_std::io::BufReader::<R>::new`
help: consider restricting type parameter `T`
   |
13 | pub async fn read_stream<T: async_std::io::Read>(stream: T) -> Result<(), std::io::Error>
   |                           +++++++++++++++++++++

Applying the compiler's suggestion produces a follow-up error. Applying that suggestion as well yields a full where clause of T: async_std::io::Read + std::marker::Unpin:

pub async fn read_stream<T>(stream: T) -> Result<(), std::io::Error>
where
    T: async_std::io::Read + std::marker::Unpin,
{
    let mut lines = BufReader::new(stream).lines();
    while let Some(line) = lines.next().await {
        println!("{line:?}");
    }
    Ok(())
}

async fn try_it() {
    // These will now compile just fine
    read_stream(async_std::io::stdin()).await.unwrap();
    read_stream(TcpStream::connect("127.0.0.1:8080").await.unwrap()).await.unwrap();
}

"I attempted <T: async_std::io::Read> but fuse() and lines() are not implemented"

This suggests that you tried replacing BufReader::new(stream) at the same time. You can do that, but then the bound on T has to guarantee that the lines() method is available, and Read alone does not. Either make the parameter a fixed type BufReader<T>, or use the where clause T: async_std::io::BufRead + std::marker::Unpin for a generic type.