How to remove old records from a state store using a punctuator? (Kafka)


I've created a KTable for a topic using streamsBuilder.table("myTopic"), which I materialize to a state store so that I can use interactive queries.

Every hour, I want to remove records from this state store (and associated changelog topic) whose values haven't been updated in the past hour.

I believe this may be possible using a punctuator, but I've only used the DSL so far, and so am not sure of how to proceed. I would be very grateful if somebody could provide me with an example.

Thanks,

Jack

Best answer

It is possible to mix and match the Processor API with the DSL, but you can't attach a processor directly to a KTable. You would first need to convert it to a KStream (for example with toStream()). Alternatively, you could create a new topology with a Processor that interacts with the state store.

You will also need some way to determine whether a record is older than one hour, which means keeping that information somewhere. One option is to store a timestamp alongside each value in the state store.
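As a rough illustration of the "timestamp next to each value" idea, here is a minimal plain-Java sketch. The class name StampedValue and its isExpired method are hypothetical and not part of Kafka Streams; Kafka Streams has its own ValueAndTimestamp type for timestamped stores, which may suit you better.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical wrapper pairing a value with the time it was last written. */
final class StampedValue<V> {
    final V value;
    final Instant lastUpdated;

    StampedValue(V value, Instant lastUpdated) {
        this.value = value;
        this.lastUpdated = lastUpdated;
    }

    /** True when the last write happened more than maxAge before now. */
    boolean isExpired(Instant now, Duration maxAge) {
        return lastUpdated.isBefore(now.minus(maxAge));
    }
}
```

Each update would write a fresh StampedValue with the current time, so a record only expires once it has gone a full hour without an update.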

In the init method of a Processor, you can call context.schedule to register a punctuator that iterates over the records in the state store and removes the old ones:

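// Assumes myStateStore holds ValueAndTimestamp<String> values keyed by String; adjust to your types.
context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
    Instant olderThanAnHour = Instant.ofEpochMilli(timestamp).minus(Duration.ofHours(1));
    // all() returns a KeyValueIterator, which must be closed to release its resources.
    try (KeyValueIterator<String, ValueAndTimestamp<String>> records = myStateStore.all()) {
        records.forEachRemaining(keyValue -> {
            if (Instant.ofEpochMilli(keyValue.value.timestamp()).isBefore(olderThanAnHour)) {
                myStateStore.delete(keyValue.key);
            }
        });
    }
});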
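If you want to check the cutoff logic without running a Kafka Streams application, the sweep can be modelled in plain Java. This is only a sketch under assumptions: the ExpirySweep class and its purge method are hypothetical, and an ordinary Map of key to last-update time stands in for the state store.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for a state store: key -> time of last update. */
final class ExpirySweep {
    /** Removes every entry last updated before (now - maxAge) and returns the removed keys. */
    static List<String> purge(Map<String, Instant> store, Instant now, Duration maxAge) {
        Instant cutoff = now.minus(maxAge);
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Instant>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Instant> entry = it.next();
            if (entry.getValue().isBefore(cutoff)) {
                removed.add(entry.getKey());
                it.remove(); // remove through the iterator so iteration stays valid
            }
        }
        return removed;
    }
}
```

Passing the current time in as a parameter, rather than reading the clock inside the method, mirrors how the punctuator receives its timestamp argument and keeps the logic easy to test.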