Uber-go/zap and kafka-go race condition

1.1k Views Asked by At

I'm creating a custom logger where we can log to std out and std err, but also adding the possibility to log to kafka (the code example is here: https://github.com/roppa/kafka-go). We have multiple topics, so we need multiple loggers, but when we use more than one we get some weird things happening. When both kafka-go settings are async, I get no consumer messages, when one is async and the other is synchronous we get something like this:

//consumer topica
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:04.023Z","msg":"topic-a log 1","UID":"abc123","ns":"test-service"}

{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:05.078Z","msg":"topic-a log 2","UID":"abc123","ns":"test-service"}

{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:06.085Z","msg":"topic-a log 3","UID":"abc123","ns":"test-service"}

//consumer topicb
2020-12-09T15:31:06.085Z    INFO    topic-a log 3   {"UID": "abc123", "ns": "test-service"}
2","UID":"abc123","ns":"test-service"}

Changing sync results in really different effects. I'm pretty new to Go.

This is main.go:

package main

import (
    "context"
    "kafka-log/logger"
)

func main() {
    loggerA := logger.Init("test-service", "localhost:9092", "topica", false, false)
    loggerB := logger.Init("test-service", "localhost:9092", "topicb", false, true)

    ctx := context.Background()
    ctx2 := context.WithValue(ctx, logger.UID, "abc123")

    loggerA.CInfo(ctx2, "topic-a log 1")
    loggerB.CInfo(ctx2, "topic-b log 1")

    loggerA.CInfo(ctx2, "topic-a log 2")
    loggerB.CInfo(ctx2, "topic-b log 2")

    loggerA.CInfo(ctx2, "topic-a log 3")
    loggerB.CInfo(ctx2, "topic-b log 3")
}

This is the logger/logger.go:

package logger

import (
    "context"
    "os"

    "github.com/segmentio/kafka-go"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

type (
    key string

    // Logger type embeds zap and also contains the current system name (namespace, Ns)
    Logger struct {
        *zap.Logger
        Ns string
    }

    // KConfig type for creating a new Kafka logger. Takes a Namespace,
    // Broker (eg 'localhost:9092'), Topic (eg 'topic-a')
    KConfig struct {
        Namespace string
        Broker    string
        Topic     string
        Async     bool
    }

    producerInterface interface {
        WriteMessages(ctx context.Context, msgs ...kafka.Message) error
    }

    // KafkaProducer contains a kafka.Producer and Kafka topic
    KafkaProducer struct {
        Producer producerInterface
        Topic    string
    }
)

const (
    // UID - uniquely request identifier
    UID key = "request_id"
)

var customConfig = zapcore.EncoderConfig{
    TimeKey:        "timeStamp",
    LevelKey:       "level",
    NameKey:        "logger",
    CallerKey:      "caller",
    FunctionKey:    zapcore.OmitKey,
    MessageKey:     "msg",
    StacktraceKey:  "stacktrace",
    LineEnding:     zapcore.DefaultLineEnding,
    EncodeLevel:    zapcore.CapitalColorLevelEncoder,
    EncodeTime:     zapcore.ISO8601TimeEncoder,
    EncodeDuration: zapcore.SecondsDurationEncoder,
}

// CInfo this function takes a context as first parameter, extracts specific fields as well as namespace, and calls zap Info
func (l *Logger) CInfo(ctx context.Context, msg string, fields ...zap.Field) {
    l.Info(msg, consolidate(ctx, l.Ns, fields...)...)
}

func consolidate(ctx context.Context, namespace string, fields ...zap.Field) []zap.Field {
    return append(append(ctxToZapFields(ctx), fields...), zap.String("ns", namespace))
}

// See advanced config example: https://github.com/uber-go/zap/blob/master/example_test.go#L105
var lowPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
    return lvl < zapcore.ErrorLevel && lvl > zapcore.DebugLevel
})
var debugPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
    return lvl < zapcore.ErrorLevel
})
var kafkaPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
    return lvl > zapcore.DebugLevel
})

// Init creates a new instance of a logger. Namespace is the name of the module using the logger. broker and topic are Kafa specific,
// if either of these is not set a default console logger is created.
func Init(namespace, broker, topic string, debug, async bool) *Logger {
    var kp *KafkaProducer = nil
    if broker != "" && topic != "" {
        kp = NewKafkaProducer(&KConfig{
        Broker: broker,
        Topic:  topic,
        Async:  async,
    })
    }
    logger := getLogger(debug, kp)
    // logger.Info("initiated logger", zap.String("ns", namespace), zap.Bool("kafka", kp != nil), zap.Bool("debug", debug))
    return &Logger{logger, namespace}
}

func getLogger(debug bool, kp *KafkaProducer) *zap.Logger {
    // cores are logger interfaces
    var cores []zapcore.Core

    // optimise message for console output (human readable)
    consoleEncoder := zapcore.NewConsoleEncoder(customConfig)
    // Lock wraps a WriteSyncer in a mutex to make it safe for concurrent use.
    // See https://godoc.org/go.uber.org/zap/zapcore
    cores = append(cores,
        zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), getPriority(debug)),
        zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stderr), zap.ErrorLevel),
    )

    if kp != nil {
        cores = append(cores, zapcore.NewCore(zapcore.NewJSONEncoder(customConfig), zapcore.Lock(zapcore.AddSync(kp)), kafkaPriority))
    }

    // join inputs, encoders, level-handling functions into cores, then "tee" together
    logger := zap.New(zapcore.NewTee(cores...))
    defer logger.Sync()
    return logger
}

func getPriority(debug bool) zap.LevelEnablerFunc {
    if debug {
        return debugPriority
    }
    return lowPriority
}

func ctxToZapFields(ctx context.Context) []zap.Field {
    reqID, _ := ctx.Value(UID).(string)
    return []zap.Field{
        zap.String("UID", reqID),
    }
}

// NewKafkaProducer instantiates a kafka.Producer, saves topic, and returns a KafkaProducer
func NewKafkaProducer(c *KConfig) *KafkaProducer {
    return &KafkaProducer{
        Producer: kafka.NewWriter(kafka.WriterConfig{
            Brokers:      []string{c.Broker},
            Topic:        c.Topic,
            Balancer:     &kafka.Hash{},
            Async:        c.Async,
            RequiredAcks: -1, // -1 = all
        }),
        Topic: c.Topic,
    }
}

// Write takes a message as a byte slice, wraps in a kafka.message and calls kafka Produce
func (kp *KafkaProducer) Write(msg []byte) (int, error) {
    return len(msg), kp.Producer.WriteMessages(context.Background(), kafka.Message{
        Key:   []byte(""),
        Value: msg,
    })
}

I'm using these for consumers:

docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topica

docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topicb

And this is my kafka docker-compose:

version: '3.8'

services:
  
  zookeeper:
    image: confluentinc/cp-zookeeper
    networks:
      - kafka-net
    container_name: zookeeper
    environment:
        ZOOKEEPER_CLIENT_PORT: 2181
    ports:
        - 2181:2181

  kafka:
    image: confluentinc/cp-kafka
    networks:
      - kafka-net
    container_name: kafka
    environment:
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        ALLOW_PLAINTEXT_LISTENER: "yes"
        KAFKA_LISTENERS-INTERNAL: //kafka:29092,EXTERNAL://localhost:9092
        KAFKA_ADVERTISED: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
        KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    ports:
        - 9092:9092
        - 29092:29092
    depends_on:
        - zookeeper
    restart: on-failure

networks:
  kafka-net:
    driver: bridge

1

There are 1 best solutions below

0
On

I imagine your program is exiting before the async messages have time to send (Although if I'm reading your example correctly it is strange to me that "topic-a log 3" is the only log message that makes it). Unlike something like javascript, Go will not wait for all threads/goroutines to terminate before exiting.

Would also highlight the docstring for the Async config for kafka-go:

// Setting this flag to true causes the WriteMessages method to never block.
// It also means that errors are ignored since the caller will not receive
// the returned value. Use this only if you don't care about guarantees of
// whether the messages were written to kafka.

On the solution front: I think you can solve this by calling Close on the writer:

https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.Close

Close flushes pending writes, and waits for all writes to complete before returning. Calling Close also prevents new writes from being submitted to the writer, further calls to WriteMessages and the like will fail with io.ErrClosedPipe.

You would need to surface the underlying KafkaProducer.Producer and call KafkaProducer.Producer.Close before exiting.

There might be cleverer ways to structure the cleanup, but I can't seem to find an easier way to flush the pending messages than just calling Close on the writer.