How to get a list of keys based on a field in the value from a Kafka state store


I have a state store defined as follows:

StoreBuilder<KeyValueStore<String, DataDocument>> indexStore = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("Data"), Serdes.String(), DataDocumentSerDes.dataDocumentSerDes())
            .withLoggingEnabled(changelogConfig);  

I have a processor that consumes the source topic and writes each record to this state store.

My use case is to get the list of keys whose value (a DataDocument in the state store) matches on a particular field. Is there a way to achieve this?

One way I can think of is to fetch all entries and then filter them. But that is going to be expensive, because every query iterates over the entire store.

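ReadOnlyKeyValueStore<String, DataDocument> roStore = streams.store(StoreQueryParameters.fromNameAndType("Data",
            QueryableStoreTypes.<String, DataDocument>keyValueStore()));
    List<String> matchingKeys = new ArrayList<>();
    // Close the iterator when done so the underlying store resources are released
    try (KeyValueIterator<String, DataDocument> kvItr = roStore.all()) {
        while (kvItr.hasNext()) {
            KeyValue<String, DataDocument> entry = kvItr.next();
            if (entry.value.isField()) {
                matchingKeys.add(entry.key);
            }
        }
    }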

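Another approach is to create an additional state store for every field that has to be queryable, with the field value as the key and the list of matching document keys as the value. But this does not scale as the number of queryable fields grows. Is there a better way to achieve this within a Kafka Streams topology?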
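To make the second approach concrete, here is a minimal sketch of the bookkeeping it implies. It is plain Java with no Kafka dependency: the two maps are only stand-ins for the "Data" store and one index store, and the names FieldIndexSketch, put, delete and keysWhere are hypothetical. In a real topology I assume both would be state stores written from the same processor, and each instance would only index the partitions it owns.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration: a primary map plus one secondary index on a boolean field.
public class FieldIndexSketch {
    // Stand-in for the "Data" store: document key -> value of the indexed field.
    private final Map<String, Boolean> primary = new HashMap<>();
    // Stand-in for the index store: field value -> keys of documents holding that value.
    private final Map<Boolean, Set<String>> index = new HashMap<>();

    // Upsert a document and keep the index in sync with the primary map.
    public void put(String key, boolean field) {
        Boolean previous = primary.put(key, field);
        if (previous != null && previous.booleanValue() != field) {
            // The field changed, so drop the key from its old index entry.
            Set<String> oldKeys = index.get(previous);
            if (oldKeys != null) {
                oldKeys.remove(key);
            }
        }
        index.computeIfAbsent(field, f -> new TreeSet<>()).add(key);
    }

    // Remove a document from both the primary map and the index.
    public void delete(String key) {
        Boolean previous = primary.remove(key);
        if (previous != null) {
            Set<String> oldKeys = index.get(previous);
            if (oldKeys != null) {
                oldKeys.remove(key);
            }
        }
    }

    // Look up keys by field value without scanning the primary map.
    public Set<String> keysWhere(boolean field) {
        return Collections.unmodifiableSet(
                index.getOrDefault(field, Collections.emptySet()));
    }
}
```

The lookup avoids the full scan, but every write now touches two structures, and each additional queryable field needs its own index, which is the scalability concern raised above.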
