spring-kafka stops working when one broker of the Kafka cluster is killed


I've configured a multi-broker Kafka cluster and I'm trying to use it with spring-kafka, but when I kill one of the brokers I can no longer send messages to the topic.

Kafka version 2.0.0, spring-kafka version 2.0.1

kafka-topics.sh --describe --zookeeper=zoo1:2181 prints:

KAFKA_SWARM_TEST  PartitionCount:1        ReplicationFactor:2     Configs:
        Topic: KAFKA_SWARM_TEST Partition: 0    Leader: 2       Replicas: 1,2   Isr: 2,1

spring-kafka config

spring.kafka.bootstrap-servers="kafka2:9094,kafka1:9093"
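Note that if this line lives in a plain application.properties file, the quotes become part of the value. Also, since the brokers require SSL client auth, the client needs its own SSL settings; a minimal sketch, assuming standard Spring Boot `spring.kafka.*` keys (file paths and passwords are placeholders):

```properties
# Both brokers, unquoted; this list is only used for the initial connection
spring.kafka.bootstrap-servers=kafka2:9094,kafka1:9093

# The brokers set KAFKA_SSL_CLIENT_AUTH: required, so the client must
# connect over SSL and present a certificate; paths/passwords are placeholders
spring.kafka.properties.security.protocol=SSL
spring.kafka.ssl.key-store-location=file:/path/to/client.keystore.jks
spring.kafka.ssl.key-store-password=changeit
spring.kafka.ssl.trust-store-location=file:/path/to/client.truststore.jks
spring.kafka.ssl.trust-store-password=changeit
```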

The leader is kafka2. When I kill kafka1, kafka2 is still the leader, but spring-kafka throws:

 Connection to node 1 could not be established. Broker may not be available.
 Discovered group coordinator kafka1:9093

It looks like spring-kafka only connects to kafka1.

My Java code:

    @GetMapping(path = "/send",produces = MediaType.APPLICATION_JSON_VALUE)
    public JsonNode send() throws JsonProcessingException {
        ObjectNode put = JsonNodeFactory.instance.objectNode().put("status", "success");
        String topic = "KAFKA_SWARM_TEST";
        val msg = MessageBuilder
                .withPayload(objectMapper.writeValueAsString(put))
                .setHeader(KafkaHeaders.TOPIC, topic)
                .build();
        kafkaTemplate.send(msg);
        return put;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("KAFKA_SWARM_TEST", 1, (short) 2);
    }

    @KafkaListener(groupId="#{T(java.util.UUID).randomUUID().toString()}",topics = "KAFKA_SWARM_TEST")
    void testGetInfo(String message) throws IOException {
        log.error("getMessage: =====> " + message);
    }
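Since the topic is created with replication factor 2, whether a send survives a broker failure also depends on the producer acknowledgement settings; a hedged sketch of the relevant Spring Boot properties (the values are assumptions, not part of the original setup):

```properties
# Require acknowledgement from all in-sync replicas before a send succeeds
spring.kafka.producer.acks=all
# Let the client retry sends while leadership moves to the surviving broker
spring.kafka.producer.retries=10
```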

Kafka docker-compose config:

version: '3.7'

services:

  zoo1:
    image: wurstmeister/zookeeper
    restart: always
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888
  zoo2:
    image: wurstmeister/zookeeper
    restart: always
    ports:
      - 2180:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888


  kafka1:
    image: wurstmeister/kafka
    restart: always
    ports:
    - "9093:9093"
    depends_on:
      - zoo1
      - zoo2
    privileged: true
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_HOST_NAME: $KAFKA_ADVERTISED_HOST_NAME
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181
      KAFKA_LOG_DIRS: /kafka
      KAFKA_SSL_KEYSTORE_LOCATION: /kafka_broker_cert/server.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: ksstone430
      KAFKA_SSL_KEY_PASSWORD: ksstone430
      KAFKA_SSL_TRUSTSTORE_LOCATION: /kafka_broker_cert/server.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: stsstone430
      KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092,SSL://$KAFKA_ADVERTISED_HOST_NAME:9093"
      KAFKA_SSL_CLIENT_AUTH: required
      LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 60
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "null"
    volumes:
      - ./kafka_broker_cert:/kafka_broker_cert
      - /var/run/docker.sock:/var/run/docker.sock

  kafka2:
    image: wurstmeister/kafka
    restart: always
    ports:
    - "9094:9093"
    depends_on:
      - zoo1
      - zoo2
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_HOST_NAME: $KAFKA_ADVERTISED_HOST_NAME
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181
      KAFKA_LOG_DIRS: /kafka
      KAFKA_SSL_KEYSTORE_LOCATION: /kafka_broker_cert/server.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: ksstone430
      KAFKA_SSL_KEY_PASSWORD: ksstone430
      KAFKA_SSL_TRUSTSTORE_LOCATION: /kafka_broker_cert/server.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: stsstone430
      KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092,SSL://$KAFKA_ADVERTISED_HOST_NAME:9093"
      KAFKA_SSL_CLIENT_AUTH: required
      LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 60
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "null"
    volumes:
      - ./kafka_broker_cert:/kafka_broker_cert
      - /var/run/docker.sock:/var/run/docker.sock
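One thing worth checking in a two-broker setup: the internal `__consumer_offsets` topic must also be replicated, or the group coordinator disappears together with the broker that hosts it (which would match the "Discovered group coordinator kafka1:9093" log above). A sketch of the extra environment entries for each broker, assuming the wurstmeister image passes `KAFKA_`-prefixed variables through to `server.properties`:

```yaml
# Added to each broker's environment: replicate internal topics across both brokers
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
```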

1 Answer


Check whether the Kafka cluster actually elects a new leader when you kill one of the nodes (e.g. kafka1).
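This can be verified with the same describe command used in the question: stop a broker, then re-run it and watch the Leader and Isr columns (the container name here is an assumption based on the compose file):

```
# Stop one broker (container name assumed to be kafka1)
docker-compose stop kafka1

# Re-run the describe from the question; Leader should stay on (or move to)
# a live broker, and Isr should shrink to the surviving replica
kafka-topics.sh --describe --zookeeper=zoo1:2181 --topic KAFKA_SWARM_TEST
```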

Also check whether some other configuration overrides spring.kafka.bootstrap-servers; there could be a bean that points only to kafka1:9093 as the broker.

However, even if the bootstrap-servers property pointed only to kafka1:9093, the client should still discover the other brokers from the cluster metadata after the initial connection, so it should cope with a node going down.