I'm trying to get a list of the consumer groups subscribed to a topic in Kafka using segmentio/kafka-go. This can be done using a script provided by Kafka:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID                                     HOST        CLIENT-ID
topic3  0          241019          395308          154289  consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4  /127.0.0.1  consumer2
topic2  1          520678          803288          282610  consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4  /127.0.0.1  consumer2
topic3  1          241018          398817          157799  consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4  /127.0.0.1  consumer2
topic1  0          854144          855809          1665    consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1  /127.0.0.1  consumer1
topic2  0          460537          803290          342753  consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1  /127.0.0.1  consumer1
topic3  2          243655          398812          155157  consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0  /127.0.0.1  consumer4
However, I need to do this programmatically. Is there a way to do it? I've tried the following, without success:
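package main

import (
	"context"
	"fmt"

	kafka2 "github.com/segmentio/kafka-go"
)

const kafkaBroker = "localhost:9092"

func main() {
	// Attempt 1: read partition metadata over a raw connection.
	conn, err := kafka2.Dial("tcp", kafkaBroker)
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	partitions, err := conn.ReadPartitions("topic.test")
	if err != nil {
		panic(err.Error())
	}
	// Collect the distinct topic names; this yields topics, not consumer groups.
	m := map[string]struct{}{}
	for _, p := range partitions {
		m[p.Topic] = struct{}{}
	}
	for k := range m {
		fmt.Println(k)
	}

	// Attempt 2: request topic metadata through the higher-level client.
	client := kafka2.Client{
		Addr: kafka2.TCP(kafkaBroker),
	}
	metReq := kafka2.MetadataRequest{
		Addr:   kafka2.TCP(kafkaBroker),
		Topics: []string{"topic.test"},
	}
	tmp, err := client.Metadata(context.Background(), &metReq)
	if err != nil {
		panic(err.Error())
	}
	// The response describes brokers, topics and partitions, but no consumer groups.
	fmt.Println(tmp.Topics)
}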
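One stopgap would be to run the script from Go and parse its output. Below is a minimal sketch of the parsing half only, assuming the column layout shown above and that the output has already been captured as a string. `topicsFromDescribe` is a hypothetical helper, not part of kafka-go, and invoking the script (for example with os/exec) is left out:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// topicsFromDescribe returns the distinct topic names found in the text printed
// by `kafka-consumer-groups.sh --describe --group <group>`, sorted alphabetically.
// Assumption: the output contains a header row with a TOPIC column and
// whitespace-separated data rows below it. The column is located by name in the
// header rather than by a fixed position.
func topicsFromDescribe(output string) []string {
	seen := map[string]struct{}{}
	col := -1 // index of the TOPIC column; -1 until the header row is found
	for _, line := range strings.Split(output, "\n") {
		fields := strings.Fields(line)
		if len(fields) == 0 {
			continue // skip blank lines
		}
		if col < 0 {
			// Still looking for the header row; ignore anything before it.
			for i, f := range fields {
				if f == "TOPIC" {
					col = i
					break
				}
			}
			continue
		}
		if col < len(fields) {
			seen[fields[col]] = struct{}{}
		}
	}
	topics := make([]string, 0, len(seen))
	for t := range seen {
		topics = append(topics, t)
	}
	sort.Strings(topics)
	return topics
}

func main() {
	// Sample input abbreviated from the script output shown in the question.
	sample := `TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
topic3 0 241019 395308 154289
topic2 1 520678 803288 282610
topic1 0 854144 855809 1665
topic3 2 243655 398812 155157`
	fmt.Println(topicsFromDescribe(sample)) // prints [topic1 topic2 topic3]
}
```

Calling this once per group and keeping the groups whose result contains the topic would give the list I'm after, but it depends on the script being installed alongside the program, which I'd like to avoid.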
I've seen that there is a Kafka proxy API, but I believe it is only available with the Confluent implementation of Kafka. Is that right?
Thanks