Kafka KeyValueStore - delete method is not working

464 Views Asked by At

I am trying to schedule a task in one of my Kafka processors to delete records from a local KeyValueStore (RocksDB). Even though no exception has appeared so far, none of the records are getting deleted. Here is my code:

 processorContext.schedule(Duration.ofHours(6), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
  store.all().forEachRemaining(keyValue -> {
    // CommonUtil.removeOutdatedMessage(store,keyValue.key,keyValue.value.getSentAt());
    LocalDateTime sentAt = null;
    try {
      sentAt = LocalDateTime.parse(keyValue.value.getSentAt(), DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    } catch (DateTimeException ex) {
      LOGGER.warn("Parsing of date {} failed for message with: {}", keyValue.value.getSentAt(), ex);
    }

    if (sentAt == null) {
      store.delete(keyValue.key);
    } else {
      boolean isExpired = sentAt.isBefore(LocalDateTime.now().minusDays(Constants.MESSAGE_EXPIRATION_LIMIT));
      if (isExpired) {
        store.delete(keyValue.key);
      }
    }
  });
}); 

}

1

There are 1 best solutions below

1
On

You haven't provided any details on whats the date value you are parsing. Also, did you see the log message of date parsing failed? A little more detail would surely help.

Based on the code snippet you shared, I assume the sentAt variable is being set as nonNull due to exception or whatever reason and isExpired as false.

Therefore you do not see any deletion happening. Put debug points to figure out the object flow in your code.

It's the problem in your java code not on Kafka state store.