The method kafka( ) consumeMessage above some time takes more time to finish processing the record underlying operations. is that the reason once time exceeds binders stopping consuming Data from the partitions , What are the config variable I can fine tune to resume the poll and binder continue process the message from the topic "test". Whenever I restart application then it processes.very much confused session.time.out , max.poll.interval.ms and so on so leave all those as default value.Restart the application it resumes consuming. I keep fine tuning max.poll.records does not help.
kafka-in-0:
consumer:
concurrency: 20
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
auto.offset.reset: latest
max.poll.records: 1
ack-mode: manual
enableDlq: true
dlqName: test.dlq
dlqProducerProperties:
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
kafka-in-0:
destination: test
group: test.dev
binder: kafka
content-type: application/octet-stream`
The Topic "test" is compact and it has 20 partitions. that is why I mentioned concurrency 20.
cleanup.policy compact
min.cleanable.dirty.ratio 0.1
min.compaction.lag.ms 1000
retention.bytes -1
retention.ms -1
@Bean
public Consumer<Message<byte[]>> kafka(final StreamSourceConsumer kafkaSourceConsumer, final ObjectMapper objectMapper) {
return message -> {
try {
Map<String, Object> recordMessage = new HashMap<>();
recordMessage.put("id", message.getHeaders().get(KafkaHeaders.RECEIVED_KEY));
kafkaSourceConsumer.consumeMessage(
MessageBuilder.withPayload(objectMapper.writeValueAsString(recordMessage)).
copyHeaders(message.getHeaders()).build()
);
} catch (Exception e) {
throw new StitcherException("Unable to process", e);
}
};
}