Getting high number of duplicate records with Apache Kafka Consumer

1k Views Asked by At
  1. I have set up an Apache Kafka cluster with three hosts.
  2. I have created a topic with replication factor 3 and partition count 8.
  3. I have divided my data equally among the 8 partitions.
  4. I have 8 single-threaded consumers, and I am leaving it to Kafka to assign partitions to each consumer.
  5. I have set enable auto commit to false and AckMode to RECORD.

PROBLEM:

When I stop and then start the consumers, the duplicate message count is in the hundreds. I am expecting a count of no more than 8, since I have 8 consumers and AckMode is RECORD.

Please help!!!

This is my consumer config.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.support.serializer.JsonDeserializer;


/**
 * Spring configuration that wires the Kafka consumer side for {@code Quote} messages:
 * a listener container factory, the consumer factory behind it, the raw consumer
 * properties, and the {@link MessageListener} bean.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Broker address list, read from the {@code kafka.bootstrapAddress} property. */
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /** Consumer group id, read from the {@code kafka.market.persist.consumergroup} property. */
    @Value(value = "${kafka.market.persist.consumergroup}")
    private String marketDataPersistConsumer;

    /**
     * Builds the listener container factory for {@code Quote} records.
     *
     * <p>The factory uses the consumer factory created for the configured consumer
     * group, acknowledges with {@code AckMode.RECORD}, and has sync commits enabled.
     *
     * @return the configured container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Quote> marketDataKafkaListenerContainerFactory() {
        ConsumerFactory<String, Quote> quoteConsumerFactory = consumerFactory(marketDataPersistConsumer);

        ConcurrentKafkaListenerContainerFactory<String, Quote> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(quoteConsumerFactory);
        containerFactory.getContainerProperties().setAckMode(AckMode.RECORD);
        containerFactory.getContainerProperties().setSyncCommits(true);
        return containerFactory;
    }

    /**
     * Creates a consumer factory with String keys and JSON-deserialized {@code Quote} values.
     *
     * <p>NOTE(review): this is a {@code @Bean} method with a {@code String} parameter;
     * confirm how the container resolves {@code groupId} when it instantiates this bean
     * on its own.
     *
     * @param groupId consumer group id placed into the consumer properties
     * @return a consumer factory for the given group
     */
    @Bean
    public ConsumerFactory<String, Quote> consumerFactory(String groupId) {
        Map<String, Object> consumerSettings = consumerConfigs(groupId);
        StringDeserializer keyDeserializer = new StringDeserializer();
        JsonDeserializer<Quote> valueDeserializer = new JsonDeserializer<>(Quote.class);
        return new DefaultKafkaConsumerFactory<>(consumerSettings, keyDeserializer, valueDeserializer);
    }

    /**
     * Assembles the Kafka consumer properties: bootstrap servers, group id, and
     * auto commit switched off.
     *
     * @param groupId consumer group id to store under {@code group.id}
     * @return a new mutable map holding the consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs(String groupId) {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Auto commit is disabled here; the container factory above sets the ack mode.
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return settings;
    }

    /**
     * Registers the listener that consumes {@code Quote} records.
     *
     * <p>NOTE(review): {@code MessageListener} is also annotated {@code @Component};
     * confirm it is not additionally picked up by component scanning.
     *
     * @return a new listener instance
     */
    @Bean
    public MessageListener messageListener() {
        return new MessageListener();
    }
}

And this is my message listener:

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;


/**
 * Kafka listener component for {@code Quote} records.
 *
 * <p>Declares eight {@code @KafkaListener} methods on the same topic and container
 * factory; each one persists the received quote and then logs it.
 *
 * <p>NOTE(review): every method sets {@code containerGroup} to the consumer-group
 * property; confirm that attribute is meant to carry the Kafka consumer group id.
 */
@Component
public class MessageListener {

    private static final Logger LOG = Logger.getLogger("debugLogger");

    /**
     * Shared handling for all listener methods: saves the quote, then logs the quote
     * together with the partition it was read from.
     *
     * @param quote     the received payload
     * @param partition the partition id taken from the message header
     */
    private void persistAndLog(Quote quote, int partition) {
        // save(...) is not defined in this listing; presumably provided elsewhere - TODO confirm.
        save(quote);
        final String message = "Consumed Quote:" + quote + " from partition:" + partition;
        LOG.info(message);
    }

    /** Listener #0: persists and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerZero(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener #1: persists and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerOne(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener #2: persists and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerTwo(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener #3: persists and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerThree(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener #4: persists and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerFour(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener #5: persists and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerFive(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener #6: persists and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerSix(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }

    /** Listener #7: persists and logs the received quote. */
    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerSeven(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistAndLog(quote, partition);
    }
}

Kafka Debug Logs

11:32:26 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit > list: {} 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - > > Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15649, metadata=''}, market-data-topic-dev3-3=OffsetAndMetadata{offset=15408, metadata=''}} 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15648, metadata=''}, market-data-topic-dev3-1=OffsetAndMetadata{offset=15269, metadata=''}} 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15631, metadata=''}, market-data-topic-dev3-5=OffsetAndMetadata{offset=15515, metadata=''}} 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15657, metadata=''}, market-data-topic-dev3-7=OffsetAndMetadata{offset=15631, metadata=''}} 11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-0, market-data-topic-dev3-1] 11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-2, market-data-topic-dev3-3] 11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-4, market-data-topic-dev3-5] 11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-6, market-data-topic-dev3-7] 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:29 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:15534 - Partition:6, lastPrice=6, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=15534, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, 
tradeVolume=null], headers={kafka_offset=15657, kafka_receivedMessageKey=null, kafka_receivedPartitionId=6, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15775, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15775, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15755, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15755, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15783, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15783, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15776, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15776, metadata=''}} Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-2, market-data-topic-dev3-3] for group market-data-persist-cg-dev3 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-4, market-data-topic-dev3-5] for group market-data-persist-cg-dev3 2017-08-30 11:32:39 DEBUG 
KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker 2017-08-30 11:32:39 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:16440 - Partition:0, lastPrice=0, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=16440, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15777, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=market-data-topic-dev3}]] Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-6, market-data-topic-dev3-7] for group market-data-persist-cg-dev3 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped 2017-08-30 11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-6, market-data-topic-dev3-7] 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15784, metadata=''}} Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-0, market-data-topic-dev3-1] for group market-data-persist-cg-dev3 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15784, metadata=''}} 2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.AbstractCoordinator sendJoinGroupRequest INFO: (Re-)joining group market-data-persist-cg-dev3 11:32:39 DEBUG 
KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped 11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-2, market-data-topic-dev3-3] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15777, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15777, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped 11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-4, market-data-topic-dev3-5] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15757, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15757, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped 11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-0, market-data-topic-dev3-1] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15778, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15778, metadata=''}} Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinComplete INFO: Setting newly assigned partitions [market-data-topic-dev3-2] for group market-data-persist-cg-dev3 Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinComplete INFO: Setting newly assigned partitions [market-data-topic-dev3-6] for group market-data-persist-cg-dev3 11:32:39 DEBUG 
KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15269, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15778, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-3=OffsetAndMetadata{offset=15408, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-7=OffsetAndMetadata{offset=15631, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15777, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15757, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15784, metadata=''}} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15515, metadata=''}} 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-1] 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-0] 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-3] 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-7] 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-4] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-6] 11:32:39 INFO 
KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-2] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-5] 11:32:39 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:16448 - Partition:0, lastPrice=0, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=16448, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15778, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 500 records 11:32:39 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:16324 - Partition:4, lastPrice=4, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=16324, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15757, kafka_receivedMessageKey=null, kafka_receivedPartitionId=4, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15909, metadata=''}} 11:32:49 DEBUG 
KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15909, metadata=''}} 11:32:49 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:17474 - Partition:2, lastPrice=2, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=17474, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15907, kafka_receivedMessageKey=null, kafka_receivedPartitionId=2, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15883, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15883, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15907, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15907, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15914, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15914, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15645, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15645, metadata=''}} 11:32:49 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:17598 - Partition:6, 
lastPrice=6, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=17598, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15915, kafka_receivedMessageKey=null, kafka_receivedPartitionId=6, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15395, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15395, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-3=OffsetAndMetadata{offset=15536, metadata=''}} 11:32:49 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-3=OffsetAndMetadata{offset=15536, metadata=''}} 11:32:49 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:17482 - Partition:2, lastPrice=2, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=17482, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMess 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-7=OffsetAndMetadata{offset=15774, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-7=OffsetAndMetadata{offset=15774, metadata=''}} 11:32:50 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:13489 - Partition:1, lastPrice=1, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=13489, closeTime=null, localCloseTime=null, 
localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15408, kafka_receivedMessageKey=null, kafka_receivedPartitionId=1, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15922, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15922, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15896, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15896, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15920, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15920, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15928, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15928, metadata=''}} 11:32:50 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:17608 - Partition:0, lastPrice=0, systemTime=null, tickTime=null, localTickTime=null, 
localTickTimeEpoch=null, closePrice=17608, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15923, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15659, metadata=''}} 11:32:50 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:14659 - Partition:3, lastPrice=3, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=14659, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15550, kafka_receivedMessageKey=null, kafka_receivedPartitionId=3, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15659, metadata=''}} 11:32:50 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:15557 - Partition:5, lastPrice=5, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=15557, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15660, kafka_receivedMessageKey=null, kafka_receivedPartitionId=5, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: 
{market-data-topic-dev3-3=OffsetAndMetadata{offset=15550, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-3=OffsetAndMetadata{offset=15550, metadata=''}} 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15408, metadata=''}} 11:32:50 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=Quote [ricCode=Sequence:16559 - Partition:7, lastPrice=7, systemTime=null, tickTime=null, localTickTime=null, localTickTimeEpoch=null, closePrice=16559, closeTime=null, localCloseTime=null, localCloseTimeEpoch=null, messageState=null, logMessage=null, tradeVolume=null], headers={kafka_offset=15781, kafka_receivedMessageKey=null, kafka_receivedPartitionId=7, kafka_receivedTopic=market-data-topic-dev3}]] 11:32:50 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15408, metadata=''}}

0

There are 0 best solutions below