Committed messages are not getting recorded as Outgoing Messages


We have a requirement where we wait for one particular message to reach the consumer; once we receive that message, we should commit all the offsets we have accumulated.

Once we start receiving messages from Event Hub over the Kafka protocol, we should keep track of the offsets per partition, and once we receive the final message (I am expecting that message after 90 seconds) we should commit all the respective offsets at once as a batch.
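For context, here is a minimal sketch of the pattern I am describing, using the plain Kafka consumer API. The "FINAL" marker value is a placeholder for our real final-message condition, not an actual value:

Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();
boolean done = false;
while (!done) {
    // remember only the highest offset per partition; offset + 1 is the
    // position of the next record to consume, which is what gets committed
    for (ConsumerRecord<String, String> record : kafkaConsumer.poll(Duration.ofSeconds(3))) {
        pending.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        if ("FINAL".equals(record.value())) { // placeholder for the real final-message check
            kafkaConsumer.commitSync(pending); // one commit for the whole accumulated batch
            pending.clear();
            done = true;
        }
    }
}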

I am seeing the below behaviour:

I am able to retrieve all the messages from Event Hub using the Kafka protocol; it's just that the Outgoing Messages metric in Event Hub is lower than the Incoming Messages metric.

I created a sample program using the below consumer properties:

// deserialiser
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// EventHub specific
final String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"" + connectionString + "\";";
final String bootstrapServers = String.format("%s.%s:9093", getNameSpace(connectionString), "servicebus.windows.net");
kafkaProps.put("security.protocol", "SASL_SSL");
kafkaProps.put("sasl.mechanism", "PLAIN");enter image description here
kafkaProps.put("sasl.jaas.config", saslJaasConfig);
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

kafkaProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 500000000);
kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100000); // Default is 500
kafkaProps.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 180000); // Azure recommends keeping this below 240,000
kafkaProps.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 600000); // NOTE: Event Hubs closes idle connections after 240,000 ms, so this should be below that
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, anyUniqueID);

kafkaProps.put("offset.flush.timeout.ms", 100000);
kafkaProps.put("offsets.commit.timeout.ms", 100000);
kafkaProps.put("commit.interval.ms", 100000);
kafkaProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
kafkaProps.put("max.poll.interval.ms", 100000);
kafkaProps.put("default.api.timeout.ms", 100000);

I am using the below snippet for my testing:

Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
List<Map<TopicPartition, OffsetAndMetadata>> listOfMaps = new ArrayList<>();
while (!stopThread) {
    final ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(KAFKA_POLL_DURATION); // KAFKA_POLL_DURATION is 3 seconds
    if (consumerRecords.count() > 0) {
        for (ConsumerRecord<String, String> record : consumerRecords) {
            String valAsString = record.value();
            listOfMaps.add(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
            offsetAndMetadataMap.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
    }

    if (System.currentTimeMillis() > collectTimeout) { // collectTimeout is 90Seconds

            for(Map<TopicPartition, OffsetAndMetadata> map : listOfMaps){
                kafkaConsumer.commitSync(map);
            }
            kafkaConsumer.commitSync(offsetAndMetadataMap); // I am committing in this way
            kafkaConsumer.commitSync(); // I tried this with other combination, but not getting expected result
            offsetAndMetadataMap.clear();
            listOfMaps.clear();
            // this is to stop the thread after the threshold
            if (System.currentTimeMillis() > thresholdMillis) { // thresholdMillis is 10 minutes
                kafkaConsumer.close();
                stopThread = true;
            }
    }
}
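One thing I noticed while reviewing the snippet: collectTimeout is never advanced, so once the first 90-second window expires the commit branch runs on every subsequent poll. A minimal sketch of the commit branch with the window reset, assuming the intent is one batch commit per window (COLLECT_WINDOW_MS is a placeholder for 90,000 ms):

if (System.currentTimeMillis() > collectTimeout) {
    kafkaConsumer.commitSync(offsetAndMetadataMap); // single commit covering the whole window
    offsetAndMetadataMap.clear();
    collectTimeout = System.currentTimeMillis() + COLLECT_WINDOW_MS; // start the next window
}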

I am seeing the below graph when I do this:

[Metrics graph: the Outgoing Messages count is lower than the Incoming Messages count]

When I tried collecting all the offsets like below:

listOfMaps.add(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));

and committing all of them at once:

for(Map<TopicPartition, OffsetAndMetadata> map : listOfMaps){
    kafkaConsumer.commitSync(map);
}
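Note that this loop issues one commit request per accumulated record, and a later singleton map for the same partition overwrites the earlier commit anyway. A sketch of merging everything into a single request instead (same offsets, one commitSync; assumes records were added in offset order per partition):

Map<TopicPartition, OffsetAndMetadata> merged = new HashMap<>();
for (Map<TopicPartition, OffsetAndMetadata> map : listOfMaps) {
    merged.putAll(map); // last write per partition wins, i.e. the highest offset
}
kafkaConsumer.commitSync(merged); // one commit request covering all partitions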

I am seeing the below graph:

[Metrics graph]

and when I collect the offsets in the below way:

offsetAndMetadataMap.put(new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1));

and then commit them all at once:

kafkaConsumer.commitSync(offsetAndMetadataMap);

I am still seeing fewer Outgoing Messages than Incoming Messages.

[Metrics graph: Outgoing Messages is lower than Incoming Messages]
