Rust send serialized struct from crossbeam channel to multiple receivers via tcp

800 Views Asked by At

I am trying to send a serialized struct over TCP to multiple machines. The TCP handler receives the serialized struct (as a `String`) through a crossbeam channel from another thread. My problem is that `rx.try_iter()` drains the crossbeam channel, so if more than one client is connected, the clients can't all receive the same struct. I tried moving the `rx.try_iter()` call out of the individual `handle_client()` function, but couldn't achieve a good result. Thanks for your time and help!

This is what I have so far:

(Server side)

use std::net::{TcpListener, TcpStream};
use std::thread;
use std::io::{Read,Write,Error};
use serde::{Serialize, Deserialize};
use crossbeam_channel::unbounded;

/// Message payload that is converted to JSON by [`Serialized::serialize`].
#[derive(Serialize, Deserialize)]
pub struct Serialized {
    /// Data values carried by the message.
    pub buffer: Vec<u32>,
    /// Time step the buffer belongs to (presumably a simulation step counter; confirm with the producer).
    pub timestep: u128,
}

impl Serialized {
    /// Builds a `Serialized` from the given parts and returns its JSON encoding.
    ///
    /// # Panics
    ///
    /// Panics if `serde_json::to_string` returns an error.
    pub fn serialize(buffer: Vec<u32>, timestep: u128) -> String {
        let payload = Self { buffer, timestep };
        let encoded = serde_json::to_string(&payload);
        encoded.unwrap()
    }
}

fn handle_client(mut stream: TcpStream, rx: crossbeam_channel::Receiver<String>)-> Result<(), Error> {
    println!("incoming connection from: {}", stream.peer_addr()?);
    loop {
        //receive from channel
        let serialized = match rx.try_iter().last(){
            Some(x) => x,
            None => continue,
        };

        //write to stream
        stream.write(serialized.as_bytes())?;
    }
}

pub fn start_server(rx: crossbeam_channel::Receiver<String>) {
    
    let listener = TcpListener::bind("localhost:8888").expect("Could not bind");
    
    for stream in listener.incoming() {
        let rx = rx.clone();
        match stream {
            Err(e)=> {eprintln!("failed: {}", e)}
            Ok(stream) => {
                
                thread::spawn(move || {
                    handle_client(stream, rx).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
                
            }
        } 

    }
}

(Client side)

use std::net::TcpStream;
use serde::{Serialize, Deserialize};
use std::error::Error as er;

/// Message payload as decoded from JSON on the client side.
#[derive(Serialize, Deserialize, Debug)]
pub struct Serialized {
    /// Data values carried by the message.
    pub buffer: Vec<u32>,
    /// Time step the buffer belongs to (presumably a simulation step counter; confirm with the producer).
    pub timestep: u128,
}

/// Reads one JSON-encoded [`Serialized`] value from `tcp_stream`.
///
/// # Errors
///
/// Returns the deserialization error, boxed, if no valid value could be read.
fn read_user_from_stream(tcp_stream: &mut TcpStream) -> Result<Serialized, Box<dyn er>> {
    let mut json_reader = serde_json::Deserializer::from_reader(tcp_stream);
    Serialized::deserialize(&mut json_reader).map_err(Into::into)
}

/// Connects to the server on `localhost:8888` and prints the time step of every
/// message received.
///
/// The connection is opened once and reused for all messages, so the server
/// handles this client on a single connection instead of a new one per message.
/// When a message can no longer be read (for example because the server closed
/// the connection), the error is printed to stderr and the function returns.
///
/// # Panics
///
/// Panics with "could not connect" if the initial connection fails.
pub fn start_client() {
    let mut stream = TcpStream::connect("localhost:8888").expect("could not connect");

    loop {
        match read_user_from_stream(&mut stream) {
            Ok(serialized) => println!("timestep: {}", serialized.timestep),
            Err(error) => {
                // A failed read means the stream is closed or out of sync; stop
                // instead of panicking on a network condition.
                eprintln!("stopped reading from server: {}", error);
                return;
            }
        }
    }
}

/// Entry point of the client program: hands control to `start_client`.
fn main() {
    start_client();
}
2

There are 2 best solutions below

4
On

crossbeam does not offer broadcast functionality. Its channels only provide a producer-consumer architecture: each message is delivered to exactly one receiver, even when the receiver has been cloned. If you want to deliver an item to multiple receivers, you need to use a different mechanism.

It seems that the bus crate provides what you need.

0
On

After some discussion on the rust user board, I came to this solution (server side):

use std::sync::{Arc, Mutex};
use std::thread;
use std::net::{TcpListener, TcpStream};
use std::io::{Read,Write,Error};
use bus::{Bus, BusReader};

fn main() {
    let mut x: u32 = 0;
    let bus = Bus::<u32>::new(10);

    let bus_mutex = Arc::new(Mutex::new(bus));

    let bus_mutex_cp = Arc::clone(&bus_mutex);
    thread::spawn(move || {
        start_server(bus_mutex_cp);
    });

    //simulation loop
    for _ in 0..99999 {
        x = x + 1;
        println!("Simulation step: {}", x);
        bus_mutex.lock().unwrap().broadcast(x);
        thread::sleep_ms(1000);
    }

    loop {}

}


pub fn start_server(bus_mutex: Arc<Mutex<Bus<u32>>>) {
    
    let listener = TcpListener::bind("0.0.0.0:8888").expect("Could not bind");
    
    for stream in listener.incoming() {
        
        match stream {
            Err(e)=> {eprintln!("failed: {}", e)}
            Ok(stream) => {
                
                let rx = bus_mutex.lock().unwrap().add_rx();
                thread::spawn(move || {
                    handle_client(stream, rx).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
                
            }
        } 

    }
}

fn handle_client(mut stream: TcpStream, mut rx: BusReader<u32>)-> Result<(), Error> {
    println!("incoming connection from: {}", stream.peer_addr()?);
    loop {
        //receive from bus
        let x = rx.recv().unwrap();

        //write to stream
        stream.write(&x.to_string().as_bytes())?;

        thread::sleep_ms(100);
    }
}