A simple Kafka client I wrote to test consumer failure detection does not behave as expected. I must have missed something.
Tested with Kafka 0.10.1.0 and the Java consumer from kafka-clients 0.10.1.0.
The following class is launched twice in parallel. As expected, only one of the two clients consumes the topic in the group. But if the active consumer is killed with kill -9, the group is not rebalanced and the other consumer never takes over.
    import com.google.gson.Gson;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.LocalDateTime;
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;

    public class BasicConsumer {

        @SuppressWarnings("unchecked") // Gson returns raw Maps for untyped JSON
        public BasicConsumer() {
            // set up the consumer
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000"); // 30 s without heartbeats before this consumer is considered dead
            props.put("max.poll.records", "10");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            System.out.printf("Starting Consumer %n");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("test1"));
            LocalDateTime inFewMinutes = LocalDateTime.now().plusMinutes(10); // run for 10 minutes
            try {
                while (LocalDateTime.now().isBefore(inFewMinutes)) {
                    ConsumerRecords<String, String> records = consumer.poll(1000); // wait up to 1 s for records
                    System.out.printf("%s Poll returned %d records%n", LocalDateTime.now(), records.count());
                    for (ConsumerRecord<String, String> record : records) {
                        Map<String, Object> message = new Gson().fromJson(record.value(), Map.class);
                        Map<String, Object> data = (Map<String, Object>) message.get("data");
                        String msgId = (String) data.get("TRANSFER_ID");
                        System.out.printf("%s Handling record id %s with offset %s%n", LocalDateTime.now(), msgId, record.offset());
                        try {
                            Thread.sleep(10); // simulate a little processing work
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // preserve the interrupt status and stop consuming
                            return;
                        }
                    }
                }
            } finally {
                consumer.close(); // leaves the group so the other consumer can take over
                System.out.printf("Consumer closed cleanly...%n");
            }
        }

        public static void main(String[] args) {
            new BasicConsumer();
        }
    }
The Kafka and ZooKeeper servers are plain installations with no changes to the default configuration.
Thanks in advance for any ideas.
Late edit: the question is resolved.
To kill the consumer, I had stopped the gradle run command used to launch the Java client. This does not actually stop the Java process running the consumer...
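One way to be sure the right process is killed is to have the consumer log the PID of its own JVM at startup and then run kill -9 on that PID (the JDK's jps -l command also lists running JVMs). A minimal sketch, assuming Java 9 or later for ProcessHandle; on Java 8, ManagementFactory.getRuntimeMXBean().getName() usually returns a "pid@hostname" string on HotSpot JVMs. The class name PidLogger is hypothetical.

```java
/**
 * Hypothetical helper: prints the PID of the JVM that actually runs the
 * consumer, so that `kill -9 <pid>` targets it rather than the Gradle launcher.
 * Requires Java 9+ (ProcessHandle).
 */
public class PidLogger {

    /** Returns the operating-system PID of the current JVM. */
    public static long currentPid() {
        return ProcessHandle.current().pid();
    }

    public static void main(String[] args) {
        // Call this at consumer startup, e.g. at the top of BasicConsumer's main
        System.out.printf("Consumer JVM pid: %d%n", currentPid());
    }
}
```

Killing that PID stops the JVM itself, which is what the failure-detection test needs.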
Once the Java process is actually killed, the logs show a delay of 30 seconds between the active consumer being killed and Kafka rebalancing the group to hand the partitions to the second consumer, exactly as expected from the session.timeout.ms setting.
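The 30-second delay follows from how failure is detected: a consumer killed with kill -9 never reaches consumer.close() (which, in the 0.10.x Java client, sends a LeaveGroup request), so the group coordinator only notices the failure once no heartbeat has arrived for session.timeout.ms. The toy model below sketches that rule; the SessionTimeoutModel class is hypothetical and is not Kafka's coordinator code, and it assumes expiry is checked as now - lastHeartbeat >= sessionTimeoutMs.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model of session-timeout failure detection: a member is
 * declared dead only after no heartbeat has been seen for sessionTimeoutMs.
 */
public class SessionTimeoutModel {
    private final long sessionTimeoutMs;
    // memberId -> time (ms) of the last heartbeat received from that member
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    public SessionTimeoutModel(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** Records a heartbeat from memberId at time nowMs. */
    public void heartbeat(String memberId, long nowMs) {
        lastHeartbeat.put(memberId, nowMs);
    }

    /** True if memberId is unknown or its session has expired at nowMs. */
    public boolean isExpired(String memberId, long nowMs) {
        Long last = lastHeartbeat.get(memberId);
        return last == null || nowMs - last >= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        SessionTimeoutModel group = new SessionTimeoutModel(30_000); // session.timeout.ms=30000
        group.heartbeat("consumer-1", 0); // last heartbeat, then kill -9 at t=0
        // No LeaveGroup is sent, so only the missing heartbeats reveal the failure
        System.out.println(group.isExpired("consumer-1", 29_999)); // false
        System.out.println(group.isExpired("consumer-1", 30_000)); // true: rebalance can start
    }
}
```

Lowering session.timeout.ms shortens the detection delay, at the cost of declaring slow but healthy consumers dead sooner.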