I have the Kafka offset reset policy (auto.offset.reset) set to latest and I am missing the first few messages. If I sleep for 20 seconds before starting to send messages to the input topic, everything works as desired. I am not sure whether the problem is that the consumer takes a long time for partition rebalancing. Is there a way to know that the consumer is ready before starting to poll?
How to check if Kafka Consumer is ready
16.3k views. Asked by Nagireddy Hanisha. There are 7 answers below.

You can do the following.
I have a test that reads data from a Kafka topic.
KafkaConsumer is not thread-safe, so you can't share it between threads, but you can pass in an AtomicReference<Set<TopicPartition>> assignment parameter, update it in the consumer thread, and read it from another thread.
For example, here is a snippet of working code from a test project:
private void readAvro(String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      String bootstrapServers,
                      int readTimeout) {
    // Shared holder: written by the consumer thread, read by this thread
    AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>();
    new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start();
    long startTime = System.currentTimeMillis();
    long maxWaitingTime = 30_000;
    boolean subscribed = false;
    // Check once per second, for up to 30 seconds, whether any partition has been assigned
    for (long time = System.currentTimeMillis(); System.currentTimeMillis() - time < maxWaitingTime;) {
        Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(new HashSet<>());
        System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: "
                + assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(",")));
        if (assignments.size() > 0) {
            subscribed = true;
            break;
        }
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            needStop.set(true);
            break;
        }
    }
    // Report the outcome honestly: the loop can also end on timeout or interruption
    System.out.println((subscribed ? "Subscribed!" : "Not subscribed.") + " Wait summary: " + (System.currentTimeMillis() - startTime));
}
private void readAvro(String bootstrapServers,
                      String readFromKafka,
                      AtomicBoolean needStop,
                      List<Event> events,
                      int readTimeout,
                      AtomicReference<Set<TopicPartition>> assignment) {
    KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest");
    consumer.subscribe(Collections.singletonList(readFromKafka));
    System.out.println("Subscribed to topic: " + readFromKafka);
    long started = System.currentTimeMillis();
    while (!needStop.get()) {
        // Publish the current assignment so the waiting thread can see it
        assignment.set(consumer.assignment());
        // poll(long) is deprecated since Kafka 2.0; newer clients take a Duration instead
        ConsumerRecords<String, byte[]> records = consumer.poll(1_000);
        events.addAll(CommonUtils4Tst.readEvents(records));
        if (readTimeout == -1) {
            // No timeout configured: stop as soon as anything has been read
            if (events.size() > 0) {
                break;
            }
        } else if (System.currentTimeMillis() - started > readTimeout) {
            break;
        }
    }
    needStop.set(true);
    synchronized (MainTest.class) {
        MainTest.class.notifyAll();
    }
    consumer.close();
}
P.S.
needStop - global flag used to stop all running threads, whether on failure or on success
events - list of objects that I want to check
readTimeout - how long to wait while reading data; if readTimeout == -1, stop as soon as anything has been read

Thanks to Alexey (I have also voted up), I seem to have resolved my issue by following essentially the same idea.
Just want to share my experience: in our case we are using Kafka in a request and response way, somewhat like RPC. A request is sent on one topic and the response is awaited on another topic. We ran into a similar issue, i.e. missing the first response.
I tried calling KafkaConsumer.assignment() repeatedly (with Thread.sleep(100) in between), but that doesn't seem to help. Adding a KafkaConsumer.poll(50) call seems to have primed the consumer (group), and the first response is now received too. I tested a few times and it is consistently working now.
BTW, testing requires stopping the application and deleting the Kafka topics; for good measure, I restarted Kafka too.
PS: Just calling poll(50) without the assignment() fetching logic that Alexey mentioned may not guarantee that the consumer (group) is ready.
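
The "poll to prime, then check assignment" idea above could be sketched roughly as follows. This is only a sketch: AssignmentPrimer and primeUntilAssigned are hypothetical names, and the two suppliers stand in for consumer.poll(...) and consumer.assignment() so the loop itself has no Kafka dependency. Records returned by the priming polls are kept in a list rather than thrown away.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical helper: keeps polling until the consumer reports a non-empty assignment. */
final class AssignmentPrimer {

    /**
     * Calls pollOnce repeatedly until assignment returns a non-empty set or the
     * timeout expires. Records returned by the priming polls are added to sink
     * so that none of them are dropped. Returns true if partitions were assigned in time.
     */
    static <R, P> boolean primeUntilAssigned(Supplier<List<R>> pollOnce,
                                             Supplier<Set<P>> assignment,
                                             List<R> sink,
                                             long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            // With Kafka: the records from consumer.poll(Duration.ofMillis(50))
            sink.addAll(pollOnce.get());
            // With Kafka: consumer.assignment()
            if (!assignment.get().isEmpty()) {
                return true;
            }
        } while (System.currentTimeMillis() < deadline);
        return false;
    }
}
```

Both suppliers must be called on the thread that owns the consumer. The boolean result lets a test fail fast instead of carrying on with a consumer that never got its partitions.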

If your policy is set to latest, which only takes effect when there are no previously committed offsets, and you indeed have no previously committed offsets, then you should not worry about 'missing' messages: you are telling Kafka not to care about messages that were sent before your consumers were ready.
If you care about those earlier messages, you should set the policy to earliest.
In any case, whatever the policy, the behaviour you are seeing is transient: once committed offsets are saved in Kafka, the consumers will pick up where they left off previously on every restart.
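
For reference, the policy discussed here is the auto.offset.reset consumer property. A minimal sketch of setting it with plain string keys follows; OffsetResetConfig and consumerProps are hypothetical names, and the server and group id values are placeholders supplied by the caller.

```java
import java.util.Properties;

/** Hypothetical helper that builds the consumer properties relevant to this answer. */
final class OffsetResetConfig {

    static Properties consumerProps(String bootstrapServers, String groupId, String offsetReset) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Only consulted when the group has no committed offset for a partition:
        // "earliest" starts from the beginning of the log, "latest" from the end.
        props.put("auto.offset.reset", offsetReset);
        return props;
    }
}
```

For example, consumerProps("localhost:9092", "my-test-group", "earliest") would make a brand-new group read everything already in the topic, at the cost of also seeing messages from earlier test runs unless the topic or group id is fresh.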

You can use consumer.assignment(). It returns the set of assigned partitions, so you can verify whether all of the partitions available for that topic have been assigned.
If you are using the spring-kafka project, you can include the spring-kafka-test dependency and use the method below to wait for topic assignment, but you need to have a container.
ContainerTestUtils.waitForAssignment(Object container, int partitions);

You can modify an AlwaysSeekToEndListener (listens only to new messages) to include a callback:
public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {

    private final Consumer<K, V> consumer;
    private Runnable callback;

    public AlwaysSeekToEndListener(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }

    public AlwaysSeekToEndListener(Consumer<K, V> consumer, Runnable callback) {
        this.consumer = consumer;
        this.callback = callback;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do on revocation
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Skip anything already in the log, then signal that partitions are assigned
        consumer.seekToEnd(partitions);
        if (callback != null) {
            callback.run();
        }
    }
}
and subscribe with a latch callback:
CountDownLatch initLatch = new CountDownLatch(1);
consumer.subscribe(singletonList(topic), new AlwaysSeekToEndListener<>(consumer, () -> initLatch.countDown()));
initLatch.await(); // blocks until consumer is ready and listening
then proceed to start your producer.
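
One thing to watch with the latch: a ConsumerRebalanceListener is only invoked from inside poll(), on the thread that calls it. So something has to keep polling while another thread waits on the latch, otherwise await() never returns. A rough sketch of that arrangement is below; LatchedPollLoop and startAndAwait are hypothetical names, and pollOnce stands in for one consumer.poll(...) call plus whatever record handling you need.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: polls on a background thread while the caller waits for readiness. */
final class LatchedPollLoop {

    /**
     * Starts a daemon thread that calls pollOnce until stop is set, then waits
     * up to timeoutMs for the latch. Returns true if the latch reached zero in time.
     */
    static boolean startAndAwait(Runnable pollOnce, AtomicBoolean stop,
                                 CountDownLatch ready, long timeoutMs) {
        Thread poller = new Thread(() -> {
            while (!stop.get()) {
                // With Kafka: consumer.poll(Duration.ofMillis(100)) plus record handling
                pollOnce.run();
            }
        }, "consumer-poller");
        poller.setDaemon(true);
        poller.start();
        try {
            // Bounded wait, so a consumer that never gets partitions cannot hang the test
            return ready.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            stop.set(true);
            return false;
        }
    }
}
```

Since KafkaConsumer is not thread-safe, the consumer should only be touched from the polling thread once it has started. Also, seekToEnd is documented as lazy (the end offset is resolved on the next poll or position call), so if you need the position pinned before the callback fires, calling consumer.position(partition) for each assigned partition inside the listener should force it.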

I faced a similar problem while testing with EmbeddedKafka.
Disclaimer: my approach may not look like the "Kafka way", yet it gets the job done with some tradeoffs. And of course it shouldn't be used anywhere but in tests.
In general, the test consists of the following steps:
- Create a consumer
- Post some message to the topic
- Expect that only that specific message was consumed
So I'm looking for auto.offset.reset=latest semantics, with a guarantee that the assigned topic is ready to be polled. In the end I decided to use a special message to mark that the consumer is ready:
public class ConsumerHelper {

    public static KafkaConsumer<String, Object> buildConsumer(EmbeddedKafkaBroker broker, Set<String> topics) {
        var consumer = buildConsumer(broker);
        if (!CollectionUtils.isEmpty(topics)) {
            var producer = buildUtilProducer(...);
            var key = "util-message-key" + UUID.randomUUID(); // key must be unique for every method call
            // Send one marker message to every topic
            topics.forEach(
                topic -> producer.send(new ProducerRecord<>(topic, key, new Object()))
            );
            var uncheckedTopics = new HashSet<>(topics);
            consumer.subscribe(topics);
            // Poll until the marker message has been seen on every topic
            do {
                consumer.poll(Duration.ofMillis(100)).forEach(record -> { // 100 ms is an arbitrary poll timeout
                    if (key.equals(record.key())) {
                        uncheckedTopics.remove(record.topic());
                    }
                });
                consumer.commitSync();
            } while (!uncheckedTopics.isEmpty() /* you may add some timeout check logic here if needed */);
        }
        return consumer;
    }

    /**
     * consumer builder method, e.g. with KafkaTestUtils
     *
     * @implSpec consumer group id must be unique, {@code auto.offset.reset} must be set to {@code earliest}
     */
    private static KafkaConsumer<String, Object> buildConsumer(EmbeddedKafkaBroker broker) {
        var randomGroupId = "group-id-" + UUID.randomUUID(); // consumer group id must be unique
        var consumerProps = KafkaTestUtils.consumerProps(randomGroupId, "true", broker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // this is important
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // some extra consumer props if needed
        // ...
        //
        return new KafkaConsumer<>(consumerProps);
    }

    /**
     * util producer builder method, e.g. with KafkaTestUtils
     */
    private static KafkaProducer<String, Object> buildUtilProducer() {
        //...
    }
}
After all that, the KafkaConsumer built with the public method is ready to consume new messages immediately.
Obvious restriction: tests should not be run concurrently.

I needed to know whether a Kafka consumer was ready before doing some testing, so I tried consumer.assignment(). It only returns the set of assigned partitions, though, and it does not tell you whether those partitions have an offset set. As a result, when I later tried to use the consumer, its offsets were not set correctly.
The solution was to use committed(), which gives you the last committed offsets for the partitions you pass as arguments.
So you can do something like:
consumer.committed(consumer.assignment())
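If no partitions are assigned yet, it returns an empty map.
If partitions are assigned but no offset has been committed yet, the map contains those partitions with null values.
If partitions are assigned and offsets have been committed, the map contains an OffsetAndMetadata for each partition.
With this information you can, for example, treat the consumer as ready once the returned map is non-empty and none of its values is null.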
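
A check along those lines might look like the sketch below. It is written against a plain Map so it runs without Kafka on the classpath; CommittedOffsets and allCommitted are hypothetical names, and K and V stand in for TopicPartition and OffsetAndMetadata. It assumes a client version where committed() accepts a set of partitions and reports a missing offset as a null value.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper for interpreting the result of consumer.committed(...). */
final class CommittedOffsets {

    /**
     * True when at least one partition is present and every partition has a
     * non-null committed offset; false for an empty map or any null value.
     */
    static <K, V> boolean allCommitted(Map<K, V> committed) {
        return !committed.isEmpty()
                && committed.values().stream().allMatch(Objects::nonNull);
    }
}
```

With a real consumer the call would be roughly allCommitted(consumer.committed(consumer.assignment())), repeated between polls until it returns true or a timeout is reached. Keep in mind that a committed offset only exists after something has actually committed, so a brand-new group stays "not ready" under this check until its first commit.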