Is it possible to use KafkaStreams TimeWindows to detect timeouts and expiries? Something that can be used in reservation/booking services. You reserve a table, then you have to pay for it fast enough. Otherwise you miss the reservation and you need to try again. I want to send an event for the initial reservation, open a window on it and then get a state update when the window is closed. The length of the window is the deadline to pay. If a payment happens, another event will be published which updates the state of the window. So at the end, I want a callback that happens when the window is closed and the latest state of the window at that point.
I have a POC implemented here. Also described my idea in more details here.
And here is the summarised version of the code (where I probably went wrong).
streamsBuilder.stream(Topics.CUSTOMER_EVENTS_TOPIC, EVENT_CONSUMED)
.groupByKey(Grouped.with(Serdes.String(), EVENT_JSON_SERDE))
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(15), Duration.ofSeconds(1)))
.aggregate(TableReservation::createTableReservation,
(key, event, currentTableReservation) -> switch (event) {
case CustomerRequestedTable customerRequestedTable -> currentTableReservation.withTableId(customerRequestedTable.tableId())
.withCustomerId(key)
.withStatus(RESERVED_AWAITING_PAYMENT);
case CustomerPaidForTable ignored -> currentTableReservation.withStatus(PAID_FOR);
default -> currentTableReservation;
}
, RESERVATION_LOCAL_KTABLE_MATERIALIZED)
.suppress(Suppressed.untilWindowCloses(new StrictBufferConfigImpl()))
.toStream()
.foreach((key, tableReservation) -> {
log.info("final window state right before closure: {} ", tableReservation);
LocalTime startTime = LocalTime.ofInstant(key.window().startTime(), ZoneId.of("Europe/Stockholm"));
LocalTime endTime = LocalTime.ofInstant(key.window().endTime(), ZoneId.of("Europe/Stockholm"));
log.info("window length {}:{}", startTime, endTime);
});
I added suppress to only get one state update in the foreach block, only when each window gets closed. Unfortunately, what I get in the foreach block is kinda delayed.
And sometimes I don't get anything. It seems like I need to send another event (even with a different key) to the topic (which results in a new window), in order to get the result of the last window.
Also, when not using suppress, I'm not sure if I get all the state updates in the foreach, for each event.
So far I can detect that someone has paid in time. For that I get a state update in the foreach with status "PAID_FOR" which is terminal (even if I get it in the middle of the window).
But when I get "AWAITING_PAYMENT" in the foreach, I don't if it's terminal as I don't know if the window is actually closed.
Note: don't mind that strange switch. I'm using Java 21 preview features.