Kafka stream messages being processed multiple times


I am currently developing a set of microservices that communicate through Kafka, and more specifically through Kafka Streams.

In most cases, and in development environments, everything seems to work fine with no issues. In staging environments, however, I experience a behavior that I cannot explain.

There are cases where a single message (or several) received through the stream consumer is processed by the application numerous times.

It seems that this occurs whenever the actual processing of the message (i.e. the application logic) takes some time to complete -- which it may do, considering that it involves I/O and other heavy operations.

I am fairly new to Kafka, but as far as I understand this has to do with my consumer not committing the offset fast enough, which in turn means the message is not marked as processed. I have attempted to find a configuration setup that might alleviate this issue but, to be frank, I did not understand what needs to be done.

For reference, this is the configuration of my Stream:

Properties props = builder.getConfiguration();
// Start from the earliest offset when no committed offset exists
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Request exactly-once processing semantics
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// Disable record caching so results are forwarded immediately
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

The application is developed with Micronaut (latest version) on Java 11.

If anyone can suggest what should be the approach in resolving this, it would be mighty helpful.

1 Answer

AFAIK you should have a look at:

session.timeout.ms

The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

request.timeout.ms

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.

see https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
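As a sketch, those two settings could be raised in the Streams configuration shown in the question. The values below are illustrative placeholders, not recommendations; tune them to how long your processing actually takes:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
// Give the broker longer before it declares this consumer dead
// and triggers a rebalance (illustrative value)
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// Give individual requests longer to complete before the client
// retries or fails them (illustrative value)
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
```

Note that session.timeout.ms must stay within the broker's group.min.session.timeout.ms / group.max.session.timeout.ms range, as the quoted documentation says.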

If you do not need the commit to happen after processing, you could also consider processing the messages asynchronously. Keep in mind, though, that if the processing fails there is no automatic way to reprocess the message.
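A minimal sketch of that hand-off pattern, with a plain executor standing in for the real consumer loop (all names here are illustrative, not part of any Kafka API): the record is handed off immediately, so the consumer can commit and move on, but a failure inside the task is not retried automatically.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncHandOff {
    static final List<String> processed = new CopyOnWriteArrayList<>();

    static void handOff(ExecutorService worker, String record) {
        // In the real consumer loop the offset would be committed here,
        // before processing finishes; if process() throws, the record
        // is NOT redelivered automatically.
        worker.submit(() -> process(record));
    }

    static void process(String record) {
        // Stand-in for the heavy I/O-bound application logic
        processed.add(record);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService worker = Executors.newFixedThreadPool(2);
        for (String r : new String[] {"a", "b", "c"}) {
            handOff(worker, r);
        }
        worker.shutdown();
        worker.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

If a failed record must not be lost, the task itself has to catch the exception and route the record to a retry topic or dead-letter store, since the broker will not resend it.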