How do I read Kafka messages in consumer from last committed offset in Apache Nifi?

2.5k Views Asked by At

I have started my producer to send data to Kafka and also started my consumer to pull the same data.When I was using Consumekafka processor (kafka version 1.0) in Apache Nifi, I have few queries in my mind which are related to Kafka consumer.

Q.1) When I start my ConsumeKafka processor at first time, how can I read messages from beginning and current messages?

Q.2) And also how to read messages after the last consumed message in case of consumer shutdown in Kafka?

How can we implement above two while using Apache Nifi?

1

There are 1 best solutions below

5
On BEST ANSWER

The ConsumeKafka processors have a property called "Offset Reset" which is used when there is no previous offset for the consumer group id, or when the offset no longer exists. The choices for this property are "Offset Latest" or "Offset Earliest", and defaults to latest.

So if you start a ConsumeKafka processor using a consumer group id that has never been used before, then it starts consuming from the latest messages. After that if you start and stop the processor it starts from the offset that it last consumed.

If you want to utilize the "Offset Reset" again to force it to earliest or latest then you need to change the consumer group id, because otherwise the existing consumer group will always use the existing offset to start from.

You can't simultaneously read messages from beginning and current, you can either start at the beginning and read all the way to current, or start at current. This is the way Kafka works and is not specific to NiFi.