Rust Actix Websockets & Actix Broker

I'm quite new to Rust and Actix, and I'm trying to build a technology prototype in which a server sends protobuf messages to clients over WebSockets. The protobuf part works fine, but I'm struggling with the WebSocket part.

I've tried to modify the official Actix WebSocket example that uses actix-broker (the chat-broker example), but I'm having a hard time debugging it (I'm not familiar with VS Code, but that's another story).

The effect is that the broker instance is not started and does not receive any messages. If I start the server manually via a supervisor (the example does not need to do that), it still doesn't receive any messages.

Question: Does anybody have an idea why the broker won't start automatically or doesn't receive the messages? Do I have a fundamental misunderstanding? Does anybody have an idea how to make the program work?

The code is publicly available on GitHub (GitHub Repository).

For your convenience, I've added the ws_client, ws_server and main.rs files below.

The changes I've made compared to the example are:

  • Removed #[derive(Default)] from WsChatServer and implemented Default myself
  • Wrapped the WsChatServer rooms in Arc and RwLock to ensure memory safety (needs to be overhauled)
  • Removed the ListRooms message and the corresponding functions

I'd highly appreciate any help, tips or suggestions!

ws_server.rs:

use std::{collections::HashMap, sync::{Arc, RwLock}};

use actix::prelude::*;
use actix_broker::BrokerSubscribe;

use crate::{messages::{ChatMessage, JoinRoom, LeaveRoom, SendMessage}};

type Client = Recipient<ChatMessage>;
type Room = HashMap<usize, Client>;

#[derive(Clone)]
pub struct WsChatServer {
    rooms: Arc<RwLock<HashMap<String, Room>>>,
}

lazy_static! {
    static ref ROOMS: Arc<RwLock<HashMap<String, Room>>> = Arc::new(RwLock::new(Default::default()));
}

impl Default for WsChatServer {
    fn default() -> Self {
        let ws = WsChatServer { rooms: ROOMS.clone() };
        return ws;
    }
}


impl SystemService for WsChatServer {}
impl Supervised for WsChatServer {}

impl WsChatServer {

    pub fn create_room(room_name: &str) {
        let mut rooms = match ROOMS.write() {
            Ok(rooms) => rooms,
            Err(err) => {
                log::debug!("Error while requesting write lock. Error was: {}", err);
                return;
            },
        };

        if !rooms.contains_key(room_name) {
            let room: HashMap<usize, Client> = HashMap::new();
            rooms.insert(room_name.to_string(), room);
        }
    }



    fn take_room(&mut self, room_name: &str) -> Option<Room> {
        let mut guard = match self.rooms.write() {
            Ok(guard) => guard,
            Err(err) => {
                log::debug!("Error waiting for write lock. Error was: {}", err);
                return None;
            },
        };
        let room = match guard.get_mut(room_name){
            Some(room) => room,
            None => {
                log::debug!("Failed to get mutable reference of RW Guard");
                return None;
            },
        };
        let room = std::mem::take(room);
        Some(room)
    }

    fn add_client_to_room(&mut self, room_name: &str, id: Option<usize>, client: Client) -> usize {
        log::info!("In add_client_to_room Handler. Adding Client to room: {}", room_name);
        let mut id = id.unwrap_or_else(rand::random::<usize>);

        if let Some(room) = self.rooms.write().unwrap().get_mut(room_name) {
            loop {
                if room.contains_key(&id) {
                    id = rand::random::<usize>();
                } else {
                    break;
                }
            }

            room.insert(id, client);
            return id;
        }

        // Create a new room for the first client
        let mut room: Room = HashMap::new();

        room.insert(id, client);
        self.rooms.write().unwrap().insert(room_name.to_owned(), room);

        id
    }

    pub fn send_chat_message(&mut self, room_name: &str, msg: &str, _src: usize) -> Option<()> {
        let mut room = match self.take_room(room_name) {
            Some(room) => room,
            None => {
                    log::debug!("Error, could not take room.");
                    return None;
                },
        };

        for (id, client) in room.drain() {
            if client.try_send(ChatMessage(msg.to_owned())).is_ok() {
                self.add_client_to_room(room_name, Some(id), client);
            }
        }

        Some(())
    }
}

impl Actor for WsChatServer {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        log::info!("WsChatServer has started.");
        self.subscribe_system_async::<LeaveRoom>(ctx);
        self.subscribe_system_async::<SendMessage>(ctx);
    }
}

impl Handler<JoinRoom> for WsChatServer {
    type Result = MessageResult<JoinRoom>;

    fn handle(&mut self, msg: JoinRoom, _ctx: &mut Self::Context) -> Self::Result {
        log::info!("In Join Room Handler.");
        let JoinRoom(room_name, client) = msg;

        let id = self.add_client_to_room(&room_name, None, client);
        
        MessageResult(id)
    }
}

impl Handler<LeaveRoom> for WsChatServer {
    type Result = ();

    fn handle(&mut self, msg: LeaveRoom, _ctx: &mut Self::Context) {
        log::info!("Removing ws client from room.");
        if let Some(room) = self.rooms.write().unwrap().get_mut(&msg.0) {
            room.remove(&msg.1);
        }
    }
}

impl Handler<SendMessage> for WsChatServer {
    type Result = ();

    fn handle(&mut self, msg: SendMessage, _ctx: &mut Self::Context) {
        let SendMessage(room_name, id, msg) = msg;
        self.send_chat_message(&room_name, &msg, id);
    }
}

ws_client.rs:

use actix::{Actor, ActorContext, StreamHandler, Handler, SystemService, AsyncContext, WrapFuture, ActorFutureExt, fut, ContextFutureSpawner};
use actix_web_actors::ws;
use actix_broker::BrokerIssue;

use crate::messages::{ChatMessage, LeaveRoom, JoinRoom};
use crate::ws_server::WsChatServer;


pub struct WsConn {
    room: String,
    id: usize,
}

impl WsConn {
    pub fn new(room: &str) -> WsConn {
        WsConn {
            room: room.to_string(),
            id: rand::random::<usize>(),
        }        
    }

    pub fn join_room(&mut self, room_name: &str, ctx: &mut ws::WebsocketContext<Self>) {
        let room_name = room_name.to_owned();
        // First send a leave message for the current room
        let leave_msg = LeaveRoom(self.room.clone(), self.id);

        // issue_system_sync comes from having the `BrokerIssue` trait in scope.
        self.issue_system_sync(leave_msg, ctx);      
        log::info!("Ws client sent leave msg.");  

        // Then send a join message for the new room
        let join_msg = JoinRoom(
            room_name.to_owned(),
            ctx.address().recipient(),
        );
        

        WsChatServer::from_registry()
            .send(join_msg)
            .into_actor(self)
            .then(move |id, act, _ctx| {
                if let Ok(id) = id {
                    act.id = id;
                    act.room = room_name.clone().to_string();
                }

                fut::ready(())
            })
            .wait(ctx);

        log::info!("Ws client sent join msg."); 
    }
}

impl Actor for WsConn {
    type Context = ws::WebsocketContext<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        log::info!("ws client started.");
        self.join_room(self.room.to_owned().as_str(), ctx);
    }

    fn stopped(&mut self, _ctx: &mut Self::Context) {
        log::info!(
            "WsConn closed for {} in room {}",
            self.id,
            self.room
        );
    }
}

impl Handler<ChatMessage> for WsConn {
    type Result = ();

    fn handle(&mut self, msg: ChatMessage, ctx: &mut Self::Context) {
        ctx.text(msg.0);
    }
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsConn {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        let msg = match msg {
            Err(_) => {
                ctx.stop();
                return;
            }
            Ok(msg) => msg,
        };

        log::debug!("WEBSOCKET MESSAGE: {:?}", msg);

        match msg {
            ws::Message::Text(_) => (),
            ws::Message::Close(reason) => {
                ctx.close(reason);
                ctx.stop();
            }
            _ => {}
        }
    }
}


main.rs:

use std::time::Duration;
use std::{env, thread::sleep};
use std::fs::File;
use std::io::Write;
use actix_web::{App, HttpServer, middleware::Logger};
use actix_cors::Cors;
use tokio::task;

#[macro_use]
extern crate lazy_static;

mod protobuf_messages;
mod actions;
mod data;
mod ws_clients;
mod messages;
mod ws_server;

pub async fn write_to_file(buf: &[u8], file_name: &str) -> Result<(), std::io::Error> {
    let dir = env::current_dir().unwrap();
    let file_handler = dir.join(file_name);
    let mut file = File::create(file_handler).unwrap();
    file.write_all(buf)
}


#[actix_web::main]
async fn main() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG", "debug");
    env_logger::init();

    data::MAIN_CONFIG.version = "1.0".to_string();
    data::MAIN_CONFIG.mqtt_broker_address = "test".to_string();
    data::MAIN_CONFIG.wildcard = "#".to_string();
    data::MAIN_CONFIG.splitting_character = ".".to_string();
    data::MAIN_CONFIG.active_configs = [].to_vec();

    let ws_chat_server = ws_server::WsChatServer::default();
    let mut ws_chat_server_2 = ws_chat_server.clone();

    actix::Supervisor::start(|_| ws_chat_server);

    ws_server::WsChatServer::create_room("Main");

    let msg = serde_json::to_string_pretty(&data::MAIN_CONFIG).expect("Expected parsable string");
    
    task::spawn(async move {
        loop {
            match ws_chat_server_2.send_chat_message("Main", &msg.clone(), 0) {
                Some(()) => (),//log::debug!("Got a result from sending chat message"),
                None => (),//log::debug!("Got no result from sending chat message"),
            }
            
            sleep(Duration::from_secs(1));
        }        
    });


    HttpServer::new(move|| App::new()
        .wrap(Logger::default())
        .wrap(Cors::default().allow_any_origin().allow_any_header().allow_any_method())
        .service(actions::get_json)
        .service(actions::get_protobuf)
        .service(actions::start_ws_connection)
    )
    .bind(("127.0.0.1", 3000))?
    .workers(2)
    .run().await


}

There is 1 solution below

I finally figured out a way to solve the problem.

First of all, I could not get the broker example to work.

My issue was that I tried to send a message to all WS clients from outside the actors, via the WS server broker. To do that, you need the Addr of the server. If you obtain it the same way the client actors obtain the lobby address, with from_registry, the result seems to be a different server instance, and therefore the messages were not sent to the clients.

I could not find a way to get the same broker Addr returned, so I switched to the Actix WebSocket example called "chat-tcp". In this example, a WS server actor is created in the main function and then passed as a variable to all requests. The request handler then sets up the clients with the server Addr.

To make sure I can send to the same broker, I wrapped the Addr in an Arc and RwLock, which I then had to unwrap in the request handler. When I started a tokio green thread that sends messages to the WS server (and moved an Arc clone into its scope), it worked.
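
Reduced to its core, the pattern described above might look like the following sketch. It deliberately uses only the standard library (threads and std::sync::mpsc channels) instead of actix, so ServerMsg, ServerHandle and spawn_server are hypothetical stand-ins for the server's messages, its Addr and the call that starts the actor. They are not part of actix or of my project. The only point is that the server is created once in main and every party receives a clone of that one handle, instead of looking up an instance of its own.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical messages understood by the server
// (stand-ins for JoinRoom / SendMessage, not the actix types).
enum ServerMsg {
    Join { room: String, client: Sender<String> },
    Send { room: String, text: String },
    Shutdown,
}

// Stand-in for the server's address: every clone reaches the same server.
type ServerHandle = Sender<ServerMsg>;

// Starts the single server instance on its own thread and returns its handle.
fn spawn_server() -> (ServerHandle, thread::JoinHandle<()>) {
    let (tx, rx) = channel::<ServerMsg>();
    let join = thread::spawn(move || {
        let mut rooms: HashMap<String, Vec<Sender<String>>> = HashMap::new();
        for msg in rx {
            match msg {
                ServerMsg::Join { room, client } => {
                    rooms.entry(room).or_default().push(client);
                }
                ServerMsg::Send { room, text } => {
                    if let Some(clients) = rooms.get_mut(&room) {
                        // Keep only clients whose receiving end still exists.
                        clients.retain(|c| c.send(text.clone()).is_ok());
                    }
                }
                ServerMsg::Shutdown => break,
            }
        }
    });
    (tx, join)
}

fn main() {
    // Created exactly once, in main.
    let (server, join) = spawn_server();

    // A "request handler" gets a clone of the handle and registers its client.
    let handler_handle = server.clone();
    let (client_tx, client_rx) = channel();
    handler_handle
        .send(ServerMsg::Join { room: "Main".to_string(), client: client_tx })
        .unwrap();

    // A background producer gets another clone of the same handle.
    let producer_handle = server.clone();
    thread::spawn(move || {
        producer_handle
            .send(ServerMsg::Send { room: "Main".to_string(), text: "hello".to_string() })
            .unwrap();
    })
    .join()
    .unwrap();

    // The client sees the broadcast because both clones talk to one server.
    let received = client_rx.recv().unwrap();
    println!("{}", received);

    server.send(ServerMsg::Shutdown).unwrap();
    join.join().unwrap();
}
```

If the producer called spawn_server() itself instead of receiving a clone, it would talk to a second server with an empty room table, which matches the symptom I saw.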

Does anyone know why the registered broker addresses where different?