Sarama Kafka library: how to unit test a consumer group's session.MarkMessage()?

2.9k Views Asked by At

I'm trying to adapt code from the consumer group example for github.com/Shopify/sarama, and am struggling to add a unit test which tests the functionality of session.MarkMessage() in the ConsumeClaim method (https://github.com/Shopify/sarama/blob/5466b37850a38f4ed6d04b94c6f058bd75032c2a/examples/consumergroup/main.go#L160).

Here is my adapted code with a consume() function:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"

    "github.com/Shopify/sarama"
)

// addrs lists the Kafka broker addresses used by both the consumer group and
// the test producers.
var addrs = []string{"localhost:9092"}

// topic is the single Kafka topic that is produced to and consumed from.
var topic = "my-topic"

// main starts the consumer group, waits until the first session is set up,
// and then blocks until the context is cancelled or SIGINT/SIGTERM arrives.
//
// Shutdown is performed explicitly in the order cancel -> wait -> close.
// Relying on deferred calls here would run them in reverse registration
// order (close, then wait, then cancel), closing the consumer group while the
// consume loop is still running.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    // Safety net for early exits; calling cancel more than once is harmless.
    defer cancel()

    var wg sync.WaitGroup

    consumer := &Consumer{ready: make(chan bool)}

    // Named closeGroup rather than close to avoid shadowing the builtin.
    closeGroup := consume(ctx, &wg, consumer)

    <-consumer.ready
    log.Println("Sarama consumer up and running!")

    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-ctx.Done():
        log.Println("terminating: context cancelled")
    case <-sigterm:
        log.Println("terminating: via signal")
    }

    // Stop the consume loop first, wait for its goroutine to exit, and only
    // then close the consumer group.
    cancel()
    wg.Wait()
    closeGroup()
}

// consume creates a consumer group for topic on the brokers in addrs and
// starts a goroutine that keeps calling Consume with the given handler until
// ctx is cancelled or the group is closed.
//
// The goroutine is registered on wg, so callers can wait for it to finish.
// The returned function closes the consumer group; it should be called after
// ctx has been cancelled and wg.Wait() has returned.
//
// Failure to create the group terminates the process via log.Fatalf, and any
// Consume error other than sarama.ErrClosedConsumerGroup panics.
func consume(ctx context.Context, wg *sync.WaitGroup, consumer *Consumer) func() {
    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_2 // The version has to be at least V0_10_2_0 to support consumer groups
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    consumerGroup, err := sarama.NewConsumerGroup(addrs, "my-group", config)
    if err != nil {
        log.Fatalf("NewConsumerGroup: %v", err)
    }

    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            // Consume returns whenever a session ends (e.g. on a rebalance),
            // so it has to be called in a loop to join the next session.
            if err := consumerGroup.Consume(ctx, []string{topic}, consumer); err != nil {
                // A closed group is a normal way for the loop to end, not a
                // reason to crash the process. The sentinel is compared
                // directly so no additional import is needed.
                if err == sarama.ErrClosedConsumerGroup {
                    return
                }
                log.Panicf("Consume: %v", err)
            }
            if ctx.Err() != nil {
                return
            }
            // Setup closes ready at the start of every session, so a fresh
            // channel is needed before the next Consume call.
            consumer.ready = make(chan bool)
        }
    }()

    return func() {
        if err := consumerGroup.Close(); err != nil {
            log.Panicf("Close: %v", err)
        }
    }
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
    // ready is closed by Setup to signal that a session has started.
    ready  chan bool
    // handle, when non-nil, is called by ConsumeClaim with the value of every
    // claimed message; a returned error makes ConsumeClaim return early.
    handle func([]byte) error
}

// Setup runs when a new session begins, ahead of any ConsumeClaim call. It
// closes the ready channel so that anyone waiting on it learns that the
// consumer is up.
func (consumer *Consumer) Setup(_ sarama.ConsumerGroupSession) error {
    readySignal := consumer.ready
    close(readySignal)
    return nil
}

// Cleanup runs when a session ends, after every ConsumeClaim goroutine has
// returned. This consumer has nothing to release, so it always succeeds.
func (consumer *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
//
// Every claimed message is logged and, when a handle callback is configured,
// passed to it. A message is marked as consumed only after the callback has
// succeeded; a callback error ends the loop and is returned wrapped, so
// callers can inspect it with errors.Is / errors.As.
//
// The loop ends with a nil error when the messages channel is closed or when
// the session context is done, so that a rebalance is not held up by a
// handler waiting for further messages.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case message, ok := <-claim.Messages():
            if !ok {
                // Channel closed: the claim has ended.
                return nil
            }
            log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", message.Value, message.Timestamp, message.Topic)
            if consumer.handle != nil {
                if err := consumer.handle(message.Value); err != nil {
                    return fmt.Errorf("handle message %s: %w", message.Value, err)
                }
            }
            session.MarkMessage(message, "")
        case <-session.Context().Done():
            // The session is ending (rebalance or shutdown); exit promptly.
            return nil
        }
    }
}

Here are a couple of unit tests I've written for it:

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "testing"
    "time"

    "github.com/Shopify/sarama"
    "github.com/stretchr/testify/require"
    "gotest.tools/assert"
)

// TestConsume produces one message to topic, starts the consumer group and
// lets it run for a second before shutting it down again.
//
// It needs a Kafka broker reachable at addrs and makes no assertion about the
// consumed message; it only checks that start-up and shutdown work.
func TestConsume(t *testing.T) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    producer, err := sarama.NewSyncProducer(addrs, config)
    require.NoError(t, err)
    // Release the producer's broker connections when the test ends.
    defer func() {
        if err := producer.Close(); err != nil {
            t.Errorf("closing producer: %v", err)
        }
    }()

    partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder([]byte("foobar")),
    })
    require.NoError(t, err)
    t.Logf("Sent message to partition %d with offset %d", partition, offset)

    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup

    consumer := &Consumer{ready: make(chan bool)}

    // Named closeGroup rather than close to avoid shadowing the builtin.
    closeGroup := consume(ctx, &wg, consumer)

    <-consumer.ready
    log.Println("Sarama consumer up and running!")

    // Give the consumer some time to claim the message.
    time.Sleep(1 * time.Second)

    // Shut down in order: stop the loop, wait for it, then close the group.
    cancel()
    wg.Wait()
    closeGroup()
}

// TestConsumeTwice produces two messages with the same key and checks that
// the consumer's handle callback receives them in the order they were sent.
//
// It needs a Kafka broker reachable at addrs.
func TestConsumeTwice(t *testing.T) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    producer, err := sarama.NewSyncProducer(addrs, config)
    require.NoError(t, err)
    // Release the producer's broker connections when the test ends.
    defer func() {
        if err := producer.Close(); err != nil {
            t.Errorf("closing producer: %v", err)
        }
    }()

    data1, data2 := "foobar1", "foobar2"

    for _, data := range []string{data1, data2} {
        partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
            Topic: topic,
            Key:   sarama.StringEncoder("foobar"),
            Value: sarama.StringEncoder(data),
        })
        require.NoError(t, err)
        t.Logf("Sent message to partition %d with offset %d", partition, offset)
    }

    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup

    messageReceived := make(chan []byte)
    consumer := &Consumer{
        ready: make(chan bool),
        handle: func(data []byte) error {
            // The channel is unbuffered, so the send must give up once the
            // test has stopped reading; otherwise a further message would
            // block the handler forever and wg.Wait() would never return.
            select {
            case messageReceived <- data:
                fmt.Printf("Received message: %s\n", data)
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
        },
    }

    // Named closeGroup rather than close to avoid shadowing the builtin.
    closeGroup := consume(ctx, &wg, consumer)
    // Tear down in order (stop the loop, wait for it, close the group) even
    // when the test fails early.
    defer func() {
        cancel()
        wg.Wait()
        closeGroup()
    }()

    <-consumer.ready
    log.Println("Sarama consumer up and running!")

    for _, want := range []string{data1, data2} {
        select {
        case got := <-messageReceived:
            assert.Equal(t, want, string(got))
        case <-time.After(30 * time.Second):
            // Fail instead of hanging if the message never arrives.
            t.Fatalf("timed out waiting for message %q", want)
        }
    }
}

The tests can be run after running Kafka and Zookeeper in a Docker container such as johnnypark/kafka-zookeeper like so:

docker run -p 2181:2181 -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1  -e NUM_PARTITIONS=10 johnnypark/kafka-zookeeper

What I'm struggling with is the following: if I comment out the line

        session.MarkMessage(message, "")

the tests still pass. According to https://godoc.org/github.com/Shopify/sarama#ConsumerGroupSession, MarkMessage marks a message as consumed, but how would I test this in a unit test?

1

There is 1 best solution below

0
On

sarama.ConsumerGroupSession.MarkMessage calls sarama.PartitionOffsetManager.MarkOffset, whose method comment says: "Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately for efficiency reasons, and it may never be committed if your application crashes. This means that you may end up processing the same message twice."

So in unit tests, the offset marked by MarkMessage may not be committed before the test finishes. I faced the same problem, and Google brought me here. Sleeping for a second at the end of the test functions can be a workaround.