I have a message consumer (eg: Kafka) that runs on a go routine with a for-select with a default case where it processes the received message:
type Consumer struct{}
func (c *Consumer) Start(ctx context.Context) {
fmt.Println("Consumer is starting")
defer func() {
fmt.Println("Consumer is stopping")
}()
i := 0
for {
select {
case <-ctx.Done():
fmt.Println("Context is done")
return
default:
// msg, err := kafka.FetchMessage(ctx)
fmt.Printf("[%d] Received message\n", i)
<-time.After(3 * time.Second)
fmt.Printf("[%d] Stopped processing\n", i)
i++
}
}
}
In my main function, I have a channel that listen for cancelation signals. Whenever the cancel signal is received, I want to cancel the go routine by calling the context cancel function.
func main() {
fmt.Println("Main Start")
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
consumer := &Consumer{}
go func() {
consumer.Start(ctx)
}()
<-sigchan
fmt.Println("Received signal to shutdown")
cancel()
}
The requisites of the system are:
- Message processing must be synchronous;
- Must not lose any in-progress operations (messages);
However, when I cancel the program by clicking CTRL + C, if a message is in processing the processing is not completed. How can I implement a feature that will cancel the consumer only after the last message is processed.
Go Playgroud: https://go.dev/play/p/kRUPPhwxwn7
The reason it exits immediately is because when
main()returns, all goroutines abruptly stopped. There's nothing waiting untilconsumer.Startis finished.Using
signal.NotifyContextis a bit more straight-forward, but we can accomplish it your way too.Also, note that
CTRL+Cis not exactly equivalent tokill -INT $PID. The former will sendSIGINTto every process in the foreground proc'sPGID. This means forks may get killed before the main process can do it gracefully.