Test onFailure of spring-kafka sending message

902 Views Asked by At

I try to test the onFailure case when I send a kafka message with producer but the onFailure method is never fire.

Here is my code where I send a message :

@Component
public class MessageSending {

    @Autowired
    Map<String, KafkaTemplate<String, String>> producerByCountry;

    String topicName = "countryTopic";

    public void sendMessage(String data) {
        producerByCountry.get("countryName").send(topicName, data).addCallback(
                onSuccess -> {},
                onFailure -> log.error("failed")
        );
    }
}

Here is the test class but it's still a success case and I have no idea how I can test the failure case (I want to add some processing inside the onFailure block but I would like to first know how I can trigger onFailure by testing).

@EmbeddedKafka
@SpringBootTest
public class MessageSendingTest {

    @MockBean
    Map<Country, KafkaTemplate<String, String>> producerByCountry;

    @Autowired
    EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    MessageSending messageSending;

    @Test
    void failTest(CapturedOutput capturedOutput) {
        var props = KafkaTestUtils.producerProps(embeddedKafka);
        var producerTemplate = new DefaultKafkaProducerFactory<String, String>(props);
        var template = new KafkaTemplate<>(producerTemplate);

        given(producerByCountry.get("USA"))).willReturn(template);

        messageSending.sendMessage("data");

        assertThat(capturedOutput).contains("failed");
        
    }
}

I also tried the idea of this topic How to test Kafka OnFailure callback with Junit? by doing

doAnswer(invocationOnMock -> {
    ListenableFutureCallback<SendResult<String, String>> listenableFutureCallback = invocationOnMock.getArgument(0);
    KafkaProducerException value = new KafkaProducerException(new ProducerRecord<String, String>("myTopic", "myMessage"), "error", ex);
    listenableFutureCallback.onFailure(value);
    return null;
}).when(mock(ListenableFuture.class)).addCallback(any(ListenableFutureCallback.class));

But I got this mockito exception org.mockito.exceptions.misusing.UnnecessaryStubbingException due by when().addCallback

Can someone can help ?

Thanks.

1

There are 1 best solutions below

6
On

You can use a mock template; see this answer for an example:

How to mock result from KafkaTemplate

EDIT

You can also mock the underlying Producer object - here is an example that is closer to your use case...

@SpringBootApplication
public class So75074961Application {

    public static void main(String[] args) {
        SpringApplication.run(So75074961Application.class, args);
    }

    @Bean
    KafkaTemplate<String, String> france(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf, Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "france:9092"));
    }

    @Bean
    KafkaTemplate<String, String> germany(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf, Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "germany:9092"));
    }

}

@Component
class MessageSending {

    private static final Logger log = LoggerFactory.getLogger(MessageSending.class);

    @Autowired
    Map<String, KafkaTemplate<String, String>> producerByCountry;

    String topicName = "countryTopic";

    public void sendMessage(String country, String data) {
        producerByCountry.get(country).send(topicName, data).addCallback(
                onSuccess -> log.info(onSuccess.getRecordMetadata().toString()),
                onFailure -> log.error("failed: " + onFailure.getMessage()));
    }

}
@SpringBootTest
@ExtendWith(OutputCaptureExtension.class)
class So75074961ApplicationTests {

    @Test
    void test(@Autowired MessageSending sending, CapturedOutput capture) {
        ProducerFactory<String, String> pf = mock(ProducerFactory.class);
        Producer<String, String> prod = mock(Producer.class);
        given(pf.createProducer()).willReturn(prod);
        willAnswer(inv -> {
            Callback callback = inv.getArgument(1);
            callback.onCompletion(null, new RuntimeException("test"));
            return mock(Future.class);
        }).given(prod).send(any(), any());

        // inject the mock pf into "france" template
        Map<?, ?> producers = KafkaTestUtils.getPropertyValue(sending, "producerByCountry", Map.class);
        new DirectFieldAccessor(producers.get("france")).setPropertyValue("producerFactory", pf);

        sending.sendMessage("france", "foo");
        assertThat(capture)
                .contains("failed: Failed to send; nested exception is java.lang.RuntimeException: test");
    }

}

Use CompletableFuture instead of ListenableFuture for versions 3.0 or later.

    public void sendMessage(String country, String data) {
        producerByCountry.get(country).send(topicName, data).whenComplete(
                (res, ex) -> {
                    if (ex == null) {
                        log.info(res.getRecordMetadata().toString());
                    }
                    else {
                        log.error("failed: " + ex.getMessage());
                    }
                });
    }

and

        assertThat(capture)
                .contains("failed: Failed to send");

(the latter because Spring Framework 6.0+ no longer merges nested exception messages; the top level exception is a KafkaProducerException, with the actual exception as its cause).