segmentio/kafka-go reader client not subscribing to the topic and partition

1.9k Views Asked by At

The reader client is not starting to consume messages. This is happening intermittently, in most cases it happens when there are no messages in the topic.

Kafka Version

Apache Kafka 3.3.0

kafka-go version

v0.4.38

Resources to reproduce the behavior:

Code:

func main() {

    topic_name := "dev-billing"
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL)
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        sig := <-signals
        fmt.Println("Got signal: ", sig)
        cancel()
    }()

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:                []string{"0.0.0.0:9092"},
        GroupID:                "consumer-group-biller",
        GroupTopics:            []string{},
        Topic:                  topic_name,
        QueueCapacity:          10,
        MinBytes:               10e3,
        MaxBytes:               10e6,
        MaxWait:                3 * time.Second,
        PartitionWatchInterval: 5 * time.Second,
        WatchPartitionChanges:  true,
        StartOffset:            kafka.LastOffset,
        ReadBackoffMax:         10 * time.Second,
        Logger:                 log.Default(),
        OffsetOutOfRangeError:  true,
    })

    i := 0

    // listening for the interrupts in a different channel.
    defer func() {
        err := r.Close()
        if err != nil {
            fmt.Println("Error closing consumer: ", err)
            return
        }
        fmt.Println("Consumer closed")
    }()

    for {
        m, err := r.FetchMessage(ctx)
        if err != nil {
            break
        }
        msg := m.Value
        content := Event{}
        json.Unmarshal([]byte(msg), &content)

        fmt.Printf("%+v\n", content)

        if content.StatusCode == 200 {
            i++
        }

        if err := r.CommitMessages(ctx, m); err != nil {
            log.Fatal("failed to commit messages:", err)
        }
        fmt.Println("Total:", i)
    }

    if err := r.Close(); err != nil {
        log.Fatal("failed to close reader:", err)
    }
}

Expected Behavior

Consumers should start consuming the messages in the topic partitions after the last offset as soon as it is started.

Observed Behavior

The consumer is not able to subscribe to the topic when the producer is already producing. If the consumer is start before the producer it works.

Error logs:

2022/11/17 14:25:34 entering loop for consumer group, consumer-group-biller
2022/11/17 14:25:37 joined group consumer-group-biller as member [email protected] (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 joinGroup succeeded for response, consumer-group-biller.  generationID=44, [email protected] (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff
2022/11/17 14:25:37 Joined group consumer-group-biller as member [email protected] (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 received empty assignments for group, consumer-group-biller as member [email protected] (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff for generation 44
2022/11/17 14:25:37 sync group finished for group, consumer-group-biller
2022/11/17 14:25:37 subscribed to topics and partitions: map[]
2022/11/17 14:25:37 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:25:37 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:25:37 started commit for group consumer-group-biller

When it worked:

2022/11/17 14:09:02 entering loop for consumer group, consumer-group-biller
2022/11/17 14:09:04 joined group consumer-group-biller as member [email protected] (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 joinGroup succeeded for response, consumer-group-biller.  generationID=35, [email protected] (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273
2022/11/17 14:09:04 Joined group consumer-group-biller as member [email protected] (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 sync group finished for group, consumer-group-biller
2022/11/17 14:09:04 subscribed to topics and partitions: map[{topic:dev-billing partition:0}:25]
2022/11/17 14:09:04 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:09:04 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:09:04 initializing kafka reader for partition 0 of dev-billing starting at offset 25
2022/11/17 14:09:04 started commit for group consumer-group-biller
2022/11/17 14:09:04 the kafka reader for partition 0 of dev-billing is seeking to offset 25
{RequestID:f9f1971f-3d5b-4ef5-b92b-880fe094887e EventID:0d52de85-8da4-435d-8d19-e267947670c3 Event:example MerchantID:id Status:OK StatusCode:200}
2022/11/17 14:09:04 committed offsets for group consumer-group-biller:

Ref: https://github.com/segmentio/kafka-go

1

There are 1 best solutions below

1
On

there has been issue recently with the library, I suggest to downgrade for the moment and report the issue in github.

specifically you can downgrade to v0.4.35, there has been some refactoring into the consumer group introduced in v0.4.36 which causes issues to the consumer group if you check the issues page.