I've configured 3 kafka cluster and I'm trying to use with spring-kafka. but when I kill a kafka, I'm not able to send other messages to queue.
Kafka version 2.0.0 spring-kafka version 2.0.1
kafka-topics.sh --describe --zookeeper=zoo1:2181 print
KAFKA_SWARM_TEST PartitionCount:1 ReplicationFactor:2 Configs:
Topic: KAFKA_SWARM_TEST Partition: 0 Leader: 2 Replicas: 1,2 Isr: 2,1
spring-kafka config
spring.kafka.bootstrap-servers="kafka2:9094,kafka1:9093"
the leader is kafka2.when I kill kafka1. leader still kafka1. but spring-kafka will throw
Connection to node 1 could not be established.Broker may not be available.
Discovered group coordinator kafka1:9093
look like the spring-kafka connect just use kafka1;
my java code
@GetMapping(path = "/send",produces = MediaType.APPLICATION_JSON_VALUE)
public JsonNode send() throws JsonProcessingException {
ObjectNode put = JsonNodeFactory.instance.objectNode().put("status", "success");
String topic = "KAFKA_SWARM_TEST";
val msg = MessageBuilder
.withPayload(objectMapper.writeValueAsString(put))
.setHeader(KafkaHeaders.TOPIC, topic)
.build();
kafkaTemplate.send(msg);
return put;
}
@Bean
public NewTopic topic() {
return new NewTopic("KAFKA_SWARM_TEST", 1, (short) 2);
}
@KafkaListener(groupId="#{T(java.util.UUID).randomUUID().toString()}",topics = "KAFKA_SWARM_TEST")
void testGetInfo(String message) throws IOException {
log.error("getMessage: =====> " + message);
}
kafka config
version: '3.7'
services:
zoo1:
image: wurstmeister/zookeeper
restart: always
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888
zoo2:
image: wurstmeister/zookeeper
restart: always
ports:
- 2180:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888
kafka1:
image: wurstmeister/kafka
restart: always
ports:
- "9093:9093"
depends_on:
- zoo1
- zoo2
privileged: true
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: $KAFKA_ADVERTISED_HOST_NAME
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181
KAFKA_LOG_DIRS: /kafka
KAFKA_SSL_KEYSTORE_LOCATION: /kafka_broker_cert/server.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD: ksstone430
KAFKA_SSL_KEY_PASSWORD: ksstone430
KAFKA_SSL_TRUSTSTORE_LOCATION: /kafka_broker_cert/server.truststore.jks
KAFKA_SSL_TRUSTSTORE_PASSWORD: stsstone430
KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092,SSL://$KAFKA_ADVERTISED_HOST_NAME:9093"
KAFKA_SSL_CLIENT_AUTH: required
LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 60
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "null"
volumes:
- ./kafka_broker_cert:/kafka_broker_cert
- /var/run/docker.sock:/var/run/docker.sock
kafka2:
image: wurstmeister/kafka
restart: always
ports:
- "9094:9093"
depends_on:
- zoo1
- zoo2
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_HOST_NAME: $KAFKA_ADVERTISED_HOST_NAME
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181
KAFKA_LOG_DIRS: /kafka
KAFKA_SSL_KEYSTORE_LOCATION: /kafka_broker_cert/server.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD: ksstone430
KAFKA_SSL_KEY_PASSWORD: ksstone430
KAFKA_SSL_TRUSTSTORE_LOCATION: /kafka_broker_cert/server.truststore.jks
KAFKA_SSL_TRUSTSTORE_PASSWORD: stsstone430
KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092,SSL://$KAFKA_ADVERTISED_HOST_NAME:9093"
KAFKA_SSL_CLIENT_AUTH: required
LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 60
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "null"
volumes:
- ./kafka_broker_cert:/kafka_broker_cert
- /var/run/docker.sock:/var/run/docker.sock
Try checking if the new leader election of the Kafka cluster is working when you kill one of the nodes (e.g. the leader, kafka1)
Also, check if there are other configurations that overrides
spring.kafka.bootstrap-servers
. There could be a bean that just points tokafka1:9093
as the broker.However even if the
bootstrap-servers
property points tokafka1:9093
only, the consumer should find the other nodes of the broker in case of node adjustments.