Cannot create a Kafka producer with Go

29 Views Asked by At

I'm trying to create a Kafka producer locally. I'm spinning up Kafka with `docker-compose up` using this compose file:

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    hostname: zookeeper
    # Keep ZooKeeper's transaction log on tmpfs so state is wiped per run.
    tmpfs: "/datalog"
  kafka:
    image: wurstmeister/kafka
    command: [ start-kafka.sh ]
    ports:
      # Only the OUTSIDE listener (9092) is published to the host.
      - 9092:9092
    environment:
      # NOTE(review): wurstmeister's full format is "name:partitions:replicas"
      # (e.g. "Kafka-audit:1:1") — confirm a bare name creates the topic.
      KAFKA_CREATE_TOPICS: "Kafka-audit"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Addresses handed back to clients on metadata requests:
      #   PLAINTEXT  -> kafka:9093      (reachable only inside the compose network)
      #   OUTSIDE    -> localhost:9092  (what a host-side client must use)
      # NOTE(review): a client on the host must bootstrap via localhost:9092;
      # both listeners are plaintext, so a client configured for TLS will fail
      # to connect — verify the Go client's broker list and TLS settings.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      # Bind both listeners on all interfaces inside the container.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper

However, when I try to connect to it from my Go code, like so:

// NewAuditKafkaConfig builds the Kafka configuration and producer used for
// audit events from kafkaConf (broker list, consumer group, topic).
//
// It fails fast: an error while configuring TLS or creating the producer is
// returned immediately (wrapped with context) instead of being logged and
// then ignored, so callers can never receive an AuditKafkaConfig holding a
// nil Producer.
func NewAuditKafkaConfig(kafkaConf *config.KafkaConfig) (*AuditKafkaConfig, error) {
	kConfig := &Config{}
	kConfig.SetBrokerList(kafkaConf.BootstrapServers)
	kConfig.SetConnection(kafkaConf.BootstrapServers[0])
	kConfig.SetGroup(kafkaConf.ConsumerGroupID)
	kConfig.SetTopic(kafkaConf.Topic)

	// TLS must be configured before the producer is built; a bad cert/key
	// aborts construction rather than producing a half-configured client.
	if err := setKafkaTLS(kConfig); err != nil {
		return nil, fmt.Errorf("setting kafka tls: %w", err)
	}

	producer, err := newProducer(kConfig)
	if err != nil {
		log.Error().Msgf("Failed to create producer instance: %s", err.Error())
		return nil, err
	}

	return &AuditKafkaConfig{
		Producer: producer,
		Config:   kConfig,
	}, nil
}

// newProducer constructs a Producer for the given configuration by
// delegating to NewProducer.
func newProducer(kConfig *Config) (*Producer, error) {
	return NewProducer(kConfig)
}

// setKafkaTLS populates kc's TLS settings.
//
// TLS is enabled unless explicitly disabled via the application config. The
// client certificate/key and the broker's root CA are read as base64-encoded
// PEM from the KAFKA_CLIENT_CERT, KAFKA_CLIENT_KEY and KAFKA_SERVER_CERT
// environment variables; a missing variable simply leaves that part of the
// tls.Config unset, but a malformed value is reported instead of being
// silently discarded as before.
func setKafkaTLS(kc *Config) error {
	kc.TLS = !config.AppConfig.KafkaConfig.DisableTLS

	pemCertBlock, err := base64.StdEncoding.DecodeString(os.Getenv("KAFKA_CLIENT_CERT"))
	if err != nil {
		return fmt.Errorf("decoding KAFKA_CLIENT_CERT: %w", err)
	}
	pemKeyBlock, err := base64.StdEncoding.DecodeString(os.Getenv("KAFKA_CLIENT_KEY"))
	if err != nil {
		return fmt.Errorf("decoding KAFKA_CLIENT_KEY: %w", err)
	}
	rootCABlock, err := base64.StdEncoding.DecodeString(os.Getenv("KAFKA_SERVER_CERT"))
	if err != nil {
		return fmt.Errorf("decoding KAFKA_SERVER_CERT: %w", err)
	}

	c := &tls.Config{}

	// Client certificate for mutual TLS — only when both halves are present.
	if len(pemCertBlock) > 0 && len(pemKeyBlock) > 0 {
		cert, err := tls.X509KeyPair(pemCertBlock, pemKeyBlock)
		if err != nil {
			return fmt.Errorf("loading client key pair: %w", err)
		}
		c.Certificates = []tls.Certificate{cert}
	}

	// Root CA used to verify the broker's certificate.
	if len(rootCABlock) > 0 {
		certPool := x509.NewCertPool()
		if ok := certPool.AppendCertsFromPEM(rootCABlock); !ok {
			return fmt.Errorf("no valid PEM certificates in KAFKA_SERVER_CERT")
		}
		c.RootCAs = certPool
	}

	kc.SetDefaults(c)
	return nil
}

// NewProducer creates a Producer wrapping a sarama async producer connected
// to the brokers configured in c.
//
// On failure it returns a nil Producer and an error wrapped with %w so
// callers can inspect the underlying sarama error with errors.Is/As; the
// previous version returned a partially-populated Producer alongside the
// error and flattened the cause with %s.
func NewProducer(c *Config) (*Producer, error) {
	async, err := sarama.NewAsyncProducer(c.brokerList, c.sConfig)
	if err != nil {
		return nil, fmt.Errorf("creating async producer: %w", err)
	}

	pro := new(Producer)
	// Buffered so a handful of delivery errors can be queued without
	// blocking the reporting goroutine.
	pro.ErrChannel = make(chan error, 5)
	pro.async = async
	return pro, nil
}

However, when I run this, I get the following error message:

"Failed to create producer instance: failed to create async producer: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"

Does anyone have any suggestions as to resolve this?

0

There are 0 best solutions below