Can't close confluent golang kafka consumer

1.5k Views Asked by At

I'm having an issue regarding the disposing of kafka consumer in the end of program execution. Here is code responsible for closing the consumer

func(kc *KafkaConsumer) Dispose() {
    Sugar.Info("Disposing of consumer")
    kc.mu.Lock()
    kc.Consumer.Close();
    Sugar.Info("Disposed of consumer")
    kc.mu.Unlock()
}

As you might have already noticed, i'm making use of sync.Mutex, inasmuch as consumer is accessed by multiple goroutines. Below is another snippet responsible for reading messages from kafka

  func (kc *KafkaConsumer) Consume(signalChan chan os.Signal, ctx context.Context) {
    for{
        select{
        case sig := <-signalChan:
            Sugar.Info("Caught signal %v", sig)
            break
        case <-ctx.Done():
            Sugar.Info("Got context done message. Closing consumer...")
            kc.Dispose()
            break
        default:
            for{
                message, err := kc.Consumer.ReadMessage(-1); if err != nil{
                    Log.Error(err.Error())
                    return
                }
                Sugar.Infof("Got a new message %v",message)
                resp := make(chan *KafkaResponseEntity)
                go router.UseMessage(*message, resp, ctx)
                //Potential deadlock
                response := <-resp
                /*
                    Explicit commit of an offset in order to ensure 
                    that request has been successfully processed
                */
                kc.Consumer.Commit()
                Sugar.Info("Successfully commited an offset")
                Sugar.Infof("Just got a response %v", response)
                go producer.KP.Produce(response.PaymentId, response.Bytes, "some_random_topic")
            }
        }
    }
}

The problem is that when closing the consumer, program execution simply stalls. Are there any issues? Should i use cond along with mutex? I'd be very glad if you provide thorough explanation of what might went wrong in my code. Thanks in advance.

1

There are 1 best solutions below

1
On

I suspect this is hanging because of:

kc.Consumer.ReadMessage(-1)

Which the documentation states will block indefinitely, hence why it's not closed. The simplest approach would be to make that value a positive time duration (e.g. 1 * time.Second), but then you may get time out errors if messages are not consumed within the timeouts. The time out error is generally innocuous, but is something to account for, from the linked documentation:

Timeout is returned as (nil, err) where err is
`err.(kafka.Error).Code() == kafka.ErrTimedOut`

I'm not yet sure of a good way to utilize the indefinite blocking and allow it to be interrupted. If anyone does know please post the findings!