How to close a Server-Sent Events connection to read


I'm trying to write a test for an HTTP endpoint that uses Server-Sent Events to deliver a JSON stream, but after cancelling the request's context.Context, my json.NewDecoder(resp.Body).Decode call returns a "context canceled" error. How can I gracefully finish the request so that I can read all of the body received so far? Is there something I need to do on the server side?

UPDATE:

I've made a minimal reproducible example and found you can get the body of a cancelled request, but the example is flaky and doesn't work if there's a time.Sleep after the cancel. Why is this?

Example:

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
    "time"
)

func Subscribe(w http.ResponseWriter, req *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")
    flusher.Flush()

    tick := time.Tick(500 * time.Millisecond)
    for {
        select {
        case <-req.Context().Done():
            return
        case <-tick:
            fmt.Fprintln(w, "EVENT!")
            flusher.Flush()
        }
    }
}

func main() {
    s := httptest.NewServer(http.HandlerFunc(Subscribe))
    defer s.Close()
    c := s.Client()

    var sub *http.Response
    var cancelSub context.CancelFunc
    {
        ctx := context.Background()
        ctx, cancelSub = context.WithCancel(ctx)
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.URL+"/subscribe", nil)
        if err != nil {
            panic(err.Error())
        }
        sub, err = c.Do(req)
        if err != nil {
            panic(err.Error())
        }
        defer sub.Body.Close()
        if sub.StatusCode != 200 {
            panic("StatusCode != 200")
        }
    }

    time.Sleep(3 * time.Second)
    cancelSub()

    // WITH THIS SLEEP NO BODY IS PRINTED?!
    time.Sleep(time.Second)

    b, err := io.ReadAll(sub.Body)
    fmt.Printf("error: %v\nbody:\n%s\n", err, b)
}

https://go.dev/play/p/sIjaoqpCuqC

Output with the sleep:

error: context canceled
body:

Output without the sleep:

error: context canceled
body:
EVENT!
EVENT!
EVENT!
EVENT!
EVENT!

Answer by nimdrak:

In summary

  • It is expected that a cancel, timeout, or deadline from the context can take effect while the response body is being read. (https://pkg.go.dev/net/http#NewRequestWithContext)
  • In my opinion, the best approach is to handle context errors and JSON errors separately, or to extend the deadline/timeout (if you use one with the context).

In detail

The context passed to the request stays in force while the response body is being read, so a cancel, timeout, or deadline can surface as an error from the body reader. The documentation for NewRequestWithContext says:

For an outgoing client request, the context controls the entire lifetime of a request and its response: obtaining a connection, sending the request, and reading the response headers and body. https://pkg.go.dev/net/http#NewRequestWithContext

If you want to avoid hitting the timeout or deadline while reading, you can extend it, for example:

...
    // Choose a timeout long enough to cover reading the whole body.
    enoughLongTime := 10 * time.Second

    ctx, cancel := context.WithTimeout(context.Background(), enoughLongTime)
    defer cancel()

    req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
...

If you want to distinguish context errors (cancel, timeout, deadline) from JSON errors, you can try this:

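respBytes, err := io.ReadAll(resp.Body)
if err != nil {
    // context.Canceled does not implement net.Error, so check for it explicitly.
    if errors.Is(err, context.Canceled) {
        log.Printf("context canceled: %v", err)
        return nil, err
    }
    // Timeouts, including context.DeadlineExceeded, satisfy net.Error.
    var netError net.Error
    if errors.As(err, &netError) {
        log.Printf("netError %v", netError)
        return nil, netError
    }
    // Return the original error; netError is nil on this path.
    return nil, err
}

var jResult JsonResult
if err := json.Unmarshal(respBytes, &jResult); err != nil {
    return nil, fmt.Errorf("failed to unmarshal jResult: %w", err)
}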

I referred to the two pages