Kafka Cluster sometimes returns no records during seek and poll


I'm experimenting with a Kafka cluster (3 nodes), intending to run some tests around redundancy and availability (stopping nodes in the cluster, etc.) with a simple Java app using the following Kafka client dependency:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>

I'd configured a replication factor of 3 so the topic is replicated across all nodes, and the topic has a single partition. I'm struggling to understand some behaviour I'm seeing in this sample code when seeking to a specific offset (with one node offline):

    String topic = "test-topic";
    TopicPartition partition = new TopicPartition(topic, 0);
    List<TopicPartition> partitions = Collections.singletonList(partition);
    
    while (true) {                  

        Consumer<String, String> consumer = createConsumer();           
        consumer.assign(partitions);
        consumer.seek(partition, 0);        
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
        
        if (records.isEmpty())
            System.out.println("No Records Found");
        else
            System.out.println("Records Found: " + records.count());
        
        consumer.close();   
        Thread.sleep(2000);
    }   

This code will occasionally print "No Records Found" when one of the nodes in the cluster is offline:

    No Records Found
    Records Found: 1
    No Records Found
    Records Found: 1
    Records Found: 1
    Records Found: 1
    No Records Found
    Records Found: 1
    Records Found: 1
    Records Found: 1
    Records Found: 1
    Records Found: 1
    Records Found: 1
    No Records Found
    Records Found: 1
    Records Found: 1
    Records Found: 1
    Records Found: 1
    Records Found: 1
    No Records Found

You'll notice that I'm creating the consumer inside the while loop on each iteration. This simulates different consumers coming in and connecting, since each consumer has a different consumer group id. Moving the consumer creation outside the while loop (and removing consumer.close()) gives mostly the expected results, i.e. every iteration logs "Records Found: 1". However, sometimes the very first poll returns no records, with all remaining polls finding 1 record:

    String topic = "test-topic";
    TopicPartition partition = new TopicPartition(topic, 0);
    List<TopicPartition> partitions = Collections.singletonList(partition);
    Consumer<String, String> consumer = createConsumer();       
    
    while (true) {                  
        
        consumer.assign(partitions);
        consumer.seek(partition, 0);        
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
        
        if (records.isEmpty())
            System.out.println("No Records Found");
        else
            System.out.println("Records Found: " + records.count());
                
        Thread.sleep(2000);
    }   

The createConsumer method is defined as follows for reference:

public static Consumer<String, String> createConsumer() {
    
    Properties config = new Properties();
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-" + UUID.randomUUID().toString());
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node-1:9092, node-2:9092, node-3:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" );
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    Consumer<String, String> consumer = new KafkaConsumer<String, String>(config);
    return consumer;
}

I'd like to understand this behaviour to be able to reliably run my availability tests.
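For context, poll() returns an empty record set whenever no data arrives within the given timeout, for example while the consumer is still fetching metadata or rediscovering the partition leader after a node goes down, so a single poll() is inherently racy. A common workaround is to keep polling until records arrive or a deadline passes. Below is a minimal, Kafka-free sketch of that retry pattern (PollRetry and pollUntilFound are illustrative names, not Kafka API; in real code the supplier would wrap consumer.poll()):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Supplier;

public class PollRetry {

    // Repeatedly invokes the supplier until it yields a non-empty list
    // or the deadline passes; returns the last (possibly empty) result.
    static <T> List<T> pollUntilFound(Supplier<List<T>> poll, Duration timeout) {
        Instant deadline = Instant.now().plus(timeout);
        List<T> result = poll.get();
        while (result.isEmpty() && Instant.now().isBefore(deadline)) {
            result = poll.get();
        }
        return result;
    }

    public static void main(String[] args) {
        // Simulate a source that returns nothing on the first two calls
        // (like a consumer whose fetch is still in flight) and data afterwards.
        int[] calls = {0};
        List<Integer> records = pollUntilFound(() -> {
            calls[0]++;
            return calls[0] < 3 ? List.<Integer>of() : List.of(42);
        }, Duration.ofSeconds(2));
        System.out.println("Records Found: " + records.size());
    }
}
```

With a KafkaConsumer, each poll() call already blocks up to its own timeout, so the outer loop simply gives the client more chances to complete its fetch before the test declares "No Records Found".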

1 Answer

I was also stuck on this problem and finally solved it like this:

public ConsumerRecord<String, String> seekAndPoll(String topic, int partition, long offset) {
    // 'consumer' is a pre-configured KafkaConsumer<String, String> field
    TopicPartition tp = new TopicPartition(topic, partition);
    consumer.assign(Collections.singleton(tp));
    System.out.println("assignment:" + consumer.assignment()); // the partition is assigned at this point

    // endOffset: the offset of the last successfully replicated message plus one;
    // if there are 5 messages, the valid offsets are [0,1,2,3,4] and endOffset is 4+1=5
    Long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
    if (offset < 0 || offset >= endOffset) {
        System.out.println("offset is illegal");
        return null;
    } else {
        consumer.seek(tp, offset);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) {
            System.out.println("Not Found");
            return null;
        } else {
            System.out.println("Found");
            return records.iterator().next();
        }
    }
}