Spring Cloud Stream Kafka binder not consuming messages after some time


The kafka() consumer bean below sometimes takes a long time to finish processing a record's underlying operations. Is that the reason the binder stops consuming from the partitions once processing exceeds some time limit? Which configuration variables can I fine-tune so the poll resumes and the binder keeps processing messages from the topic "test"? Whenever I restart the application it processes again. I am confused about session.timeout.ms, max.poll.interval.ms, and so on, so I have left them all at their default values (if raising them is the fix, I have sketched where I assume they would go, right after the config below). Restarting the application resumes consumption. Fine-tuning max.poll.records has not helped.

# under spring.cloud.stream.kafka.bindings
kafka-in-0:
  consumer:
    concurrency: 20
    configuration:
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      auto.offset.reset: latest
      max.poll.records: 1
    ack-mode: manual
    enableDlq: true
    dlqName: test.dlq
    dlqProducerProperties:
      configuration:
        key.serializer: org.apache.kafka.common.serialization.StringSerializer
        value.serializer: org.apache.kafka.common.serialization.StringSerializer

# under spring.cloud.stream.bindings
kafka-in-0:
  destination: test
  group: test.dev
  binder: kafka
  content-type: application/octet-stream
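
If raising the poll/session limits is the right fix, this is where I assume (not verified) they would go, in the same consumer configuration block:

    # sketch, under spring.cloud.stream.kafka.bindings.kafka-in-0.consumer.configuration
    max.poll.interval.ms: 600000  # assumption: default is 300000 (5 minutes); raise if one record can take longer to process
    session.timeout.ms: 45000     # assumption: currently left at the default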

The Topic "test" is compact and it has 20 partitions. that is why I mentioned concurrency 20.
cleanup.policy  compact
min.cleanable.dirty.ratio 0.1
min.compaction.lag.ms 1000
retention.bytes -1
retention.ms  -1
@Bean
public Consumer<Message<byte[]>> kafka(final StreamSourceConsumer kafkaSourceConsumer, final ObjectMapper objectMapper) {
    return message -> {
        try {
            // Wrap the record key in a small JSON payload and hand it off for downstream processing
            Map<String, Object> recordMessage = new HashMap<>();
            recordMessage.put("id", message.getHeaders().get(KafkaHeaders.RECEIVED_KEY));
            kafkaSourceConsumer.consumeMessage(
                    MessageBuilder.withPayload(objectMapper.writeValueAsString(recordMessage))
                            .copyHeaders(message.getHeaders())
                            .build());
        } catch (Exception e) {
            throw new StitcherException("Unable to process", e);
        }
    };
}
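
Since ack-mode is set to manual above, I wonder whether the consumer also needs to acknowledge each record explicitly. A minimal sketch of what I assume that would look like inside the lambda, after consumeMessage(...) succeeds (this is not code I currently have):

    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.kafka.support.KafkaHeaders;

    // Sketch (assumption): with ack-mode: manual, commit the offset explicitly
    // once the record has been processed successfully.
    Acknowledgment ack = message.getHeaders()
            .get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    if (ack != null) {
        ack.acknowledge();
    }

I am not sure whether a missing acknowledgment could be related to the consumer stalling, so I am including it only for completeness.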