I have a state store which is defined like below:
StoreBuilder<KeyValueStore<String, DataDocument>> indexStore = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("Data"), Serdes.String(), DataDocumentSerDes.dataDocumentSerDes())
.withLoggingEnabled(changelogConfig);
I have processor defined which will process source topic and write it to this state store.
I have a use-case to get list of keys based on a field in the DataDocument
which is the value in the state store. Is there a way I can achieve it?
One way I can think of is get all the keys and then perform filter. But it's going to be expensive as we iterate all keys always.
ReadOnlyKeyValueStore<String, DataDocument> roStore = streams.store(StoreQueryParameters.fromNameAndType("Data",
QueryableStoreTypes.<String, DataDocument>keyValueStore()));
KeyValueIterator<String, DataDocument> kvItr = roStore.all();
while(kvItr.hasNext()) {
if(kvItr.next().value.isField()) {
//Store to a list
}
}
Another approach is create state stores for all fields which has to be queryable with field as key and list of keys. But this is not scalable. Is there a better way I can achieve it with kafka topology?