Kafka consumer lag when consuming two topics with UTC timestamps

375 views

I am encountering an issue with my Kafka consumer while consuming two topics (Topic A and Topic B) within a single consumer group. Both topics contain UTC timestamps in their data and are published simultaneously. The problem I'm facing is that when one topic is being consumed, the other topic seems to lag behind, and the consumption is not happening in parallel as expected.

I'm aiming for a more parallel and efficient consumption approach, where both topics are processed promptly as they are published. How can I address this issue and optimize the consumption of these two topics? Are there any best practices or configuration adjustments that I should consider?

I am using the Golang Sarama library for my Kafka consumer. Here's a snippet of how I'm currently setting up my Kafka consumer:

package kafka

import (
    "context"
    "os"
    "os/signal"
    "strings"
    "sync"
    "syscall"
    "github.com/Shopify/sarama"
)

// kafka_consumer_group adapts a ConsumerBase to Sarama's consumer-group
// handler: its Setup, Cleanup and ConsumeClaim methods make it the handler
// value that init passes to ConsumerGroup.Consume.
type kafka_consumer_group struct {
	consumer ConsumerBase // connection settings, topics and the readiness channel
}

// kafka_controller receives every consumed message (see ConsumeClaim). It is
// created once when the package is initialised and is shared by every claim
// goroutine Sarama starts.
// NOTE(review): the `controllers`, `zap` and `tracer` packages do not appear in
// this file's import list as shown — confirm the real file imports them.
var kafka_controller = controllers.NewKafkaController()

// Init builds a consumer-group runner from a settings map and starts it.
//
// The map is expected to hold string values under "brokers" and "topics"
// (comma-separated lists), "assignor", "version", "name" and "offset". A
// missing key or a non-string value panics on the type assertion.
// The call blocks until the consumer shuts down (see the init method).
func Init(group map[string]interface{}) {
	brokers := strings.Split(group["brokers"].(string), ",")
	topics := strings.Split(group["topics"].(string), ",")
	assignor := group["assignor"].(string)
	version := group["version"].(string)
	name := group["name"].(string)
	offset := group["offset"].(string)

	runner := &kafka_consumer_group{
		consumer: ConsumerBase{
			brokers:      brokers,
			topics:       topics,
			assignor:     assignor,
			version:      version,
			ready:        make(chan bool),
			group:        name,
			offsetNewest: isOffsetNewest(offset),
			offsetOldest: isOffsetOldest(offset),
		},
	}
	runner.init()
}

// init runs the consumer group described by kafka_consumer.consumer. Its steps:
//   - Build a Sarama configuration (Kafka version, partition assignor, initial
//     offset).
//   - Start the session loop in a background goroutine.
//   - Wait until the first session has been set up.
//   - Block until the context is cancelled or SIGINT/SIGTERM arrives.
//   - Stop the loop, wait for it to exit, and close the client.
//
// Any configuration or client-creation error is reported with zap.Fatal.
//
// NOTE(review): this method blocks for the lifetime of the consumer. If Init is
// called for several groups from one goroutine, later groups will not start
// until the earlier one has shut down — confirm against the caller.
func (kafka_consumer *kafka_consumer_group) init() {
	ctx := context.Background()
	zap.Debug(ctx, "kafka_cg:", "Starting a new Sarama consumer for :", kafka_consumer.consumer.group)
	version, err := sarama.ParseKafkaVersion(kafka_consumer.consumer.version)
	if err != nil {
		zap.Fatal(ctx, "kafka_cg", "Error parsing Kafka version:", err)
	}
	config := sarama.NewConfig()
	config.Version = version
	switch kafka_consumer.consumer.assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	case "roundrobin":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	case "range":
		config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	default:
		zap.Fatal(ctx, "kafka_cg", "Unrecognized consumer group partition assignor: ", kafka_consumer.consumer.assignor)
	}
	if kafka_consumer.consumer.offsetNewest {
		config.Consumer.Offsets.Initial = sarama.OffsetNewest
	}
	if kafka_consumer.consumer.offsetOldest {
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}
	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(kafka_consumer.consumer.brokers, kafka_consumer.consumer.group, config)
	if err != nil {
		zap.Fatal(ctx, "kafka_cg", "Error creating consumer group client:", err)
	}
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop: when a
			// server-side rebalance happens, the consumer session must be
			// recreated to get the new claims.
			if err := client.Consume(ctx, kafka_consumer.consumer.topics, kafka_consumer); err != nil {
				zap.Error(ctx, "kafka_cg:Error from consumer: ", err)
			}
			// A cancelled context signals that the consumer should stop.
			if ctx.Err() != nil {
				// Log the cancellation cause. The outer `err` is always nil here,
				// and the Consume error above is scoped to its `if`.
				zap.Error(ctx, "kafka_cg:consumer context cancelled: ", ctx.Err())
				return
			}
			// Setup closes `ready` on every new session, so give the next
			// session a fresh channel to close.
			kafka_consumer.consumer.ready = make(chan bool)
		}
	}()
	<-kafka_consumer.consumer.ready // Wait until the first session has been set up.
	zap.Debug(ctx, "kafka_cg", "Sarama consumer up and running!... for", "consumer_group:", kafka_consumer.consumer.group, "with_topics:", kafka_consumer.consumer.topics)
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	// Stop relaying signals to this channel once init returns.
	defer signal.Stop(sigterm)
	select {
	case <-ctx.Done():
		zap.Debug(ctx, "kafka_cg", "terminating: context cancelled")
	case <-sigterm:
		zap.Debug(ctx, "kafka_cg", "terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		zap.Fatal(ctx, "kafka_cg", "Error closing client: ", err)
	}
	zap.Debug(ctx, "kafka_cg", "consumer group", kafka_consumer.consumer.group, "closed successfully")
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (kafka_consumer *kafka_consumer_group) Setup(_ sarama.ConsumerGroupSession) error {
	// Sarama calls Setup at the start of each session, before any ConsumeClaim
	// goroutine runs. Closing the ready channel unblocks init, which waits on it
	// before reporting the consumer as running.
	// NOTE(review): closing an already-closed channel panics. This relies on
	// init's session loop installing a fresh channel after every session.
	readyCh := kafka_consumer.consumer.ready
	close(readyCh)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (kafka_consumer *kafka_consumer_group) Cleanup(_ sarama.ConsumerGroupSession) error {
	// Sarama calls Cleanup when a session ends, after every ConsumeClaim
	// goroutine has returned. No per-session state needs releasing, so
	// teardown always succeeds.
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// ConsumeClaim processes the messages of one claimed partition. It stops when
// the claim's message channel is closed or the session's context is done
// (for example on a rebalance or shutdown).
//
// Sarama calls ConsumeClaim in its own goroutine for every claimed partition.
// Partitions of different topics are therefore consumed concurrently, while
// the messages within one partition are handled in order.
// NOTE(review): ProcessMessage may therefore run from several goroutines at
// once — confirm it is safe for concurrent use.
//
// Offsets are only marked here. Sarama's auto-commit, which init leaves at its
// default, commits them periodically in the background. The previous
// synchronous session.Commit() after every message added a broker round-trip
// per message and serialised consumption behind network latency.
// NOTE(review): with periodic commits, messages processed since the last commit
// may be redelivered after a crash (at-least-once delivery).
func (kafka_consumer *kafka_consumer_group) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	ctx := tracer.WithTraceableContext(context.Background(), "kafka_cg: ConsumeClaim")

	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// The claim's channel was closed; this claim is finished.
				return nil
			}
			kafka_controller.ProcessMessage(ctx, message.Topic, string(message.Value))
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// Return promptly so a rebalance is not held up by this goroutine.
			return nil
		}
	}
}

Any insights, suggestions, or examples of how to consume these two topics in parallel, given that their messages carry matching UTC timestamps, using Go and the Sarama library would be greatly appreciated.

1

There is 1 answer below

3
juexin zheng On

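You can try creating two separate consumer groups, one per topic, each with its own group ID. The two topics are then consumed by independent groups that track offsets and rebalance separately.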