I have a simple Spring Boot KStream topology that transforms a string from lowercase to uppercase. I want my integration test to launch an embedded Kafka broker and then test the topology. Is it possible to write integration tests like these using Spring's @EmbeddedKafka?
I have seen several examples that use @EmbeddedKafka with simple consumers annotated with @KafkaListener, but none that use KStream. I attempted to test the following topology, which transforms an incoming text stream from lowercase to uppercase.
Here's the topology:
```java
@Configuration
public class UppercaseStream {

    private static final String LOWERCASE_TOPIC = "t.lower.case";
    private static final String UPPERCASE_TOPIC = "t.upper.case";

    @Bean
    @Qualifier("kStreamPromoToUppercase")
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder
                .stream(LOWERCASE_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        sourceStream.print(Printed.<String, String>toSysOut().withLabel("Original KStream..."));

        KStream<String, String> upperCaseStream = sourceStream.mapValues(text -> text.toUpperCase());
        upperCaseStream.print(Printed.<String, String>toSysOut().withLabel("Uppercase KStream..."));
        upperCaseStream.to(UPPERCASE_TOPIC);

        return upperCaseStream;
    }
}
```
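For the topology to run under Spring (rather than the test driver), a KafkaStreamsConfiguration bean also needs to be in the context; the integration test further down autowires one. A minimal sketch of such a configuration, assuming @EnableKafkaStreams is used and that the application id and class name are illustrative choices (spring.embedded.kafka.brokers is the property @EmbeddedKafka populates with the embedded broker address):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

// Illustrative sketch: exposes the KafkaStreamsConfiguration bean under the
// name that @EnableKafkaStreams expects, pointing at the embedded broker.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    // @EmbeddedKafka publishes the broker address under this property.
    @Value("${spring.embedded.kafka.brokers:localhost:9092}")
    private String bootstrapServers;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-stream-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }
}
```

With this in place, Spring builds and starts the KafkaStreams instance from the kStreamPromoToUppercase bean automatically.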
The unit test that tests the topology is:
```java
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class UpperCaseTopologyTest {

    TopologyTestDriver testDriver;

    @AfterAll
    void tearDown() {
        testDriver.close();
    }

    @Test
    @DisplayName("should transform lowercase to uppercase words")
    void shouldTransformLowercaseWords() {
        // Given
        StreamsBuilder builder = new StreamsBuilder();
        new UppercaseStream().kStreamPromoToUppercase(builder);
        Topology topology = builder.build();

        // Set up the test driver
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        testDriver = new TopologyTestDriver(topology, props);

        TestInputTopic<String, String> inputTopic = testDriver.createInputTopic(
                "t.lower.case", new Serdes.StringSerde().serializer(), new Serdes.StringSerde().serializer());
        TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic(
                "t.upper.case", new Serdes.StringSerde().deserializer(), new Serdes.StringSerde().deserializer());

        // When
        inputTopic.pipeInput("test");

        // Then
        assertThat(outputTopic.readValue()).isEqualTo("TEST");
    }
}
```
I want to write an integration test that first launches an embedded Kafka server and then tests the UppercaseStream topology. I tried the following:
```java
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    public KafkaTemplate<String, String> template;

    @Autowired
    private KafkaConsumer consumer;

    private KafkaStreams kafkaStreams;

    @Value("${test.topic}")
    private String topic;

    @Autowired
    private KafkaStreamsConfiguration kafkaStreamsConfiguration;

    @Test
    public void should_transform_lowercase_to_uppercase() throws Exception {
        // Create a StreamsBuilder
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(topic, Consumed.with(new Serdes.StringSerde(), new Serdes.StringSerde()));

        // Add the topology
        new UppercaseStream().kStreamPromoToUppercase(streamsBuilder);
        kafkaStreams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration.asProperties());
        kafkaStreams.start();

        template.send(topic, "test");
        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);

        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString("TEST"));
    }

    @After
    public void tearDown() {
        if (kafkaStreams != null) kafkaStreams.close();
    }
}
```
The test fails the assertion. I am not sure how to get the kStreamPromoToUppercase bean, and I am not sure whether I am following the correct approach.
There were a few things missing from the integration test.
A couple of NewTopic Kafka admin client objects were needed to represent the input and output topics:

```java
@Bean
public NewTopic createInputTopic() {
    return new NewTopic(inputTopic, Optional.of(1), Optional.empty());
}

@Bean
public NewTopic createOutputTopic() {
    return new NewTopic(outputTopic, Optional.of(1), Optional.empty());
}
```
The rest of the test remains more or less the same. As suggested by @Garry, I used the Kafka consumer.
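The consumer.getLatch() and consumer.getPayload() calls in the test assume a small latch-based listener component. A minimal sketch of such a consumer, where the class name, group id, and topic wiring are illustrative assumptions rather than code from the original post:

```java
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Illustrative test-support component: counts down a latch when a record
// arrives on the output topic and keeps the payload for assertions.
@Component
public class KafkaConsumer {

    private final CountDownLatch latch = new CountDownLatch(1);
    private String payload;

    @KafkaListener(topics = "t.upper.case", groupId = "test-consumer")
    public void receive(ConsumerRecord<String, String> record) {
        payload = record.value();
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }
}
```

The latch lets the test block until the listener has actually received a record, instead of sleeping for an arbitrary time.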
Here's the gist of the full refactored solution.
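In outline, the pieces fit together roughly like this (a sketch, not the linked gist; the test class name, topic names, timeout, and the latch-based KafkaConsumer component are all assumptions):

```java
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

import static org.assertj.core.api.Assertions.assertThat;

// Illustrative outline: the topology bean is started by Spring, so the test
// only produces to the input topic and waits on the latch-based consumer.
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = { "t.lower.case", "t.upper.case" })
class UppercaseStreamIntegrationTest {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Autowired
    private KafkaConsumer consumer; // the latch-based test consumer

    @Test
    void shouldTransformLowercaseToUppercase() throws Exception {
        // Produce to the input topic; the UppercaseStream topology
        // (started by Spring) writes the result to the output topic.
        template.send("t.lower.case", "test");

        // Wait until the listener on the output topic has received a record.
        boolean received = consumer.getLatch().await(10, TimeUnit.SECONDS);

        assertThat(received).isTrue();
        assertThat(consumer.getPayload()).isEqualTo("TEST");
    }
}
```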