I'm experimenting with a Kafka cluster (3 nodes) and I intend to run some tests around redundancy and availability (stopping nodes in the cluster, etc.) with a simple Java app using the following Kafka client dependency:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
```
I've configured a replication factor of 3 so that the topic is replicated across all nodes, and I'm using only 1 partition for the topic. I'm struggling to understand some behaviour I'm seeing in this sample code, specifically when seeking to an offset with one node offline:
```java
// Runs inside a method declared with "throws InterruptedException" (for Thread.sleep).
String topic = "test-topic";
TopicPartition partition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Collections.singletonList(partition);
while (true) {
    // A fresh consumer (with a fresh random group id) on every iteration
    Consumer<String, String> consumer = createConsumer();
    consumer.assign(partitions);
    consumer.seek(partition, 0);
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
    if (records.isEmpty()) {
        System.out.println("No Records Found");
    } else {
        System.out.println("Records Found: " + records.count());
    }
    consumer.close();
    Thread.sleep(2000);
}
```
With one of the nodes in the cluster offline, this code occasionally prints "No Records Found":
```
No Records Found
Records Found: 1
No Records Found
Records Found: 1
Records Found: 1
Records Found: 1
No Records Found
Records Found: 1
Records Found: 1
Records Found: 1
Records Found: 1
Records Found: 1
Records Found: 1
No Records Found
Records Found: 1
Records Found: 1
Records Found: 1
Records Found: 1
Records Found: 1
No Records Found
```
You'll notice that I'm creating the consumer on each iteration of the while loop. This simulates different consumers coming in and connecting, since each consumer has a different consumer group id. Moving the consumer creation outside the while loop (and removing consumer.close()) gives mostly the expected results, i.e. every line shows "Records Found: 1". However, "sometimes" the very first poll returns no records, while all the remaining polls find 1 record:
```java
String topic = "test-topic";
TopicPartition partition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Collections.singletonList(partition);
Consumer<String, String> consumer = createConsumer();
while (true) {
    consumer.assign(partitions);
    consumer.seek(partition, 0);
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
    if (records.isEmpty()) {
        System.out.println("No Records Found");
    } else {
        System.out.println("Records Found: " + records.count());
    }
    Thread.sleep(2000);
}
```
For reference, createConsumer is defined as follows:
```java
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public static Consumer<String, String> createConsumer() {
    Properties config = new Properties();
    // Random group id, so every consumer instance belongs to its own group
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-" + UUID.randomUUID());
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node-1:9092,node-2:9092,node-3:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    return new KafkaConsumer<>(config);
}
```
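One possible (unconfirmed) explanation: a fresh consumer may try the stopped broker first and wait for that TCP connection attempt to time out before moving on to a live node. In kafka-clients 2.7+ that wait is bounded by `socket.connection.setup.timeout.ms`, whose default (10 s) is longer than the 2000 ms poll above. This mainly applies when the stopped node's host stops answering entirely; a closed port is usually refused immediately. The sketch below lowers that bound for test runs. The `failoverOverrides` helper and the values are hypothetical, and plain string keys are used so it compiles without kafka-clients; the properties would be merged into the `config` built in `createConsumer`.

```java
import java.util.Properties;

public class FailoverFriendlyConfig {

    // Hypothetical helper: extra client settings (plain string keys) that bound how long
    // a connection attempt to one unreachable broker can stall the consumer.
    // The values are illustrative assumptions, not recommendations.
    public static Properties failoverOverrides() {
        Properties p = new Properties();
        // Abandon a TCP connection that has not been established within about 1 s
        // (the client applies some jitter to this value).
        p.put("socket.connection.setup.timeout.ms", "1000");
        // Upper bound for that timeout as it grows after repeated failures.
        p.put("socket.connection.setup.timeout.max.ms", "2000");
        return p;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "node-1:9092,node-2:9092,node-3:9092");
        config.putAll(failoverOverrides());
        // Print the merged settings in a stable order.
        config.stringPropertyNames().stream().sorted()
                .forEach(k -> System.out.println(k + "=" + config.getProperty(k)));
    }
}
```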
I'd like to understand this behaviour to be able to reliably run my availability tests.
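A common way to make availability tests like these independent of any single short poll is to keep polling until a record arrives or an overall deadline passes, rather than treating one empty 2000 ms poll as "no records". This is a minimal sketch, assuming (not confirmed by the output above) that the empty results occur because the poll timeout expires while the client is still connecting or fetching metadata. `pollUntil` is a hypothetical helper, and a stand-in function replaces `KafkaConsumer.poll` so the example runs without a cluster.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class PollUntil {

    // Hypothetical helper: calls poll with a bounded per-call timeout until done accepts
    // the result or the overall deadline passes. Returns the last result, which may still
    // be "not done" (e.g. empty) if the deadline was reached.
    public static <T> T pollUntil(Function<Duration, T> poll, Predicate<T> done,
                                  Duration perPoll, Duration overall) {
        long deadline = System.nanoTime() + overall.toNanos();
        T result;
        do {
            // Never block longer than the time left before the overall deadline.
            long remaining = Math.max(0L, deadline - System.nanoTime());
            result = poll.apply(Duration.ofNanos(Math.min(perPoll.toNanos(), remaining)));
        } while (!done.test(result) && deadline - System.nanoTime() > 0);
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for consumer.poll(timeout): empty on the first two calls, then one record.
        int[] calls = {0};
        Function<Duration, List<String>> fakePoll = timeout -> {
            calls[0]++;
            return calls[0] < 3 ? Collections.<String>emptyList()
                                : Collections.singletonList("record-0");
        };
        List<String> records = pollUntil(fakePoll, r -> !r.isEmpty(),
                Duration.ofMillis(100), Duration.ofSeconds(5));
        System.out.println("Records Found: " + records.size() + " after " + calls[0] + " polls");
    }
}
```

With a real consumer the call would be something like `pollUntil(consumer::poll, r -> !r.isEmpty(), Duration.ofMillis(500), Duration.ofSeconds(30))`. An empty result after the overall deadline then points to a genuine availability problem rather than a slow first fetch.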
I was also stuck on this problem and finally solved it like this: