client/metadata got error from broker -1 while fetching metadata: dial tcp: address tcp/9092": unknown port

135 Views Asked by At

there is a problem when I connect to kafka via docker. this is my docker-compose.yaml

version: '3'
networks:
  fixtures_test:
    external: true

services:

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    networks:
      - fixtures_test

  zoonavigator:
    image: elkozmon/zoonavigator:latest
    ports:
      - 2182:2182
    environment:
      ZOO_HOSTS: 'zookeeper:2181'
      HTTP_PORT: 2182
    links:
      - zookeeper
    networks:
      - fixtures_test

  kafka1:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka1
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    networks:
      - fixtures_test

  edgenode-peer0-org1:
    image: fabric-edgenode:latest
    ports:
      - "7080:8080"
      - "7083:8083"
    volumes:
      - ./cfg/org1conf.yaml:/conf/config.yaml
      - ./fixtures/channel-artifacts:/fixtures/channel-artifacts
      - ./fixtures/crypto-config:/fixtures/crypto-config
      - ./kafka_crypto:/kafka_crypto
    environment:
      - KAFKA_ADDR="kafka1:9092"
      - COUDB_ADDR="http://admin:123456@couchdb0:5984"
      - PEER_NODE_NAME="peer0.org1.example.com:7051"
      - ORG_ID = "org1"
      - KEY_PATH = "/fixtures/crypto-config/peerOrganizations/org1.example.com/peers/peer0.org1.example.com/msp/keystore/priv_sk"
      - CENTER_ADDR = "center_kafka:9091"
    depends_on:
      - kafka.peer0.org1.example.com
      - couchdb0
    networks:
      - fixtures_test

In edgenode-peer0-org1, I use samara to create a client to connect the kafka broker.

main.go

var peertopics = []string{"register", "upload", "filereq", "KeyUpload", "ReceiveKeyUpload", "ReceiveKeyReq", "DataForwarding", "ReceiveFileRequestFromCenter"}

func main() {

    var node = NodeUtils.Nodestructure{
        KafkaAddr:      os.Getenv("KAFKA_ADDR"),
        Couchdb_addr: os.Getenv("COUCHDB_ADDR"),
        PeerNodeName: os.Getenv("PEER_NODE_NAME"),
        OrgID:        os.Getenv("ORG_ID"),
        KeyPath:      os.Getenv("KEY_PATH"),
        CenterAddr:   os.Getenv("CENTER_ADDR"),
        ConfigPath:   "/conf/config.yaml",
    }
    //init node
    node.InitPeerNode(peertopics)
        //start websocket
    go NodeUtils.InitWebsocket()

    //test- start gin router
    r := gin.Default()
    r.POST("/register", NodeUtils.Register)       //http://10.0.0.144:8083/register
    r.POST("/upload", NodeUtils.Upload)           //http://10.0.0.144:8083/upload
    r.POST("/requestfile", NodeUtils.Filerequest) //http://10.0.0.144:8083/requestfile
    r.Run("0.0.0.0:8083")                         // 0.0.0.0:8083 
    fmt.Println("start")
}

initpeernode()

func (nodestru Nodestructure) InitPeerNode(topics []string) {

    //initpeerInKafka
    // clients.InitPeerSdk(nodestru.PeerNodeName, nodestru.OrgID, nodestru.ConfigPath)

    //create db in couchdb
    if err := nodestru.Create_cipherkey_info(); err != nil {
        fmt.Println("create cipherkey_info db error:", err)
    }
    if err := nodestru.Create_ciphertext_info(); err != nil {
        fmt.Println("create ciphertext_info db error:", err)
    }
    var wg sync.WaitGroup
    wg.Add(9)
    //create consumer
    consumer1, err := clients.InitConsumer(nodestru.KafkaAddr)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
    }
    fmt.Println(nodestru.KafkaAddr, "init peer-consumer1 begin")
    go consumeRegister(consumer1, nodestru, &wg)
    go consumeUpload(consumer1, nodestru, &wg)
    go consumeFileReq(consumer1, nodestru, &wg)
    //...

InitConsumer()

func InitConsumer(kafka_addr string) (sarama.Consumer, error) {
    sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
    consumer, err := sarama.NewConsumer([]string{kafka_addr}, nil)
    if err != nil {
        return nil, fmt.Errorf("init kafka consumer error: %v", err)
    }

    return consumer, nil
}

and I get an error

fabric_in_center_edge-edgenode-main-edgenode-peer0-org1-1  | [sarama] 2023/09/20 08:54:36 client/metadata fetching metadata for all topics from broker "kafka1:9092"
fabric_in_center_edge-edgenode-main-edgenode-peer0-org1-1  | [sarama] 2023/09/20 08:54:36 Failed to connect to broker "kafka1:9092": dial tcp: address tcp/9092": unknown port
fabric_in_center_edge-edgenode-main-edgenode-peer0-org1-1  | [sarama] 2023/09/20 08:54:36 client/metadata got error from broker -1 while fetching metadata: dial tcp: address tcp/9092": unknown port
fabric_in_center_edge-edgenode-main-edgenode-peer0-org1-1  | [sarama] 2023/09/20 08:54:36 client/metadata no available broker to send metadata request to

I was wondering Why is my docker domain name translated to tcp/9092 and how to fix it? thanks.

I checked broker info in zookeeper which is {"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka1:9092"],"jmx_port":-1,"port":9092,"host":"kafka1","version":5,"timestamp":"1695198568231"}

I went inside docker and accessed the broker using curl.

root@da8a091e9507:/# curl -kv kafka1:9092
*   Trying 192.168.240.46:9092...
* Connected to kafka1 (192.168.240.46) port 9092 (#0)
> GET / HTTP/1.1
> Host: kafka1:9092
> User-Agent: curl/7.74.0
> Accept: */*
> 
* Empty reply from server
* Connection #0 to host kafka1 left intact
curl: (52) Empty reply from server
0

There are 0 best solutions below