Kafka golang consumers do not rebalance as expected

252 Views Asked by At

I'm trying to test rebalancing consumers. I've read about consumer groups and they don't work as I expected. I have a two consumers running in the same group like so:

package main

import (
    "errors"
    "net"
    "os"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/sirupsen/logrus"
)

type Callback func(*kafka.Message) error

type CallbackMap map[string]Callback

type Consumer struct {
    consumer    *kafka.Consumer
    name        string
    callbackMap CallbackMap
}


func NewKafkaConsumer(topics []string, name string) (*Consumer, error) {

    bootstrap := os.Getenv("KAFKA_HOSTNAME")

    consumer, err := kafka.NewConsumer(
        &kafka.ConfigMap{
            "bootstrap.servers":        bootstrap,
            "group.id":                 "testing",
            "security.protocol":        "plaintext",
            "go.events.channel.enable": true,
        },
    )

    if err != nil {
        logrus.WithError(err).Fatal("failed to create consumer")
        return nil, err
    }

    if err := consumer.SubscribeTopics(topics, nil); err != nil {
        logrus.WithError(err).Fatal("failed to subscribe to topics")
        return nil, err
    }

    return &Consumer{
        consumer:    consumer,
        name:        name,
        callbackMap: make(CallbackMap),
    }, nil
}

func (c *Consumer) Subscribe(key string, callback Callback) {
    c.callbackMap[key] = callback
}

func (c *Consumer) UnSubscribe(key string) {
    delete(c.callbackMap, key)
}

func (c *Consumer) HandleMessage(message *kafka.Message, chano chan error) {
    if message.TopicPartition.Error != nil {
        logrus.WithError(message.TopicPartition.Error).Warning()
        return
    }
    logrus.Errorf("CONSUMER NAME IS =========== %s", c.name)
    logrus.Errorf("MESSAGE is ============: %s", message)
    logrus.Errorf("VALUE IS =============: %s", string(message.Value))
    logrus.Errorf("PARTITION IS =========== %d", message.TopicPartition.Partition)
    if string(message.Value) == "20" {
        logrus.Errorf("Exited consumer: %s", c.name)
        chano <- errors.New("err")
        return
    }

}

func (c *Consumer) Consume() {
    logrus.Errorf("entering consumer: %s", c.name)
    chano := make(chan error)
    for event := range c.consumer.Events() {
        select {
        case _ = <-chano:
            err := c.consumer.Close()
            if err != nil {
                logrus.Errorf("Failed to close consumer")
                return
            }
            return
        default:
            switch e := event.(type) {
            case *kafka.Message:
                c.HandleMessage(e, chano)
            case kafka.Error:
                logrus.WithError(e).Warning("an error occurred while reading from topic")

            default:
                // Ignore other event types
            }
        }
    }
}

func main() {
    kafkaConsumer1, err := NewKafkaConsumer([]string{"test_topic"}, "one")
    kafkaConsumer2, err := NewKafkaConsumer([]string{"test_topic"}, "two")
    logrus.Errorf("started")
    if err != nil {
        logrus.Errorf("err %s", err)
        return
    }

    go kafkaConsumer1.Consume()
    go kafkaConsumer2.Consume()
    select {}

}

I am producing messages with sequential values. I expect that once the value hits 20, the consumer will stop and close and rebalancing will occur, with the other consumer starting to listen. This doesn't happen - once 20 hits, all consuming stops. Why is this happening? For example, consumer one started and the log I get is:

ERRO[0078] CONSUMER NAME IS =========== one             
ERRO[0078] MESSAGE is ============: test_topic[3]@947        
ERRO[0078] VALUE IS =============: 20                   
ERRO[0078] PARTITION IS =========== 3                   
ERRO[0078] Exited consumer: one     

And that's it.

Thanks

0

There are 0 best solutions below