I'm having an issue disposing of a Kafka consumer at the end of program execution. Here is the code responsible for closing the consumer:
    func (kc *KafkaConsumer) Dispose() {
        Sugar.Info("Disposing of consumer")
        kc.mu.Lock()
        kc.Consumer.Close()
        Sugar.Info("Disposed of consumer")
        kc.mu.Unlock()
    }
As you might have noticed, I'm using a sync.Mutex because the consumer is accessed by multiple goroutines. Below is another snippet, responsible for reading messages from Kafka:
    func (kc *KafkaConsumer) Consume(signalChan chan os.Signal, ctx context.Context) {
        for {
            select {
            case sig := <-signalChan:
                Sugar.Info("Caught signal %v", sig)
                break
            case <-ctx.Done():
                Sugar.Info("Got context done message. Closing consumer...")
                kc.Dispose()
                break
            default:
                for {
                    message, err := kc.Consumer.ReadMessage(-1)
                    if err != nil {
                        Log.Error(err.Error())
                        return
                    }
                    Sugar.Infof("Got a new message %v", message)
                    resp := make(chan *KafkaResponseEntity)
                    go router.UseMessage(*message, resp, ctx)
                    // Potential deadlock
                    response := <-resp
                    /*
                       Explicit commit of an offset in order to ensure
                       that request has been successfully processed
                    */
                    kc.Consumer.Commit()
                    Sugar.Info("Successfully commited an offset")
                    Sugar.Infof("Just got a response %v", response)
                    go producer.KP.Produce(response.PaymentId, response.Bytes, "some_random_topic")
                }
            }
        }
    }
The problem is that when the consumer is being closed, program execution simply stalls. Is there anything wrong with this code? Should I use a sync.Cond along with the mutex? I'd be very glad if you could provide a thorough explanation of what might have gone wrong in my code. Thanks in advance.
I suspect this is hanging because of the kc.Consumer.ReadMessage(-1) call, which the documentation states will block indefinitely. While that call is blocked, the loop never gets back to the ctx.Done() case, so the consumer is never closed. The simplest approach would be to make that value a positive time duration (e.g. 1 * time.Second), but then you may get timeout errors whenever no message arrives within the timeout. The timeout error is generally innocuous, but it is something to account for, as the documentation notes.
I'm not yet sure of a good way to keep the indefinite blocking and still allow it to be interrupted. If anyone knows of one, please post your findings!
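To make the positive-timeout suggestion concrete, here is a rough sketch of the loop shape I have in mind. It does not use the real Kafka client: messageReader, errTimeout, chanReader, consumeLoop and demo are all hypothetical names made up for this illustration, and with the actual consumer you would check for whatever timeout error the library documents instead of errTimeout. The point is only that a bounded read lets the loop re-check the context between reads.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// errTimeout is a hypothetical stand-in for the client library's timeout error.
var errTimeout = errors.New("read timed out")

// messageReader is a hypothetical interface covering the two calls the loop needs.
type messageReader interface {
	ReadMessage(timeout time.Duration) (string, error)
	Close() error
}

// consumeLoop reads with a bounded timeout so that cancellation of ctx is
// noticed between reads. The reader is closed before the function returns.
func consumeLoop(ctx context.Context, r messageReader, timeout time.Duration, handle func(string)) error {
	defer r.Close()
	for {
		if ctx.Err() != nil {
			return nil // cancellation is treated as a normal shutdown
		}
		msg, err := r.ReadMessage(timeout)
		if errors.Is(err, errTimeout) {
			continue // nothing arrived in time; go around and re-check ctx
		}
		if err != nil {
			return err
		}
		handle(msg)
	}
}

// chanReader is a toy in-memory reader used only to exercise the loop.
type chanReader struct {
	msgs   chan string
	closed bool
}

func (c *chanReader) ReadMessage(timeout time.Duration) (string, error) {
	select {
	case m := <-c.msgs:
		return m, nil
	case <-time.After(timeout):
		return "", errTimeout
	}
}

func (c *chanReader) Close() error {
	c.closed = true
	return nil
}

// demo feeds two messages, cancels the context, and reports what happened.
func demo() (handled []string, closed bool, err error) {
	r := &chanReader{msgs: make(chan string, 2)}
	r.msgs <- "a"
	r.msgs <- "b"
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	done := make(chan error, 1)
	go func() {
		done <- consumeLoop(ctx, r, 10*time.Millisecond, func(m string) { handled = append(handled, m) })
	}()
	time.Sleep(100 * time.Millisecond) // give the loop time to drain both messages
	cancel()
	select {
	case err = <-done:
	case <-time.After(2 * time.Second):
		err = errors.New("consumeLoop did not stop after cancel")
	}
	return handled, r.closed, err
}
```

Calling demo() should show the loop handling both messages and then stopping shortly after cancel(), with the reader closed on the way out. The trade-off is that shutdown can lag by up to one timeout interval, so a shorter timeout gives faster shutdown at the cost of more wake-ups.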