I have an app that uses a KStream to read from Kafka, filter the data based on a header, and write to a KTable.
public Topology buildTopology() {
    KStream<String, String> inputStream = builder.stream("topicname");
    KStream<String, String> filteredStream = inputStream.transformValues(KSExtension::new)
        .filter((key, value) -> value != null);
    kTable = filteredStream.groupByKey()
        .reduce((value1, value2) -> value2, Materialized.as("ktable"));
    KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    return builder.build();
}
I'm trying to create a unit test for this using TopologyTestDriver
private TopologyTestDriver td;
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, String> outputTopic;
private Topology topology;
private Properties streamConfig;

@BeforeEach
void setUp() {
    streamConfig = new Properties();
    streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
    streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
    streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    topology = new Topology();
    td = new TopologyTestDriver(topology, streamConfig);
    inputTopic = td.createInputTopic("input-topic", Serdes.String().serializer(), Serdes.String().serializer());
    outputTopic = td.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
}
@Test
void buildTopology() {
    inputTopic.pipeInput("key1", "value1");
    topology = app.buildTopology();
}
When I run the test, I get the exception "java.lang.IllegalArgumentException: Unknown topic: input-topic":
DEBUG org.apache.kafka.streams.processor.internals.InternalTopologyBuilder - No source topics using pattern subscription found, initializing consumer's subscription collection.
java.lang.IllegalArgumentException: Unknown topic: input-topic
at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:582)
at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:945)
at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:137)
at testclassname.buildTopology()
Can someone help me understand what I am missing here?
I see you are creating an empty Topology, used to initialize the TopologyTestDriver:
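// from your setUp(): an empty Topology is created and passed to the driver
topology = new Topology();
td = new TopologyTestDriver(topology, streamConfig);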
When this empty topology is used to instantiate the TopologyTestDriver with td = new TopologyTestDriver(topology, streamConfig);, the test driver does not know about any topics, since no topology was effectively built. I suppose this is why, when you try to pipe input into "input-topic" using inputTopic.pipeInput("key1", "value1");, the test driver throws an IllegalArgumentException complaining about an "Unknown topic: input-topic".

You should call your buildTopology() method to generate the actual topology you are testing, and use that when creating your TopologyTestDriver. Make sure that the topic names in your test (input-topic, output-topic) match those in your actual application ("topicname").
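A minimal sketch of a corrected setUp(), reusing the fields from your test class and assuming the class under test is available as app and reads from "topicname":

@BeforeEach
void setUp() {
    streamConfig = new Properties();
    streamConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "AppId");
    streamConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
    streamConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    // build the real topology instead of an empty one
    topology = app.buildTopology();
    td = new TopologyTestDriver(topology, streamConfig);

    // the topic name must match the one used in buildTopology()
    inputTopic = td.createInputTopic("topicname", Serdes.String().serializer(), Serdes.String().serializer());
}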
Note: I removed the output topic from the setup because in your code snippet you did not specify the output topic where your KTable gets written to. You may add it back if your actual application writes to an output topic.

You can query the state store that backs the KTable to check its contents. In Kafka Streams, each KTable is backed by a state store (even a versioned one, very recently, as of Aug. 2023), and you can directly interact with this store in tests.
Make sure that you set a store name for your KTable in your topology:
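// adapted from your buildTopology(); "myKTableStore" is just an example store name
kTable = filteredStream.groupByKey()
    .reduce((value1, value2) -> value2, Materialized.as("myKTableStore"));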
Here, "myKTableStore" is the name of the state store that backs the KTable. In your test, you can retrieve the store from the TopologyTestDriver and check the value for a specific key:
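// a sketch, assuming the store name "myKTableStore" from above and JUnit's assertEquals
@Test
void buildTopology() {
    inputTopic.pipeInput("key1", "value1");

    // the driver exposes the state store that backs the KTable
    ReadOnlyKeyValueStore<String, String> store = td.getKeyValueStore("myKTableStore");
    assertEquals("value1", store.get("key1"));
}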
That way, you can validate that your KTable contains the expected key-value pairs. Note that ReadOnlyKeyValueStore is part of the Kafka Streams API; import it as needed. You can see it used in "Kafka Streams Interactive Queries / Querying local key-value stores".
In Kafka Streams' TopologyTestDriver, the ability to directly add headers through the TestInputTopic is somewhat limited. However, you can use the lower-level pipeInput() overload that takes a TestRecord object, which can carry headers (and can even be built from a ConsumerRecord). You will need to build the record manually and then pipe it:
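// a sketch: the header name "myHeader" is just an example; use whatever your KSExtension expects
// needs org.apache.kafka.common.header.internals.RecordHeaders and org.apache.kafka.streams.test.TestRecord
Headers headers = new RecordHeaders();
headers.add("myHeader", "myHeaderValue".getBytes(StandardCharsets.UTF_8));

TestRecord<String, String> record = new TestRecord<>("key1", "value1", headers, Instant.now());
inputTopic.pipeInput(record);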
"topicname"with the name of the topic you are actually reading from in your topology, and adjust the key, value, and headers as needed for your test.That should allow you to include headers in your test records, which should then be processed by your
transformValuesoperation as expected.