How to configure tonic::transport::Endpoint to recover a broken connection?

1k Views Asked by At

I have a problem with a client. If the connection between the client and the server is lost. He can no longer restore it and the program that uses the client no longer works. For example, when you sleep for a long time or physically disconnect from the server. Further use of the grpc service is impossible, just a timeout error or a broken connection.

[2023-01-28T15:11:55Z DEBUG tower::buffer::worker] service.ready=true message=processing request
[2023-01-28T15:11:55Z DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(19), flags: (0x4: END_HEADERS) }
[2023-01-28T15:11:55Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(19) }
[2023-01-28T15:11:55Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(19), flags: (0x1: END_STREAM) }
[2023-01-28T15:12:01Z DEBUG hyper::proto::h2::server] stream error: connection error: broken pipe
[2023-01-28T15:12:01Z DEBUG h2::codec::framed_write] send frame=Reset { stream_id: StreamId(19), error_code: CANCEL }

or this

[2023-01-28T16:11:55Z DEBUG client] send: interval(40)
[2023-01-28T16:11:55Z DEBUG h2::codec::framed_write] send frame=Reset { stream_id: StreamId(25), error_code: CANCEL }
[2023-01-28T16:11:55Z DEBUG tower::buffer::worker] service.ready=true message=processing request
[2023-01-28T16:11:55Z DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(27), flags: (0x4: END_HEADERS) }
[2023-01-28T16:11:55Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(27) }
[2023-01-28T16:11:55Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(27), flags: (0x1: END_STREAM) }
[2023-01-28T16:11:56Z ERROR client] status: Cancelled, message: "Timeout expired", details: [], metadata: MetadataMap { headers: {} }

I encountered this problem on the working program in the finished product. To study it I created a simple server-client on from the tonic examples And I get the same errors. I run the server on a remote machine, that it would be possible to physically break the connection between the client and the server.

Server

struct Service {}
#[tonic::async_trait]
impl test_grpc::say_server::Say for Service {
    async fn hello(&self, request: Request<RequestSay>) -> Result<Response<ResponseSay>, Status> {
        let r = request.into_inner().text;
        debug!("in request: {}", r);
        Ok(Response::new(ResponseSay {
            text: format!("hello {r}"),
        }))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    env_logger::Builder::new()
        .filter_level(log::LevelFilter::from_str("debug").unwrap())
        .init();
    let s = Service {};
    let key = "secret token";
    let svc = test_grpc::say_server::SayServer::with_interceptor(
        s,
        move |req: Request<()>| -> Result<Request<()>, Status> {
            let token: MetadataValue<_> = key.parse().unwrap();
            match req.metadata().get("authorization") {
                Some(t) if token == t => Ok(req),
                _ => Err(Status::unauthenticated("No valid auth token")),
            }
        },
    );
    let addr = "0.0.0.0:8804".parse::<SocketAddr>().unwrap();
    Server::builder()
        .add_service(svc)
        .serve(addr)
        .await
        .unwrap();
    Ok(())
}

Client


async fn tester_client(sleep: Duration, uri: &str, key: &str) {
    let uri = uri.parse().unwrap();
    debug!("create connect");
    let chan = tonic::transport::Channel::builder(uri)
        .timeout(Duration::from_secs(20))
        .connect_timeout(Duration::from_secs(20))
        //.http2_keep_alive_interval(Duration::from_secs(5))
        //.keep_alive_while_idle(true)
        .connect_lazy();

    let key = key.parse::<tonic::metadata::MetadataValue<_>>().unwrap();
    let mut key = Some(key);
    let mut service = test_grpc::say_client::SayClient::with_interceptor(
        chan,
        move |mut req: tonic::Request<()>| {
            if let Some(secret) = &mut key {
                req.metadata_mut().insert("authorization", secret.clone());
            }
            Ok(req)
        },
    );
    loop {
        let send_text = format!("interval({})", sleep.as_secs_f32() / 60.0);
        debug!("send: {send_text}");
        let res = match service
            .hello(tonic::Request::new(test_grpc::RequestSay {
                text: send_text.clone(),
            }))
            .await
        {
            Ok(r) => r,
            Err(e) => {
                error!("{e:#}");
                continue;
            }
        };
        debug!("recv: {}", res.into_inner().text);
        time::sleep(sleep).await;
        println!();
    }
}

I have tried several settings. For example, if use .http2_keep_alive_interval(Duration::from_secs(5)) then the connection does not break during idle time. But if you physically break the connection, then it can no longer be restored. Perhaps I need to specify some other settings so that a new connection is established when it breaks?

0

There are 0 best solutions below