How to remove the Kafka Streams state store when using an AWS MSK cluster


I have a Kafka Streams application using an MSK cluster on AWS. I need to clean the state store (created after using some KTables in my application). I can't find any way to access the file system for the MSK cluster.

I found here that I can use:

KafkaStreams app = new KafkaStreams(builder.build(), props);
// Delete the application's local state.
// Note: In real application you'd call `cleanUp()` only under
// certain conditions.  See tip on `cleanUp()` below.
app.cleanUp();
app.start();

But in my case I am using Spring Kafka and I don't have that KafkaStreams instance in my code, and the app starts automatically.

I also read that deleting the topic that feeds the state store will delete the store as well, but I am not sure how long that takes. I deleted the topic, and after 15 minutes the state store still appeared to be there, so I recreated the topic.

I also found a suggestion to read the state store directory path and delete it from the application code. I doubt this will work, because the directory is in use by the app itself at that moment and so can't be deleted. I am also not sure that the application can delete anything in the cluster:

String stateDirectory = config.getString(StreamsConfig.STATE_DIR_CONFIG);
// Delete the state directory using appropriate file operations

The only solution I can think of is to create a Punctuator or a Processor (or something similar), pass it the state store name, and clear the state store from there. Does that seem like a good solution here?

Thank you in advance.


There are 2 best solutions below


any way to access the file system for the MSK cluster

Can you SSH to the broker EC2 instances? That would be the only way, and MSK is a managed service that does not give you that access.

not sure that the application can delete anything in the cluster

Correct. The state store is stored where your apps run. The Kafka cluster only stores the compacted, internal topics of the KTable, not any RocksDB instance metadata.

You'd use kafka-streams-application-reset.sh to delete the application's data on the cluster (its internal topics). That tool does not touch the local state where your app runs.

I am using Spring Kafka and I don't have that KafkaStreams instance in my code

You do/should. It is only abstracted away: Spring creates and manages the KafkaStreams instance through its StreamsBuilderFactoryBean. https://docs.spring.io/spring-kafka/docs/current/reference/html/#streams-kafka-streams

If you only have a @KafkaListener consumer, then you are not using Kafka Streams.
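As a minimal sketch of how the hidden KafkaStreams instance could be asked to run cleanUp() before it starts: this assumes a Spring Kafka version that ships StreamsBuilderFactoryBeanConfigurer, and an application that uses @EnableKafkaStreams or Spring Boot's Kafka Streams auto-configuration. The class and bean names are made up for illustration, and the fragment needs spring-kafka on the classpath.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.core.CleanupConfig;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class StreamsCleanupConfiguration {

    // Customizes the StreamsBuilderFactoryBean that Spring manages for you.
    @Bean
    public StreamsBuilderFactoryBeanConfigurer cleanupOnStartConfigurer() {
        // CleanupConfig(cleanupOnStart, cleanupOnStop):
        // wipe the local state directory before start, keep it on stop.
        return factoryBean -> factoryBean.setCleanupConfig(new CleanupConfig(true, false));
    }
}
```

This only wipes the local RocksDB files for the application's application.id; the store is then restored from its changelog topic. On Spring Boot, the property spring.kafka.streams.cleanup.on-startup=true is expected to do the same, but verify that against your Boot version. As with the plain cleanUp() call, you would normally enable this only for the one restart where a wipe is wanted.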


I learned some useful things related to my question, so I want to share them here:

By default the state store is created in the /tmp/kafka-streams directory (Windows: C:\tmp\kafka-streams, macOS: /var/folders/<random_characters>/<random_characters>/T/kafka-streams). If you are using k8s, it will be at the same path inside your pod. You can set StreamsConfig.STATE_DIR_CONFIG (state.dir) in your Kafka Streams application to change its default path.

To delete the state store you can simply delete that directory while the application is stopped. Keep in mind that the state store will be re-created automatically when the application restarts, because the store is backed by the changelog topics stored (created) in the Kafka cluster itself.
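To check where the store would end up on a given machine, a small sketch like the following can help. It assumes a recent Kafka Streams version, where the default state.dir is derived from the JVM's java.io.tmpdir, and the usual <state.dir>/<application.id> layout. StateDirLocator and resolve are hypothetical names, not part of Kafka.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class StateDirLocator {

    // Hypothetical helper: returns <state.dir>/<application.id>.
    // Assumption: when state.dir is not configured, the default is
    // <java.io.tmpdir>/kafka-streams (true for recent Kafka versions).
    public static Path resolve(String configuredStateDir, String applicationId) {
        String base = (configuredStateDir != null)
                ? configuredStateDir
                : Paths.get(System.getProperty("java.io.tmpdir"), "kafka-streams").toString();
        return Paths.get(base, applicationId);
    }

    public static void main(String[] args) {
        // Default location on this machine (depends on the OS temp directory).
        System.out.println(resolve(null, "my-streams-app"));
        // Location when state.dir has been set explicitly.
        System.out.println(resolve("/data/kafka-streams", "my-streams-app"));
    }
}
```

Running it inside the pod or on the host where the application runs shows the directory to inspect; it never points at the MSK brokers.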
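If you prefer to delete the directory from code rather than by hand, a minimal sketch with plain java.nio could look like this. It assumes the Kafka Streams application is not running while the directory is removed. StateDirCleaner is a hypothetical helper, and the directory layout in main is only a stand-in for a real state directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class StateDirCleaner {

    // Deletes dir and everything below it; does nothing if dir does not exist.
    public static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for <state.dir>/<application.id>/<task>/rocksdb/<store>.
        Path root = Files.createTempDirectory("kafka-streams-demo");
        Path appDir = root.resolve("my-streams-app");
        Path store = Files.createDirectories(appDir.resolve("0_0").resolve("rocksdb").resolve("my-store"));
        Files.write(store.resolve("000001.sst"), new byte[] {1, 2, 3});

        deleteRecursively(appDir);
        System.out.println("state dir still exists: " + Files.exists(appDir));
    }
}
```

This removes only the local copy; the next start restores the store from the changelog topic.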

As @OneCricketeer mentioned, the state store is the RocksDB data, and it is created where the application is running, so the state store is not created in the MSK cluster.

The only thing created in the MSK cluster for the state store is its changelog topics. As their name suggests, those topics are a log of all the changes made to the KTable.

So if the state store is deleted (manually, or because of a pod restart on k8s, for example), it is rebuilt when the application starts again: Kafka Streams reads the changelog topic and restores the local RocksDB store from it.
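The restore idea can be illustrated with a toy model: replaying a log of key/value changes in order, where a null value (a tombstone) deletes the key, always yields the latest state. This is not Kafka Streams code; ChangelogReplay, record and restore are hypothetical names, and a plain map stands in for RocksDB.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogReplay {

    // One changelog record; a null value models a tombstone (delete).
    public static Map.Entry<String, String> record(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    // Rebuilds the store by applying every record in log order.
    public static Map<String, String> restore(List<? extends Map.Entry<String, String>> changelog) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> rec : changelog) {
            if (rec.getValue() == null) {
                store.remove(rec.getKey());
            } else {
                store.put(rec.getKey(), rec.getValue());
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = Arrays.asList(
                record("a", "1"),
                record("b", "2"),
                record("a", "3"),   // later value overwrites the earlier one
                record("b", null)); // tombstone removes key b
        System.out.println(restore(changelog)); // prints {a=3}
    }
}
```

This is also why deleting only the local directory does not really clear the state: as long as the changelog topic still holds the records, the same content comes back on the next start.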

[Figure: Kafka Streams and the state store]

Note: You can avoid rebuilding the state store on every restart (which takes time) by placing the state store (the RocksDB data) on a dedicated volume. On k8s you can use PersistentVolumeClaims (PVCs) and PersistentVolumes (PVs) so that the state store survives application restarts.

[Figure: k8s volumes can be used to store the state store]