Get a list of consumer groups subscribed to a topic in Kafka using segmentio/kafka-go


I'm trying to get a list of consumer groups subscribed to a topic in Kafka using segmentio/kafka-go. This can be done with a script that ships with Kafka:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
  topic3          0          241019          395308          154289          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic2          1          520678          803288          282610          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic3          1          241018          398817          157799          consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2
  topic1          0          854144          855809          1665            consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic2          0          460537          803290          342753          consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1
  topic3          2          243655          398812          155157          consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4

but I need to do this programmatically. Is there a way to do it? I've tried the following with no success:

conn, err := kafka2.Dial("tcp", "localhost:9092")
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()

    partitions, err := conn.ReadPartitions("topic.test")
    if err != nil {
        panic(err.Error())
    }

    m := map[string]struct{}{}

    for _, p := range partitions {
        m[p.Topic] = struct{}{}

    }
    for k := range m {
        fmt.Println(k)
    }

    client := kafka2.Client{
        Addr:      kafka2.TCP(kafkaBroker),
        Timeout:   0,
        Transport: nil,
    }

    metReq := kafka2.MetadataRequest{
        Addr:   kafka2.TCP(kafkaBroker),
        Topics: []string{"topic.test"},
    }

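    // Check the error instead of discarding it, and print the topic metadata.
    tmp, err := client.Metadata(context.Background(), &metReq)
    if err != nil {
        panic(err.Error())
    }
    fmt.Println(tmp.Topics)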
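
The metadata call above only returns topic and partition information, not consumer groups. One direction that might work, as far as I can tell from the kafka-go package docs, is to list the groups with the client's ListGroups method, describe them with DescribeGroups, and keep the groups whose members report the topic in their subscription metadata. The exact request and response field names should be checked against the installed kafka-go version. The sketch below covers only the filtering step. The groupInfo type and the groupsForTopic and subscribes helpers are hypothetical stand-ins for illustration, not kafka-go types, and the sample data is made up.

```go
package main

import (
	"fmt"
	"sort"
)

// groupInfo is a hypothetical, simplified view of one described consumer
// group: for each member, the list of topics that member subscribes to.
// It is NOT a kafka-go type; it would be filled from a describe-groups response.
type groupInfo struct {
	GroupID      string
	MemberTopics [][]string
}

// subscribes reports whether any member of g lists topic in its subscription.
func subscribes(g groupInfo, topic string) bool {
	for _, topics := range g.MemberTopics {
		for _, t := range topics {
			if t == topic {
				return true
			}
		}
	}
	return false
}

// groupsForTopic returns the sorted IDs of all groups that have at least
// one member subscribed to topic.
func groupsForTopic(groups []groupInfo, topic string) []string {
	var out []string
	for _, g := range groups {
		if subscribes(g, topic) {
			out = append(out, g.GroupID)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	// Made-up sample data standing in for described groups.
	groups := []groupInfo{
		{GroupID: "my-group", MemberTopics: [][]string{{"topic.test", "topic2"}}},
		{GroupID: "other-group", MemberTopics: [][]string{{"topic3"}}},
	}
	fmt.Println(groupsForTopic(groups, "topic.test"))
}
```

Since this relies on member subscriptions, a group with no active members would probably not show up. Covering those groups would likely need their committed offsets for the topic instead.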

I've seen there is a Kafka proxy API, but I believe it is only available with the Confluent implementation of Kafka. Is that right?

thanks
