I am trying to schedule a task in one of my Kafka processors to delete records from a local KeyValueStore (RocksDB). Even though no exception has appeared so far, none of the records are getting deleted. Here is my code:
processorContext.schedule(Duration.ofHours(6), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
    store.all().forEachRemaining(keyValue -> {
        // CommonUtil.removeOutdatedMessage(store,keyValue.key,keyValue.value.getSentAt());
        LocalDateTime sentAt = null;
        try {
            sentAt = LocalDateTime.parse(keyValue.value.getSentAt(), DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        } catch (DateTimeException ex) {
            LOGGER.warn("Parsing of date {} failed for message with: {}", keyValue.value.getSentAt(), ex);
        }
        if (sentAt == null) {
            store.delete(keyValue.key);
        } else {
            boolean isExpired = sentAt.isBefore(LocalDateTime.now().minusDays(Constants.MESSAGE_EXPIRATION_LIMIT));
            if (isExpired) {
                store.delete(keyValue.key);
            }
        }
    });
});
}
You haven't provided any details on what date value you are parsing. Also, did you see the "Parsing of date ... failed" log message? A little more detail would surely help.
Based on the code snippet you shared, I assume that sentAt is ending up non-null for whatever reason and isExpired is evaluating to false. Therefore you do not see any deletion happening. Set breakpoints (or add log statements) to trace the flow through your code.
The problem is in your Java code, not in the Kafka state store.
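One way to check that assumption without waiting six hours for the punctuator is to pull the decision out into a small method and call it with a few sample values. This is only a sketch: ExpiryCheck, shouldDelete and the limitDays parameter are names I made up for illustration, and I am assuming that getSentAt() returns a String and that Constants.MESSAGE_EXPIRATION_LIMIT is a number of days, as your snippet suggests.

```java
import java.time.DateTimeException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that mirrors the delete decision from the question,
// with "now" and the limit passed in so it can be exercised in isolation.
final class ExpiryCheck {
    private ExpiryCheck() {}

    // Returns true when the punctuator in the question would call store.delete(key).
    static boolean shouldDelete(String sentAtText, LocalDateTime now, long limitDays) {
        LocalDateTime sentAt;
        try {
            // Same parse call as in the question; the offset is dropped by LocalDateTime.
            sentAt = LocalDateTime.parse(sentAtText, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        } catch (DateTimeException ex) {
            // Unparseable date: the original code leaves sentAt null and deletes the record.
            return true;
        }
        // Parsed date: delete only if it is older than the expiration limit.
        return sentAt.isBefore(now.minusDays(limitDays));
    }
}
```

Call it with the real sentAt strings from your store and your actual limit. If it returns false for records you expect to be removed, the cause is the date values or the limit, not the state store.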