I have a Kafka Streams application using an MSK cluster on AWS.
I need to clean up the state store (created by some KTables in my application).
I can't find any way to access the file system for the MSK cluster.
I found here that I can use:
KafkaStreams app = new KafkaStreams(builder.build(), props);
// Delete the application's local state.
// Note: In a real application you'd call `cleanUp()` only under
// certain conditions. See tip on `cleanUp()` below.
app.cleanUp();
app.start();
But in my case I am using Spring Kafka, I don't have that KafkaStreams instance in my code, and the app starts automatically.
I also found that just deleting the topic (the input of my state store) should delete the state store, but I'm not sure how long that takes: I deleted the topic, and after 15 minutes the state store still seemed to be there, so I just recreated the topic.
I also found a suggestion to get the state store directory path and delete it from the application code, but I am sure that won't work: the directory is in use by the app itself at the same time, so it can't be deleted. I'm also not sure the application can delete anything in the cluster:
StreamsConfig config = new StreamsConfig(props);
String stateDirectory = config.getString(StreamsConfig.STATE_DIR_CONFIG);
// Delete the state directory using appropriate file operations
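On the directory-deletion idea: Kafka Streams keeps local state on the machines that run the application, under <state.dir>/<application.id>, not on the MSK brokers, and deleting it is only safe while the KafkaStreams instance is stopped (cleanUp() itself throws an IllegalStateException if it is called while the instance is running). The JDK-only sketch below shows that manual equivalent. It assumes the default state.dir of ${java.io.tmpdir}/kafka-streams used by recent Kafka versions, and the class name StateDirCleanup is hypothetical. Deleting local state does not touch the changelog topics on the cluster, so the store is restored from them on the next start.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper: removes one application's local Kafka Streams state.
public class StateDirCleanup {

    // Assumed default for state.dir in recent Kafka versions:
    // ${java.io.tmpdir}/kafka-streams. Check the value in your own StreamsConfig.
    public static Path defaultStateDir() {
        return Paths.get(System.getProperty("java.io.tmpdir"), "kafka-streams");
    }

    // Kafka Streams keeps each application's state under state.dir/application.id.
    public static Path appStateDir(Path stateDir, String applicationId) {
        return stateDir.resolve(applicationId);
    }

    // Deletes a directory tree, children before parents.
    // Only call this while the KafkaStreams instance is stopped.
    public static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return; // nothing to delete
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse lexical order puts every descendant before its ancestor.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("usage: StateDirCleanup <state.dir> <application.id>"
                    + " (default state.dir: " + defaultStateDir() + ")");
            return;
        }
        Path target = appStateDir(Paths.get(args[0]), args[1]);
        deleteRecursively(target);
        System.out.println("Deleted " + target);
    }
}
```

Requiring both arguments explicitly avoids deleting anything by accident; the helper only works from a process that runs where the Streams app runs, which is why it cannot reach anything on the MSK side.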
The only solution I can think of is to create a Punctuator or a Processor (or something like that), pass it the state store name, and clear the state store there. Does that seem like a good solution?
Thank you in advance.
Can you SSH to the broker EC2 instances? That's the only way that's possible.
Correct. The state store is stored where your apps run. The Kafka cluster only stores the compacted, internal topics of the KTable, not any RocksDB instance metadata.
You'd use kafka-streams-application-reset.sh to delete the data on the cluster.
You do (or should) have a KafkaStreams instance; Spring Kafka only abstracts it away. https://docs.spring.io/spring-kafka/docs/current/reference/html/#streams-kafka-streams
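A minimal sketch of reaching cleanUp() through Spring Kafka, assuming a spring-kafka version that provides StreamsBuilderFactoryBeanConfigurer and the default StreamsBuilderFactoryBean created by @EnableKafkaStreams or Spring Boot. CleanupConfig(onStart, onStop) tells the factory bean whether to call KafkaStreams.cleanUp() before start() and after stop(). The class and bean method names are hypothetical. This only removes local RocksDB state on the machine running the app; the store is then restored from its changelog topic unless the internal topics are also reset, e.g. with kafka-streams-application-reset.sh. It needs spring-kafka on the classpath, so it is not a standalone program.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.core.CleanupConfig;

// Hypothetical configuration class; assumes the default StreamsBuilderFactoryBean.
@Configuration
public class StreamsCleanupConfiguration {

    // Spring Kafka applies StreamsBuilderFactoryBeanConfigurer beans to the
    // factory bean before it creates and starts the KafkaStreams instance.
    @Bean
    public StreamsBuilderFactoryBeanConfigurer cleanupOnStartConfigurer() {
        // CleanupConfig(onStart, onStop): call KafkaStreams.cleanUp() before
        // start() (wipes this instance's local state directory), not on stop.
        return factoryBean -> factoryBean.setCleanupConfig(new CleanupConfig(true, false));
    }
}
```

With onStart set to true, every restart wipes local state and forces a full restore from the changelog, which is why the original snippet says to call cleanUp() only under certain conditions; you may want to enable it for a single deployment only.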
If you only have a @KafkaListener consumer, then that's not using Kafka Streams.