ksqlDB is skipping records due to the expiration period when windowing them


I ran the test below to figure out how ksqlDB groups records from a topic with 3 partitions. Is there a parameter that tells ksqlDB to do the windowing per partition, so that no data is lost due to the expiration period?

CREATE STREAM ratings (title VARCHAR, release_year INT, rating DOUBLE, timestamp VARCHAR)
    WITH (kafka_topic='ratings',
          timestamp='timestamp',
          timestamp_format='yyyy-MM-dd HH:mm:ss',
          partitions=3,
          value_format='json');

INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('AsjPxJDrFr', 1988, 9.2, '2023-05-18 15:35:01');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('sBaOVCofGF', 1975, 9.2, '2023-05-18 15:35:02');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('aOhNlSgVqW', 2010, 9.2, '2023-05-18 15:35:03');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('dKeObIdQAm', 2005, 8.9, '2023-05-18 15:35:04');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('tFMgeaJAvP', 1992, 8.9, '2023-05-18 15:35:05');

INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('LZGnwpXOHv', 1978, 6.8, '2023-05-19 15:35:06');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('CzKbTdHAsO', 2008, 6.8, '2023-05-19 15:35:07');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('zbeSPRfGMO', 2015, 5.9, '2023-05-19 15:35:08');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('WZaKjixEMn', 1984, 5.9, '2023-05-19 15:35:09');

INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('MhQdtsCvxR', 2001, 6.8, '2023-05-20 15:35:10');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('kXjvNTmOpD', 1999, 6.8, '2023-05-20 15:35:11');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('GfBuHqXmKr', 2003, 6.8, '2023-05-20 15:35:12');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('oKRPfYhXJL', 1987, 6.8, '2023-05-20 15:35:13');

INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('zVHjCbrkYD', 2012, 8.8, '2023-05-21 15:35:14');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('mALOfdzBwQ', 1972, 8.8, '2023-05-21 15:35:15');
INSERT INTO ratings (title, release_year, rating, timestamp) VALUES ('eRcIZVWnSa', 2007, 6.5, '2023-05-21 15:35:16');

SET 'auto.offset.reset' = 'earliest';

SELECT rating,
       COUNT(*) AS rating_count,
       TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') as window_start,
       TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') as window_end
FROM ratings
WINDOW TUMBLING (SIZE 1 DAY, RETENTION 30 DAYS)
GROUP BY rating
EMIT CHANGES;
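To reason about what a 1-day tumbling window does with out-of-order data, here is a minimal Python model of the close rule. It is based on my understanding of Kafka Streams' windowed aggregation, which is an assumption about internals.

- Stream time is the largest timestamp seen so far on one partition (task).
- A window closes once stream time minus the grace period reaches the window end.
- Any later record for a closed window is dropped.

The `grace_ms` parameter stands in for the window grace period, which ksqlDB exposes as the `GRACE PERIOD` option of the `WINDOW` clause. The arrival order in the example is hypothetical: one May 21 row is read before two May 18 rows.

```python
from datetime import datetime, timezone

DAY_MS = 24 * 60 * 60 * 1000

def to_ms(s: str) -> int:
    """Parse 'yyyy-MM-dd HH:mm:ss' as a UTC timestamp in epoch milliseconds."""
    dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp()) * 1000

def tumbling_counts(records, size_ms, grace_ms):
    """Count records per (key, window_start) for ONE partition, read in order.

    Assumed model: stream time is the largest timestamp seen so far, and a
    record is dropped when its window end is at or before
    stream_time - grace_ms (i.e. the window has already closed).
    """
    stream_time = None
    counts, dropped = {}, []
    for key, ts in records:
        stream_time = ts if stream_time is None else max(stream_time, ts)
        close_time = stream_time - grace_ms
        window_start = ts - ts % size_ms   # UTC-midnight-aligned for 1-day windows
        window_end = window_start + size_ms
        if window_end > close_time:
            counts[(key, window_start)] = counts.get((key, window_start), 0) + 1
        else:
            dropped.append((key, ts))
    return counts, dropped

# Hypothetical arrival order on one repartition partition (rows from the INSERTs).
arrivals = [
    ("6.5", to_ms("2023-05-21 15:35:16")),
    ("9.2", to_ms("2023-05-18 15:35:01")),
    ("9.2", to_ms("2023-05-18 15:35:02")),
]

counts_no_grace, dropped_no_grace = tumbling_counts(arrivals, DAY_MS, grace_ms=0)
counts_grace, dropped_grace = tumbling_counts(arrivals, DAY_MS, grace_ms=3 * DAY_MS)
print(len(dropped_no_grace), len(dropped_grace))  # prints: 2 0
```

With no grace period, both May 18 rows arrive after their window has closed, so they are dropped. With a 3-day grace period, the May 18 window is still open, so both rows are counted.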

+-------------------------------------------+-------------------------------------------+-------------------------------------------+-------------------------------------------+
|RATING                                     |RATING_COUNT                               |WINDOW_START                               |WINDOW_END                                 |
+-------------------------------------------+-------------------------------------------+-------------------------------------------+-------------------------------------------+
|8.8                                        |1                                          |2023-05-21 00:00:00                        |2023-05-22 00:00:00                        |
|8.8                                        |2                                          |2023-05-21 00:00:00                        |2023-05-22 00:00:00                        |
|9.2                                        |1                                          |2023-05-18 00:00:00                        |2023-05-19 00:00:00                        |
|6.8                                        |1                                          |2023-05-19 00:00:00                        |2023-05-20 00:00:00                        |
|6.8                                        |1                                          |2023-05-20 00:00:00                        |2023-05-21 00:00:00                        |
|6.5                                        |1                                          |2023-05-21 00:00:00                        |2023-05-22 00:00:00                        |
|5.9                                        |1                                          |2023-05-19 00:00:00                        |2023-05-20 00:00:00                        |
|5.9                                        |2                                          |2023-05-19 00:00:00                        |2023-05-20 00:00:00                        |
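Comparing this output with the inserted rows shows how much is missing. The sketch below recomputes the counts each (rating, day) window should reach if nothing were dropped. It takes the day from the timestamp string, which assumes the timestamps are read as UTC; the UTC-midnight window bounds above suggest this. It then subtracts the final count shown for each window in the output.

```python
from collections import Counter

# (rating, event timestamp) pairs copied from the INSERT statements.
ROWS = [
    (9.2, "2023-05-18 15:35:01"), (9.2, "2023-05-18 15:35:02"), (9.2, "2023-05-18 15:35:03"),
    (8.9, "2023-05-18 15:35:04"), (8.9, "2023-05-18 15:35:05"),
    (6.8, "2023-05-19 15:35:06"), (6.8, "2023-05-19 15:35:07"),
    (5.9, "2023-05-19 15:35:08"), (5.9, "2023-05-19 15:35:09"),
    (6.8, "2023-05-20 15:35:10"), (6.8, "2023-05-20 15:35:11"),
    (6.8, "2023-05-20 15:35:12"), (6.8, "2023-05-20 15:35:13"),
    (8.8, "2023-05-21 15:35:14"), (8.8, "2023-05-21 15:35:15"),
    (6.5, "2023-05-21 15:35:16"),
]

# A 1-day tumbling window in UTC starts at midnight, so the date is the window key.
expected = Counter((rating, ts[:10]) for rating, ts in ROWS)

# Last RATING_COUNT printed for each window in the query output above.
observed = Counter({
    (8.8, "2023-05-21"): 2, (9.2, "2023-05-18"): 1, (6.8, "2023-05-19"): 1,
    (6.8, "2023-05-20"): 1, (6.5, "2023-05-21"): 1, (5.9, "2023-05-19"): 2,
})

# Counter subtraction keeps only positive differences, i.e. the missing rows.
missing = expected - observed
for (rating, day), n in sorted(missing.items()):
    print(f"{rating}  {day}  missing {n}")
print("total missing:", sum(missing.values()))
```

The total comes to 8 missing rows, which matches the 8 `Skipping record for expired window` warnings below. The 8.9 window disappears from the output entirely.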

Skipped records:

ksqldb-server  | [2023-05-21 11:26:22,141] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[1] offset=[2] timestamp=[1684424104000] window=[1684368000000,1684454400000) expiration=[1684683315000] streamTime=[1684683315000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server  | [2023-05-21 11:26:22,206] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[4] timestamp=[1684424101000] window=[1684368000000,1684454400000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server  | [2023-05-21 11:26:22,208] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[5] timestamp=[1684424102000] window=[1684368000000,1684454400000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server  | [2023-05-21 11:26:22,209] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[6] timestamp=[1684596910000] window=[1684540800000,1684627200000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server  | [2023-05-21 11:26:22,212] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[7] timestamp=[1684596911000] window=[1684540800000,1684627200000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server  | [2023-05-21 11:26:22,218] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[8] timestamp=[1684596913000] window=[1684540800000,1684627200000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server  | [2023-05-21 11:26:22,306] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[2] offset=[9] timestamp=[1684510506000] window=[1684454400000,1684540800000) expiration=[1684683316000] streamTime=[1684683316000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
ksqldb-server  | [2023-05-21 11:26:22,310] WARN Skipping record for expired window. topic=[_confluent-ksql-default_transient_transient_RATINGS_9159540547105289197_1684668380893-Aggregate-GroupBy-repartition] partition=[1] offset=[3] timestamp=[1684424105000] window=[1684368000000,1684454400000) expiration=[1684683315000] streamTime=[1684683315000] (org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate:142)
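The epoch-millisecond fields in these warnings are easier to read decoded. This sketch parses the fields of a warning line and converts them to UTC. It also infers the grace period as `streamTime - expiration`, which assumes that the logged `expiration` is stream time minus the grace period. My reading of Kafka Streams' `KStreamWindowAggregate` is that it logs this as the window close time, but treat that as an assumption. The sample lines are trimmed copies of warnings from the log above.

```python
import re
from datetime import datetime, timezone

# Trimmed copies of three WARN lines from the log above.
LOG_LINES = [
    "partition=[1] offset=[2] timestamp=[1684424104000] window=[1684368000000,1684454400000) expiration=[1684683315000] streamTime=[1684683315000]",
    "partition=[2] offset=[6] timestamp=[1684596910000] window=[1684540800000,1684627200000) expiration=[1684683316000] streamTime=[1684683316000]",
    "partition=[2] offset=[9] timestamp=[1684510506000] window=[1684454400000,1684540800000) expiration=[1684683316000] streamTime=[1684683316000]",
]

PATTERN = re.compile(
    r"partition=\[(\d+)\] offset=\[(\d+)\] timestamp=\[(\d+)\] "
    r"window=\[(\d+),(\d+)\) expiration=\[(\d+)\] streamTime=\[(\d+)\]"
)

def utc(ms: int) -> str:
    """Format epoch milliseconds as a UTC 'yyyy-MM-dd HH:mm:ss' string."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

def decode(line: str) -> dict:
    m = PATTERN.search(line)
    if m is None:
        raise ValueError("not a 'Skipping record for expired window' line")
    part, off, ts, w_start, w_end, expiration, stream_time = map(int, m.groups())
    return {
        "partition": part,
        "offset": off,
        "record": utc(ts),
        "window": (utc(w_start), utc(w_end)),
        "close_time": utc(expiration),
        # Assumed relation: expiration = streamTime - grace.
        "grace_ms": stream_time - expiration,
        # The window had already ended at or before the close time.
        "window_closed": w_end <= expiration,
    }

for line in LOG_LINES:
    print(decode(line))
```

Decoded this way, each skipped record (for example, May 18 15:35:04) was read after its partition's stream time had already reached May 21. The inferred grace period is 0, so every earlier daily window was closed when its late records arrived.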