I'm trying to create a Kafka producer locally. I start Kafka with `docker-compose up`, using this compose file:
# Local single-broker Kafka stack: one ZooKeeper container and one Kafka
# container, both from the wurstmeister images.
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
# Publishes ZooKeeper's client port 2181 on the host.
- "2181:2181"
hostname: zookeeper
# Mounts /datalog as tmpfs.
tmpfs: "/datalog"
kafka:
image: wurstmeister/kafka
command: [ start-kafka.sh ]
ports:
# Only 9092 (the OUTSIDE listener below) is published to the host; 9093 is
# not published.
- 9092:9092
environment:
KAFKA_CREATE_TOPICS: "Kafka-audit"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# Two listeners are advertised: PLAINTEXT as kafka:9093 and OUTSIDE as
# localhost:9092.
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9093,OUTSIDE://localhost:9092
# Both listener names map to the PLAINTEXT security protocol, i.e. neither
# listener is configured for SSL here.
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
# Bind addresses for the two listeners: 9093 for PLAINTEXT, 9092 for OUTSIDE.
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
However, when I try to connect to it from my Go code like so:
// NewAuditKafkaConfig builds the client Config for the audit producer from
// kafkaConf (broker list, connection, consumer group and topic), applies the
// TLS settings via setKafkaTLS and then creates the producer.
//
// It returns an error when kafkaConf is nil, when no bootstrap server is
// configured, when the TLS settings cannot be applied, or when the producer
// cannot be created. On error the returned *AuditKafkaConfig is nil; the error
// is wrapped with context and left for the caller to log or handle.
func NewAuditKafkaConfig(kafkaConf *config.KafkaConfig) (*AuditKafkaConfig, error) {
	if kafkaConf == nil {
		return nil, fmt.Errorf("kafka config is nil")
	}
	// Indexing BootstrapServers[0] below would panic on an empty list.
	if len(kafkaConf.BootstrapServers) == 0 {
		return nil, fmt.Errorf("kafka config has no bootstrap servers")
	}

	kConfig := &Config{}
	kConfig.SetBrokerList(kafkaConf.BootstrapServers)
	kConfig.SetConnection(kafkaConf.BootstrapServers[0])
	kConfig.SetGroup(kafkaConf.ConsumerGroupID)
	kConfig.SetTopic(kafkaConf.Topic)

	// A TLS setup failure means kConfig never received its defaults, so do
	// not go on to create a producer from it.
	if err := setKafkaTLS(kConfig); err != nil {
		return nil, fmt.Errorf("setting kafka tls: %w", err)
	}

	producer, err := newProducer(kConfig)
	if err != nil {
		return nil, fmt.Errorf("creating producer instance: %w", err)
	}

	// Named auditConfig so the imported config package is not shadowed.
	auditConfig := &AuditKafkaConfig{
		Producer: producer,
		Config:   kConfig,
	}
	return auditConfig, nil
}
// newProducer forwards kConfig to NewProducer and returns its result
// unchanged.
func newProducer(kConfig *Config) (*Producer, error) {
	return NewProducer(kConfig)
}
// setKafkaTLS sets kc.TLS and assembles a tls.Config from the environment,
// then passes that tls.Config to kc.SetDefaults.
//
// kc.TLS is true unless config.AppConfig.KafkaConfig.DisableTLS is set.
// NOTE(review): the tls.Config is built and handed to SetDefaults even when
// TLS is disabled; presumably Config consults kc.TLS before using it, and a
// broker that only exposes PLAINTEXT listeners needs DisableTLS=true — confirm
// against Config's implementation.
//
// Environment variables, each holding base64-encoded PEM:
//   - KAFKA_CLIENT_CERT and KAFKA_CLIENT_KEY: client key pair, used only when
//     both are non-empty.
//   - KAFKA_SERVER_CERT: root CA certificate(s), used when non-empty.
//
// It returns an error when a variable is not valid base64, when the client key
// pair cannot be parsed, or when no CA certificate can be parsed from
// KAFKA_SERVER_CERT.
func setKafkaTLS(kc *Config) error {
	kc.TLS = !config.AppConfig.KafkaConfig.DisableTLS

	pemCertBlock, err := decodeBase64Env("KAFKA_CLIENT_CERT")
	if err != nil {
		return err
	}
	pemKeyBlock, err := decodeBase64Env("KAFKA_CLIENT_KEY")
	if err != nil {
		return err
	}
	rootCABlock, err := decodeBase64Env("KAFKA_SERVER_CERT")
	if err != nil {
		return err
	}

	c := &tls.Config{}

	if len(pemCertBlock) > 0 && len(pemKeyBlock) > 0 {
		cert, err := tls.X509KeyPair(pemCertBlock, pemKeyBlock)
		if err != nil {
			return fmt.Errorf("loading kafka client key pair: %w", err)
		}
		c.Certificates = []tls.Certificate{cert}
	}

	if len(rootCABlock) > 0 {
		certPool := x509.NewCertPool()
		if !certPool.AppendCertsFromPEM(rootCABlock) {
			return fmt.Errorf("cert error")
		}
		c.RootCAs = certPool
	}

	kc.SetDefaults(c)
	return nil
}

// decodeBase64Env returns the base64-decoded value of the environment variable
// name. An unset or empty variable yields an empty result and no error.
func decodeBase64Env(name string) ([]byte, error) {
	decoded, err := base64.StdEncoding.DecodeString(os.Getenv(name))
	if err != nil {
		return nil, fmt.Errorf("decoding %s: %w", name, err)
	}
	return decoded, nil
}
// NewProducer creates a Producer backed by a sarama async producer built from
// c.brokerList and c.sConfig. The Producer's ErrChannel is a buffered channel
// with capacity 5.
//
// When sarama cannot create the async producer, NewProducer returns a nil
// *Producer and an error that wraps the sarama error, so callers can inspect
// it with errors.Is / errors.As.
func NewProducer(c *Config) (*Producer, error) {
	async, err := sarama.NewAsyncProducer(c.brokerList, c.sConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create producer: %w", err)
	}

	return &Producer{
		ErrChannel: make(chan error, 5),
		async:      async,
	}, nil
}
However, when I run this, I get the following error message:
"Failed to create producer instance: failed to create async producer: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
Does anyone have any suggestions as to how to resolve this?