I'm trying to adapt code from the consumer group example for github.com/Shopify/sarama
, and am struggling to add a unit test which tests the functionality of session.MarkMessage()
in the ConsumeClaim
method (https://github.com/Shopify/sarama/blob/5466b37850a38f4ed6d04b94c6f058bd75032c2a/examples/consumergroup/main.go#L160).
Here is my adapted code with a consume()
function:
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/Shopify/sarama"
)
var (
addrs = []string{"localhost:9092"}
topic = "my-topic"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
defer wg.Wait()
consumer := &Consumer{ready: make(chan bool)}
close := consume(ctx, &wg, consumer)
defer close()
<-consumer.ready
log.Println("Sarama consumer up and running!")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")
}
}
func consume(ctx context.Context, wg *sync.WaitGroup, consumer *Consumer) (close func()) {
config := sarama.NewConfig()
config.Version = sarama.V0_11_0_2 // The version has to be at least V0_10_2_0 to support consumer groups
config.Consumer.Offsets.Initial = sarama.OffsetOldest
consumerGroup, err := sarama.NewConsumerGroup(addrs, "my-group", config)
if err != nil {
log.Fatalf("NewConsumerGroup: %v", err)
}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := consumerGroup.Consume(ctx, []string{topic}, consumer); err != nil {
log.Panicf("Consume: %v", err)
}
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
close = func() {
if err := consumerGroup.Close(); err != nil {
log.Panicf("Close: %v", err)
}
}
return
}
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
handle func([]byte) error
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", message.Value, message.Timestamp, message.Topic)
if consumer.handle != nil {
if err := consumer.handle(message.Value); err != nil {
return fmt.Errorf("handle message %s: %v", message.Value, err)
}
}
session.MarkMessage(message, "")
}
return nil
}
Here are a couple of unit tests I've written for it:
package main
import (
"context"
"fmt"
"log"
"sync"
"testing"
"time"
"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"
"gotest.tools/assert"
)
func TestConsume(t *testing.T) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(addrs, config)
require.NoError(t, err)
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder([]byte("foobar")),
})
require.NoError(t, err)
t.Logf("Sent message to partition %d with offset %d", partition, offset)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
consumer := &Consumer{ready: make(chan bool)}
close := consume(ctx, &wg, consumer)
<-consumer.ready
log.Println("Sarama consumer up and running!")
time.Sleep(1 * time.Second)
cancel()
wg.Wait()
close()
}
func TestConsumeTwice(t *testing.T) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(addrs, config)
require.NoError(t, err)
data1, data2 := "foobar1", "foobar2"
for _, data := range []string{data1, data2} {
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("foobar"),
Value: sarama.StringEncoder(data),
})
require.NoError(t, err)
t.Logf("Sent message to partition %d with offset %d", partition, offset)
}
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
messageReceived := make(chan []byte)
consumer := &Consumer{
ready: make(chan bool),
handle: func(data []byte) error {
messageReceived <- data
fmt.Printf("Received message: %s\n", data)
return nil
},
}
close := consume(ctx, &wg, consumer)
<-consumer.ready
log.Println("Sarama consumer up and running!")
for i := 0; i < 2; i++ {
data := <-messageReceived
switch i {
case 0:
assert.Equal(t, data1, string(data))
case 1:
assert.Equal(t, data2, string(data))
}
}
cancel()
wg.Wait()
close()
}
The tests can be run after running Kafka and Zookeeper in a Docker container such as johnnypark/kafka-zookeeper
like so:
docker run -p 2181:2181 -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1 -e NUM_PARTITIONS=10 johnnypark/kafka-zookeeper
What I'm struggling with is the following: if I comment out the line
session.MarkMessage(message, "")
the tests still pass. According to https://godoc.org/github.com/Shopify/sarama#ConsumerGroupSession, MarkMessage
marks a message as consumed, but how would I test this in a unit test?
sarama.ConsumerGroupSession.MarkMessage
callssarama.PartitionOffsetManager.MarkOffset
, and in the method comment they said: "Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately for efficiency reasons, and it may never be committed if your application crashes. This means that you may end up processing the same message twice."So in unit tests,
MarkMessage
does not commit offset fast enough. I faced the same problem and Google brought me here. Sleeping for a second at the end of test functions can be a workaround.