Rust TCP Server: Connection Reset by Peer During Concurrent Requests

I'm experiencing "connection reset by peer" errors when making multiple concurrent requests to my Rust TCP server. The server seems to handle some requests successfully, but others result in the connection being reset unexpectedly.

Here's my server code:

use crate::request::Request;
use crate::response::Response;
use crate::router::Router;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

pub struct Server {
    address: String,
    router: Arc<Router>,
}

impl Server {
    pub fn new(address: String, router: Router) -> Self {
        Server {
            address,
            router: Arc::new(router),
        }
    }

    pub async fn run(self) {
        let listener = TcpListener::bind(&self.address).await.expect("Failed to bind");
        println!("Server listening on {}", self.address);

        loop {
            let (mut stream, _) = match listener.accept().await {
                Ok(connection) => connection,
                Err(e) => {
                    eprintln!("Connection failed: {}", e);
                    continue;
                }
            };

            let router = Arc::clone(&self.router);

            tokio::spawn(async move {
                let mut buffer = Vec::new();
                let mut read_buffer = [0; 1024];
                let mut headers_ended = false;

                while let Ok(n) = stream.read(&mut read_buffer).await {
                    if n == 0 { break; }
                    buffer.extend_from_slice(&read_buffer[..n]);

                    // Check if we've encountered the end of the headers section
                    if buffer.windows(4).any(|window| window == b"\r\n\r\n") {
                        headers_ended = true;
                        break;
                    }
                }

                if !headers_ended {
                    eprintln!("Failed to read the complete headers.");
                    return;
                }

                let request_str = String::from_utf8_lossy(&buffer);

                // Split the request string into headers and body parts
                let mut parts = request_str.split("\r\n\r\n");
                let headers_part = parts.next().unwrap_or_default();
                let body_part = parts.next().unwrap_or_default();

                let request = Request::new(headers_part.to_string(), body_part.to_string());
                let response = router.handle(request).await;

                let _ = stream.write_all(response.to_string().as_bytes()).await;
                let _ = stream.flush().await;
                let _ = stream.shutdown().await;
            });
        }
    }
}

And here's the code I used to test the server with concurrent requests:

use reqwest::Client;
use serde_json::json;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let client = Client::new();

    let send_req = |client: Client| async move {
        client.post("http://127.0.0.1:8080/api/create/user").json(&json!({
            "name": "John",
            "age": 30
        })).send().await?.text().await
    };
    
    let mut set = JoinSet::new();
    for _ in 0..256 {
        set.spawn(send_req(client.clone()));
    }
    
    while let Some(x) = set.join_next().await {
        println!("{x:?}")
    }
}

I suspect the issue might be related to how I'm handling the requests and connections in the server code, but I'm not sure what exactly is causing the problem.

Here are a few things I've considered:

  1. Buffer Handling: I'm reading the request in fixed 1024-byte chunks and stopping as soon as the accumulated buffer contains "\r\n\r\n". If the body is larger than whatever has arrived by that point, the rest of it is never read. Could that be what's triggering the resets? (I've sketched what body-aware reading might look like after this list.)

  2. Connection Handling: After writing the response, I immediately shut down the connection with stream.shutdown().await. Is this the correct approach, or should I keep the connection open briefly so the client can close its side cleanly? (See the second sketch after this list.)
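
For point 1, here's a minimal sketch of the body-aware reading I have in mind, assuming a well-formed Content-Length header (the header lookup is naive and case-sensitive, and read_full_request is just an illustrative name):

use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;

// Illustrative helper: read until the end of the headers, then keep
// reading until the body length matches Content-Length. Assumes the
// header is present, well-formed, and spelled exactly "Content-Length".
async fn read_full_request(stream: &mut TcpStream) -> std::io::Result<Vec<u8>> {
    let mut buffer = Vec::new();
    let mut chunk = [0u8; 1024];

    // Read until the buffer contains the header terminator.
    let header_end = loop {
        let n = stream.read(&mut chunk).await?;
        if n == 0 {
            return Err(std::io::ErrorKind::UnexpectedEof.into());
        }
        buffer.extend_from_slice(&chunk[..n]);
        if let Some(pos) = buffer.windows(4).position(|w| w == b"\r\n\r\n") {
            break pos + 4;
        }
    };

    // Naive, case-sensitive Content-Length lookup; defaults to 0 if absent.
    let headers = String::from_utf8_lossy(&buffer[..header_end]);
    let content_length: usize = headers
        .lines()
        .find_map(|line| line.strip_prefix("Content-Length:"))
        .and_then(|v| v.trim().parse().ok())
        .unwrap_or(0);

    // Keep reading until the whole body has arrived.
    while buffer.len() < header_end + content_length {
        let n = stream.read(&mut chunk).await?;
        if n == 0 {
            break;
        }
        buffer.extend_from_slice(&chunk[..n]);
    }

    Ok(buffer)
}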
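
And for point 2, this is the "drain then close" sequence I've been considering instead of shutting down straight away: shut down the write half first (which flushes and sends a FIN), then read until the client closes its side, so no unread bytes are left on the socket. Again, only a sketch, and close_gracefully is just an illustrative name:

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

// Illustrative close sequence: shutdown() flushes and sends a FIN on
// the write half; afterwards, drain whatever the client is still
// sending until read returns Ok(0), meaning the client closed its side.
async fn close_gracefully(stream: &mut TcpStream) -> std::io::Result<()> {
    stream.shutdown().await?;
    let mut sink = [0u8; 1024];
    while stream.read(&mut sink).await? > 0 {}
    Ok(())
}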

I've tried increasing the buffer size and adding error handling, but the issue persists. I'm not sure what else I'm missing.
