Testing Kafka Producer and Consumer

22 Views Asked by At

I am new to Spring and Kafka and trying to learn the same by creating some small projects.

I have created a Producer-Consumer Application using Spring Cloud Stream and Apache Kafka, but I cannot figure out how I can test whether the data has been pushed to the topic or whether my Consumer is able to receive the message or not.

I have been searching for the same everywhere, but could not find any source that seemed like a solution to my problem.
Can someone please let me know how can I test my producer and consumer.

Here is my code:

import brave.ScopedSpan;
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import brave.propagation.TraceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;


@SpringBootApplication
public class DemoApplication {
   Logger log= LoggerFactory.getLogger(DemoApplication.class);


   public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
   }


   // Supplier
   @Bean
   public Supplier<String> supplierBinding1(Tracer tracer) {
      return () -> {
         try {
            Thread.sleep(1500);
         }
         catch (InterruptedException e){
            throw new RuntimeException(e);
         }


         String msg= "Hello";
         return msg;
      };
   }


   // Consumer
   @Bean public Consumer<String> consumerBinding1() {
      return msg -> {
                  log.info("Message received: "+ msg);
      };
   }
}

Here is the application.yml file:

spring:
 application:
   name: "producer_consumer"
 cloud:
   function:
     definition: consumerBinding1;supplierBinding1
   stream:
     bindings:
       supplierBinding1-out-0:
         destination: supplier-topic


       consumerBinding1-in-0:
         destination: supplier-topic

Here is my test file:

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;


import java.time.Duration;
import java.util.*;


import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;




@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
class AstMetricsApplicationTests {


   Logger log= LoggerFactory.getLogger(AstMetricsApplicationTests.class);


   @Container
   static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));


   @Autowired
   KafkaTemplate kafkaTemplate;


   // Initializing Bootstrap server
   @DynamicPropertySource
   public static void initKafkaProperties(DynamicPropertyRegistry registry){
      registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
   }


   @Autowired
   AppConfig app;


   // Container Tests
   @Test
   public void containerCreated(){
      await().pollInterval(Duration.ofSeconds(5)).untilAsserted( () ->{
         boolean check= kafka.isCreated();
         assertEquals(true, check);
      } );
   }


   @Test
   public void containerRunning(){
      await().pollInterval(Duration.ofSeconds(5)).untilAsserted( () ->{
         boolean check= kafka.isRunning();
         assertEquals(true, check);
      } );
   }


   // Sender Tests
   @Test
   public void checkSender(){
      kafkaTemplate.send("supplier-topic", "1234").whenComplete( (res, err) ->{
         boolean sent= false;
         if (Objects.nonNull(err)){
            log.info("Unable to send message");
         }
         else{
            sent= true;
            log.info("Message sent successfully");
         }
         assertEquals(true, sent);
      } );
   }


   // How do I test producer and consumer
}

Can someone please help me to test my producer and consumer.

Thank you in advance.

0

There are 0 best solutions below