Using KafkaStreams TimeWindows for detecting timeout and expiry


Is it possible to use KafkaStreams TimeWindows to detect timeouts and expiries? Something that can be used in reservation/booking services. You reserve a table, then you have to pay for it fast enough. Otherwise you miss the reservation and you need to try again. I want to send an event for the initial reservation, open a window on it and then get a state update when the window is closed. The length of the window is the deadline to pay. If a payment happens, another event will be published which updates the state of the window. So at the end, I want a callback that happens when the window is closed and the latest state of the window at that point.

I have a POC implemented here. I have also described my idea in more detail here.

And here is the summarised version of the code (where I probably went wrong).

        streamsBuilder.stream(Topics.CUSTOMER_EVENTS_TOPIC, EVENT_CONSUMED)
            .groupByKey(Grouped.with(Serdes.String(), EVENT_JSON_SERDE))
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(15), Duration.ofSeconds(1)))
            .aggregate(TableReservation::createTableReservation,
                    
                    (key, event, currentTableReservation) -> switch (event) {
                        case CustomerRequestedTable customerRequestedTable -> currentTableReservation.withTableId(customerRequestedTable.tableId())
                                        .withCustomerId(key)
                                        .withStatus(RESERVED_AWAITING_PAYMENT);
                        case CustomerPaidForTable ignored -> currentTableReservation.withStatus(PAID_FOR);
                        default -> currentTableReservation;
                    }
                    
                    , RESERVATION_LOCAL_KTABLE_MATERIALIZED)
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .foreach((key, tableReservation) -> {
                log.info("final window state right before closure: {} ", tableReservation);
                LocalTime startTime = LocalTime.ofInstant(key.window().startTime(), ZoneId.of("Europe/Stockholm"));
                LocalTime endTime = LocalTime.ofInstant(key.window().endTime(), ZoneId.of("Europe/Stockholm"));
                log.info("window length {}:{}", startTime, endTime);
            });
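One thing worth checking against the snippet above: as far as I know, `TimeWindows` are tumbling windows aligned to the epoch, not to the first event of a key, so a window does not open at the moment the reservation event arrives. The sketch below is plain Java with no Kafka dependency; the class name `WindowAlignment` and the helper `windowStartMs` are made up for illustration, and the formula is a simplified version of the alignment rule for non-negative timestamps.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowAlignment {
    // Start of the epoch-aligned tumbling window that contains the timestamp
    // (hypothetical helper, assumes timestampMs >= 0).
    static long windowStartMs(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long sizeMs = Duration.ofSeconds(15).toMillis();
        // A reservation made 14 seconds into an aligned 15-second window.
        long reservedAt = Instant.parse("2024-01-01T12:00:14Z").toEpochMilli();

        long start = windowStartMs(reservedAt, sizeMs);
        long end = start + sizeMs;

        System.out.println("window: [" + Instant.ofEpochMilli(start) + ", " + Instant.ofEpochMilli(end) + ")");
        System.out.println("time left to pay (ms): " + (end - reservedAt));
    }
}
```

Under this model the reservation at 12:00:14 lands in the window starting at 12:00:00 and has only one second left before the window ends, so the window length is not the same thing as a per-reservation deadline.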

I added suppress to only get one state update in the foreach block, only when each window gets closed. Unfortunately, what I get in the foreach block is kinda delayed. And sometimes I don't get anything. It seems like I need to send another event (even with a different key) to the topic (which results in a new window), in order to get the result of the last window.
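The behaviour described above is consistent with how `suppress(untilWindowCloses(...))` decides that a window is closed: it uses stream time (the highest record timestamp seen so far), not wall-clock time, and stream time only moves forward when a new record arrives. The following is a deliberately simplified plain-Java model of that rule, not Kafka's implementation; `StreamTimeSketch`, `Buffered` and `process` are hypothetical names, and the window size and grace period are copied from the question.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamTimeSketch {
    static final long SIZE_MS = 15_000;  // window size, as in the question
    static final long GRACE_MS = 1_000;  // grace period, as in the question

    // Latest state of one (key, window) pair and the stream time at which it becomes final.
    record Buffered(long closeAtMs, String state) {}

    private long streamTimeMs = Long.MIN_VALUE;  // highest record timestamp seen so far
    private final Map<String, Buffered> buffer = new LinkedHashMap<>();

    // Feeds one record and returns the window results that become final because of it.
    List<String> process(String key, long timestampMs, String state) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % SIZE_MS);
        long closeAtMs = windowStart + SIZE_MS + GRACE_MS;
        if (closeAtMs > streamTimeMs) {
            // Window still open: remember the latest state. Records for closed windows are dropped.
            buffer.put(key + "@" + windowStart, new Buffered(closeAtMs, state));
        }
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Buffered>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Buffered> e = it.next();
            if (e.getValue().closeAtMs() <= streamTimeMs) {
                emitted.add(e.getKey() + " -> " + e.getValue().state());
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        StreamTimeSketch sim = new StreamTimeSketch();
        // Nothing is emitted here, however long we wait in wall-clock time.
        System.out.println(sim.process("alice", 1_000, "RESERVED_AWAITING_PAYMENT"));
        // A later record, even for another key, advances stream time and flushes alice's window.
        System.out.println(sim.process("bob", 20_000, "RESERVED_AWAITING_PAYMENT"));
    }
}
```

In this model the first call returns an empty list and the second returns the final state of alice's window. In a real application stream time is, as far as I understand, tracked per task/partition, so a record with a different key would only help if it lands in the same partition.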

Also, when not using suppress, I'm not sure if I get all the state updates in the foreach, for each event.

So far I can detect that someone has paid in time. For that I get a state update in the foreach with status "PAID_FOR" which is terminal (even if I get it in the middle of the window).

But when I get "RESERVED_AWAITING_PAYMENT" in the foreach, I don't know if it's terminal, as I don't know whether the window is actually closed.

Note: don't mind that strange switch. It uses pattern matching for switch, which is available in Java 21.
