How to wait for in process operations before canceling go routine

52 Views Asked by At

I have a message consumer (eg: Kafka) that runs on a go routine with a for-select with a default case where it processes the received message:

type Consumer struct{}

func (c *Consumer) Start(ctx context.Context) {
    fmt.Println("Consumer is starting")
    defer func() {
        fmt.Println("Consumer is stopping")
    }()

    i := 0
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Context is done")
            return
        default:
            // msg, err := kafka.FetchMessage(ctx)
            fmt.Printf("[%d] Received message\n", i)
            <-time.After(3 * time.Second)
            fmt.Printf("[%d] Stopped processing\n", i)
            i++
        }
    }
}

In my main function, I have a channel that listen for cancelation signals. Whenever the cancel signal is received, I want to cancel the go routine by calling the context cancel function.

func main() {
    fmt.Println("Main Start")
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    consumer := &Consumer{}
    go func() {
        consumer.Start(ctx)
    }()

    <-sigchan
    fmt.Println("Received signal to shutdown")
    cancel()
}

The requisites of the system are:

  • Message processing must be synchronous;
  • Must not lose any in-progress operations (messages);

However, when I cancel the program by clicking CTRL + C, if a message is in processing the processing is not completed. How can I implement a feature that will cancel the consumer only after the last message is processed.

Go Playgroud: https://go.dev/play/p/kRUPPhwxwn7

1

There are 1 best solutions below

0
coxley On BEST ANSWER

The reason it exits immediately is because when main() returns, all goroutines abruptly stopped. There's nothing waiting until consumer.Start is finished.

Using signal.NotifyContext is a bit more straight-forward, but we can accomplish it your way too.

// Your way, modified:
func main() {
    fmt.Println("Main Start")
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        <-sigchan
        fmt.Println("Received signal to shutdown")
        cancel()
    }()

    // Block until it exists, cancel in the background
    consumer := &Consumer{}
    consumer.Start(ctx)
}

// Allowing signal.NotifyContext to deal with it
func main() {
    fmt.Println("Main Start")
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    consumer := &Consumer{}
    consumer.Start(ctx)
    fmt.Println("Shutting down")
}

Also, note that CTRL+C is not exactly equivalent to kill -INT $PID. The former will send SIGINT to every process in the foreground proc's PGID. This means forks may get killed before the main process can do it gracefully.