What is the right Kafka configuration for long-running message processing with horizontal scaling?


My application consumes messages whose processing takes 5-15 minutes each, and it is configured with max.poll.records: 1.

When there are no messages, 1 pod is running (Kubernetes); for each incoming message I scale up by 1 pod, up to a maximum of 50 (the topic has 50 partitions).

Now I have a couple of issues:

  1. When a new pod comes up, it takes a long time until a partition is assigned to it. I can see pods that start and are terminated a couple of minutes later without ever taking a single message.
  2. When many messages are inserted into the topic (more than 10, which is a lot for this application), consumers start getting a CommitFailedException, and the same message is therefore consumed twice on different pods.

Error:

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) ~[spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1361) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1063) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_252]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_252]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_252]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1109) ~[kafka-clients-2.5.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:976) ~[kafka-clients-2.5.0.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1511) ~[kafka-clients-2.5.0.jar!/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2311) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2306) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2292) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2106) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1097) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    ... 3 common frames omitted

My Kafka config is:

spring.kafka:
  bootstrap-servers: {{ .Values.kafka.service }}:9092
  consumer:
    client-id: XXX
    group-id: YYY
    auto-offset-reset: earliest
    enable-auto-commit: false
    fetch-min-size: 1
    fetch-max-wait: 1
    properties:
      heartbeat.interval.ms: 3000
      max.poll.interval.ms: 1200000
      max.poll.records: 1

  producer:
    client-id: XXX
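
For context, these are the timeout settings that interact when a consumer is "kicked out of the group" as in the stack trace above. This is only an illustrative sketch, not a recommendation; session.timeout.ms is shown at its kafka-clients 2.5 default (10000 ms) purely to make the relationships visible:

```yaml
spring.kafka:
  consumer:
    properties:
      # Heartbeats are sent from a background thread, so long record
      # processing does not block them; heartbeat.interval.ms must be
      # well below session.timeout.ms (typically one third or less).
      heartbeat.interval.ms: 3000
      # Client default in kafka-clients 2.5; shown explicitly here only
      # for illustration. Missing heartbeats for this long gets the
      # consumer evicted from the group.
      session.timeout.ms: 10000
      # Bounds the time allowed between poll() calls; 20 min covers the
      # stated 5-15 min per-record processing. Exceeding it also causes
      # eviction and a rebalance.
      max.poll.interval.ms: 1200000
      max.poll.records: 1
```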
