I am writing testcase to test the my application's consume-transform-produce loop of the Kafka. So effectively I am consuming from a sourceTopic-processing-sendMessage to Destination topic. I am writing these testcases to prove the exactly once messaging with Kafka as I will add other failure cases later.
Here is my configuration:
private Map<String, Object> consConfigProps(boolean txnEnabled) {
Map<String, Object> props = new HashMap<>(
KafkaTestUtils.consumerProps(AB_CONSUMER_GROUP_ID, "false", kafkaBroker));
props.put(ConsumerConfig.GROUP_ID_CONFIG, AB_CONSUMER_GROUP_ID);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
private Map<String, Object> prodConfigProps(boolean txnEnabled) {
Map<String, Object> props = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID().toString());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"prod-txn-" + UUID.randomUUID().toString());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
public KafkaMessageListenerContainer<String, NormalUser> fetchContainer() {
ContainerProperties containerProperties = new ContainerProperties(ABTOPIC, XYTOPIC, PATOPIC);
containerProperties.setGroupId("groupId-10001");
containerProperties.setAckMode(AckMode.MANUAL);
containerProperties.setSyncCommits(true);
containerProperties.setSyncCommitTimeout(Duration.ofMillis(5000));
containerProperties.setTransactionManager(kafkaTransactionManager());
KafkaMessageListenerContainer<String, NormalUser> kafkaMessageListContainer = new KafkaMessageListenerContainer<>(
consumerFactory(), containerProperties);
kafkaMessageListContainer.setupMessageListener(new AcknowledgingMessageListener<String, NormalUser>() {
@Override
public void onMessage(ConsumerRecord<String, NormalUser> record, Acknowledgment acknowledgment) {
log.debug("test-listener received message='{}'", record.toString());
records.add(record);
acknowledgment.acknowledge();
}
});
return kafkaMessageListContainer;
}
@Test
public void testProducerABSuccess() throws InterruptedException, IOException {
NormalUser userObj = new NormalUser(ABTypeGood,
Double.valueOf(Math.random() * 10000).longValue(),
"Blah" + String.valueOf(Math.random() * 10));
sendMessage(XYTOPIC, "AB-id", userObj);
try {
ConsumerRecords<String, NormalUser> records;
parserConsumer.subscribe(Collections.singletonList(XYTOPIC));
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new LinkedHashMap<>();
// Check for messages
parserProducer.beginTransaction();
records = parserConsumer.poll(Duration.ofSeconds(3));
assertThat(1).isEqualTo(records.count()); // --> this asserts passes like 50% of the time.
for (ConsumerRecord<String, NormalUser> record : records) {
assertEquals(record.key(), "AB-id");
assertEquals(record.value(), userObj);
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()));
}
parserProducer.send(new ProducerRecord<String, NormalUser>(ABTOPIC, "AB-id", userObj));
parserProducer.sendOffsetsToTransaction(currentOffsets, AB_CONSUMER_GROUP_ID);
parserProducer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
parserProducer.close();
} catch (final KafkaException e) {
parserProducer.abortTransaction();
}
ConsumerRecords<String, NormalUser> records;
loadConsumer.subscribe(Collections.singletonList(ABTOPIC));
records = loadConsumer.poll(Duration.ofSeconds(3));
assertThat(1).isEqualTo(records.count()); //--> this assert fails all the time.
for (ConsumerRecord<String, NormalUser> record : records) {
assertEquals(record.key(), "AB-id");
assertEquals(record.value(), userObj);
}
}
My issue is that the above testcase "testProducerABSuccess" is not consistent and the asserts fails sometimes and sometimes they pass. I have not been able to figure out why they are so inconsistent. What is wrong with the above.
Edits: 16-12:
- Tested with consumerconfig.Auto_Offset_Reset_config-earliest no change. The first assert passes like 70% of the time. The second assert fails all the time (0% pass rate).
Which assertion fails? If it's
assertThat(1).isEqualTo(records.count());
, it's probably because you are settingauto.offset.reset
tolatest
. It needs to beearliest
to avoid a race condition whereby the record is sent before the consumer is assigned the partitition(s).