Unable to Send Kafka Messages with EmbeddedKafka in Spring Boot 3


I am facing an issue with my Spring Boot application (version 3.2.1) and Embedded Kafka: the KafkaTemplate does not successfully send messages.

Here is my test code:


import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

@ExtendWith(MockitoExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }, controlledShutdown = true)
public class PaymentITest {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${demo.kafka.topic.payment}")
    private String topicName;

    @Test
    void test() {
        String jsonPayload = "{ \"payload\": { \"payment_id\": \"1\" } }";

        kafkaTemplate.send(topicName, "key", jsonPayload);
    }
}

**The Kafka consumer**

import fr.test.test.compute.common.event.PaymentEvent;
import fr.test.test.compute.core.domain.port.api.PaymentRequester;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaPaymentConsumer {

    private final PaymentRequester paymentRequester;

    @KafkaListener(topics = "#{'${demo.kafka.topic.payment}'}",
                   groupId = "#{'${demo.kafka.group-id}'}")
    public void consumePaymentEvents(PaymentEvent paymentEvent)  {
        paymentRequester.handlePaymentReceivedEvent(paymentEvent.extractModel());
    }

}

test/resources/application.yml

spring:
  mongodb:
    embedded:
      storage:
        oplogSize: 10
        repl-set-name: rs0
      version: "5.0.5"
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      group-id: kafka-group-id
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: fr.test.test.compute.config.kafka.KafkaEventDeserializer
    producer:
      bootstrap.servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: fr.test.test.compute.config.kafka.KafkaEventSerializer

de:
  flapdoodle:
    mongodb:
      embedded:
        version: 4.0.2

demo:
  kafka:
    topic:
      payment: payment_topic2
    group-id: kafka-group

I have already tested against a local Kafka broker, and the consumer works fine there.

However, when I run the test with a breakpoint set in the consumer, the breakpoint is never hit. What could be preventing the KafkaTemplate from sending messages (see the output below)? Is this an issue with Embedded Kafka in Spring Boot 3?

Here is the GitHub repository for the code base: https://github.com/smaillns/springboot-mongo-kafka

Any suggestions would be appreciated.


java.util.concurrent.CompletableFuture@bfc918c[Not completed]
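
A note on the output above: in Spring Kafka 3.x, KafkaTemplate.send() returns a CompletableFuture and the send itself is asynchronous, so seeing [Not completed] right after the call does not by itself show that the send failed. A test method that returns immediately may finish before the record is ever delivered. The following is only a minimal sketch of waiting on such a future, using plain JDK classes; fakeSend is a hypothetical stand-in for the real send call and is not part of Spring Kafka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class SendAwaitSketch {

    // Hypothetical stand-in for kafkaTemplate.send(...):
    // completes on another thread after a short delay.
    static CompletableFuture<String> fakeSend(String payload) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "acked:" + payload;
        });
    }

    // Blocks until the future completes. If it does not complete within the
    // timeout, the future is completed exceptionally and join() throws.
    static <T> T await(CompletableFuture<T> future, long timeoutSeconds) {
        return future.orTimeout(timeoutSeconds, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = fakeSend("payment-1");
        // Inspecting the future here may well show it as not completed yet.
        System.out.println("done right after send? " + future.isDone());
        System.out.println(await(future, 10));
        System.out.println("done after waiting? " + future.isDone());
    }
}
```

In the real test, the equivalent would be to block on the future returned by kafkaTemplate.send(...), for example with get(10, TimeUnit.SECONDS), so that a broker connection problem surfaces as an exception instead of a silently incomplete future.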

There is 1 solution below

Przemysław

Have you tried commenting out bootstrap-servers: localhost:9092 in the test resources application.yml? I had a similar problem and this helped; otherwise my app was trying to connect to my Kafka instance running in Docker.
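
As a rough illustration of the same idea, here is a test-configuration sketch that lets the embedded broker publish its own address instead of relying on a fixed port. It assumes the hard-coded localhost:9092 entries have been removed from the test application.yml, since consumer- or producer-specific bootstrap-servers settings would take precedence over the common spring.kafka.bootstrap-servers property. The class name is hypothetical, the topic name is taken from the question's configuration, and whether the payload serializes depends on the project's custom serializer, so treat it as a starting point, not a verified fix.

```java
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
// bootstrapServersProperty makes the embedded broker write its (random-port)
// address into the property that Spring Boot's Kafka auto-configuration reads.
@EmbeddedKafka(partitions = 1,
        topics = "payment_topic2",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class EmbeddedBrokerAddressSketchTest { // hypothetical class name

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void sendReachesEmbeddedBroker() throws Exception {
        String jsonPayload = "{ \"payload\": { \"payment_id\": \"1\" } }";

        // Block on the returned future so a connection failure fails the test.
        kafkaTemplate.send("payment_topic2", "key", jsonPayload)
                .get(10, TimeUnit.SECONDS);
    }
}
```

With this setup there is no need to pin the broker to port 9092 through brokerProperties, which also avoids clashing with a Kafka instance already listening on that port.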