ksqlDB pull query doesn't return the whole result set


We have a very strange problem with pull queries in ksqlDB.

Setup

First of all, we have a set of 6 machines, each running its own instance of ksqlDB.

Second, we have this configuration:

listeners=http://0.0.0.0:8088
ksql.advertised.listener is set for each node
ksql.heartbeat.enable=true
ksql.streams.num.standby.replicas=1
ksql.query.pull.enable.standby.reads=true

We have a scenario very similar to what is described here. We have an input stream with 60 partitions. Let's call it events.

We declared a stream:

CREATE STREAM EVENTS (EVENT_TYPE STRING, TS STRING) WITH (CLEANUP_POLICY='delete', FORMAT='json', KAFKA_TOPIC='events', TIMESTAMP='ts', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX');

We defined a table with aggregations like this:

CREATE TABLE EVENTS_HOURLY_COUNTS AS
SELECT EVENTS.ROW_PARTITION AS PARTITION,
       COUNT(*) AS EVENT_COUNT
FROM EVENTS
WINDOW TUMBLING ( SIZE 1 HOURS )
GROUP BY EVENTS.ROW_PARTITION
EMIT CHANGES;

This created 3 topics for us: 1 visible and 2 hidden.

Kafka Topic                                                                                             | Partitions | Partition Replicas
-------------------------------------------------------------------------------------------------------------------------------------------
 EVENTS_HOURLY_COUNTS                                                                                      | 60         | 2
 _confluent-ksql-data_query_CTAS_EVENTS_HOURLY_COUNTS_209-Aggregate-Aggregate-Materialize-changelog        | 60         | 2
 _confluent-ksql-data_query_CTAS_EVENTS_HOURLY_COUNTS_209-Aggregate-GroupBy-repartition                    | 60         | 2

The problem

When we issue pull queries against this table, they sporadically return inconsistent results, with no errors in the logs.

Our queries look like this:

SELECT WINDOWSTART, partition, event_count FROM events_hourly_counts  WHERE WINDOWSTART >= 1708452000000 AND WINDOWEND  <= 1708509600000

We run them against periods that are already closed, so newly arriving data should not affect the result. We expect rows for all 60 partitions, but sometimes (roughly 1 time in 10) the query returns fewer rows, anywhere from 44 to 54, and sometimes even 61.
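As a sanity check on the expected row count, here is a minimal Python sketch that enumerates the hourly windows selected by the bounds in the query above. It assumes tumbling windows are aligned to the epoch and that every one of the 60 partitions has at least one event in every hour; `hourly_windows` is a helper name made up for this illustration.

```python
HOUR_MS = 60 * 60 * 1000  # size of one tumbling window in milliseconds


def hourly_windows(start_ms, end_ms, size_ms=HOUR_MS):
    """Return the WINDOWSTART of every window with
    WINDOWSTART >= start_ms and WINDOWEND <= end_ms."""
    # Round the lower bound up to the next window boundary (ceiling division).
    first = -(-start_ms // size_ms) * size_ms
    # The last admissible window starts one window size before end_ms.
    return list(range(first, end_ms - size_ms + 1, size_ms))


windows = hourly_windows(1708452000000, 1708509600000)
# Assumption: all 60 partitions received events in every hour.
expected_rows = len(windows) * 60
print(len(windows), expected_rows)
```

For these bounds the filter covers 16 full hourly windows, so under the stated assumption a complete result would hold 960 rows.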

As I said before, the queries complete successfully and there are no errors in the logs. If anyone could help with this case, it would be great. Thanks in advance!
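To put a number on "roughly 1 out of 10", the same pull query could be replayed many times and the row counts tallied. The sketch below is only an illustration: the node address is hypothetical, and it assumes the `/query` REST endpoint answers with a JSON array in which each data entry carries a `row` key.

```python
import json
import urllib.request
from collections import Counter

KSQL_URL = "http://localhost:8088/query"  # hypothetical address of one node

PULL_QUERY = (
    "SELECT WINDOWSTART, partition, event_count FROM events_hourly_counts "
    "WHERE WINDOWSTART >= 1708452000000 AND WINDOWEND <= 1708509600000;"
)


def build_request(sql, url=KSQL_URL):
    """Build (but do not send) a POST request carrying the pull query."""
    body = json.dumps({"ksql": sql, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.ksql.v1+json"},
    )


def count_rows(sql, url=KSQL_URL):
    """Send the query once and count the returned data rows."""
    with urllib.request.urlopen(build_request(sql, url)) as resp:
        payload = json.load(resp)
    # Assumption: data entries are objects with a "row" key; the header is not.
    return sum(1 for item in payload if isinstance(item, dict) and "row" in item)


def tally(run_once, attempts=100):
    """Call run_once repeatedly and count how often each row count occurs."""
    return Counter(run_once() for _ in range(attempts))
```

A call such as `tally(lambda: count_rows(PULL_QUERY))` would then show how often each row count comes back. Pointing `url` at each of the 6 nodes in turn might also show whether the short results are tied to a particular node.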

UPD:

I tried to create a table with only one partition:

CREATE TABLE EVENTS_HOURLY_COUNTS_2 WITH (PARTITIONS=1)
AS SELECT EVENTS.ROW_PARTITION AS PARTITION,
       COUNT(*) AS EVENT_COUNT
FROM EVENTS
WINDOW TUMBLING ( SIZE 1 HOURS )
GROUP BY EVENTS.ROW_PARTITION
EMIT CHANGES;

So there is only one partition, but the table still collects keys 0 to 59. The behavior is the same.

When I run a pull query covering 20 hours, I expect 20 * 60 = 1200 rows in the result set. Most of the time I get 1200 rows, but from time to time it is 1199, 936, or even 1201 rows!
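A row count alone does not say which rows are missing or doubled. As a minimal sketch, assuming the result has already been parsed into `(windowstart, partition, count)` tuples, the returned keys can be compared against the full grid of windows and partitions. The function name and the sample data below are made up for illustration.

```python
from collections import Counter


def diff_result(rows, window_starts, partitions=range(60)):
    """Compare returned rows against the expected (window, partition) grid.

    rows: iterable of (windowstart, partition, count) tuples.
    Returns (missing, duplicated, unexpected) lists of (window, partition) keys.
    """
    seen = Counter((w, p) for w, p, _ in rows)
    expected = {(w, p) for w in window_starts for p in partitions}
    missing = sorted(expected - set(seen))
    duplicated = sorted(key for key, n in seen.items() if n > 1)
    unexpected = sorted(set(seen) - expected)
    return missing, duplicated, unexpected


# Tiny made-up example: 2 windows x 3 partitions, one key missing, one doubled.
sample = [
    (0, 0, 5), (0, 1, 7), (0, 2, 1),
    (3600000, 0, 4), (3600000, 0, 4),  # partition 0 returned twice
    (3600000, 2, 9),                   # partition 1 never returned
]
missing, duplicated, unexpected = diff_result(sample, [0, 3600000], range(3))
print(missing, duplicated, unexpected)
```

Knowing whether the extra row in a 1201-row result is an exact duplicate of an existing key, or a key outside the requested range, would narrow down where the inconsistency comes from.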
