Kafka exactly once messaging test with "consume-transform-produce" Integration test

185 Views Asked by At

I am writing testcase to test the my application's consume-transform-produce loop of the Kafka. So effectively I am consuming from a sourceTopic-processing-sendMessage to Destination topic. I am writing these testcases to prove the exactly once messaging with Kafka as I will add other failure cases later.
Here is my configuration:

private Map<String, Object> consConfigProps(boolean txnEnabled) {
    Map<String, Object> props = new HashMap<>(
            KafkaTestUtils.consumerProps(AB_CONSUMER_GROUP_ID, "false", kafkaBroker));
    props.put(ConsumerConfig.GROUP_ID_CONFIG, AB_CONSUMER_GROUP_ID);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return props;
}

private Map<String, Object> prodConfigProps(boolean txnEnabled) {
    Map<String, Object> props = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID().toString());
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                "prod-txn-" + UUID.randomUUID().toString());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

public KafkaMessageListenerContainer<String, NormalUser> fetchContainer() {
    ContainerProperties containerProperties = new ContainerProperties(ABTOPIC, XYTOPIC, PATOPIC);
    containerProperties.setGroupId("groupId-10001");
    containerProperties.setAckMode(AckMode.MANUAL);
    containerProperties.setSyncCommits(true);
    containerProperties.setSyncCommitTimeout(Duration.ofMillis(5000));
    containerProperties.setTransactionManager(kafkaTransactionManager());
    KafkaMessageListenerContainer<String, NormalUser> kafkaMessageListContainer = new KafkaMessageListenerContainer<>(
            consumerFactory(), containerProperties);
    kafkaMessageListContainer.setupMessageListener(new AcknowledgingMessageListener<String, NormalUser>() {
        @Override
        public void onMessage(ConsumerRecord<String, NormalUser> record, Acknowledgment acknowledgment) {
            log.debug("test-listener received message='{}'", record.toString());
            records.add(record);
            acknowledgment.acknowledge();
        }
    });
    return kafkaMessageListContainer;
}

    @Test
    public void testProducerABSuccess() throws InterruptedException, IOException {
        NormalUser userObj = new NormalUser(ABTypeGood,
                Double.valueOf(Math.random() * 10000).longValue(),
                "Blah" + String.valueOf(Math.random() * 10));
        sendMessage(XYTOPIC, "AB-id", userObj);
        try {
            ConsumerRecords<String, NormalUser> records;
            parserConsumer.subscribe(Collections.singletonList(XYTOPIC));
            Map<TopicPartition, OffsetAndMetadata> currentOffsets = new LinkedHashMap<>();
            // Check for messages
            parserProducer.beginTransaction();
            records = parserConsumer.poll(Duration.ofSeconds(3));
            assertThat(1).isEqualTo(records.count()); // --> this asserts passes like 50% of the time. 
            for (ConsumerRecord<String, NormalUser> record : records) {
                assertEquals(record.key(), "AB-id");
                assertEquals(record.value(), userObj);
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset()));
            }
            parserProducer.send(new ProducerRecord<String, NormalUser>(ABTOPIC, "AB-id", userObj));
            parserProducer.sendOffsetsToTransaction(currentOffsets, AB_CONSUMER_GROUP_ID);
            parserProducer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            parserProducer.close();
        } catch (final KafkaException e) {
            parserProducer.abortTransaction();
        }
        ConsumerRecords<String, NormalUser> records;
        loadConsumer.subscribe(Collections.singletonList(ABTOPIC));
        records = loadConsumer.poll(Duration.ofSeconds(3));
        assertThat(1).isEqualTo(records.count()); //--> this assert fails all the time. 
        for (ConsumerRecord<String, NormalUser> record : records) {
            assertEquals(record.key(), "AB-id");
            assertEquals(record.value(), userObj);
        }
    }

My issue is that the above testcase "testProducerABSuccess" is not consistent and the asserts fails sometimes and sometimes they pass. I have not been able to figure out why they are so inconsistent. What is wrong with the above.

Edits: 16-12:

  1. Tested with consumerconfig.Auto_Offset_Reset_config-earliest no change. The first assert passes like 70% of the time. The second assert fails all the time (0% pass rate).
1

There are 1 best solutions below

3
On BEST ANSWER

Which assertion fails? If it's assertThat(1).isEqualTo(records.count());, it's probably because you are setting auto.offset.reset to latest. It needs to be earliest to avoid a race condition whereby the record is sent before the consumer is assigned the partitition(s).