ksqlDB Window Aggregation


I am trying to do a windowed aggregation on a stream in Confluent Cloud, but I can't get the expected result: the table behaves like a changelog.

I have a topic named "sessions", and I created a stream based on it.

Stream script:

CREATE OR REPLACE STREAM sessions_stream (
    requestId VARCHAR,
    type VARCHAR,
    custId VARCHAR,
    channelCode VARCHAR
  ) WITH (
    KAFKA_TOPIC = 'sessions',
    VALUE_FORMAT = 'JSON'
  );
 

Then I created a table with a tumbling window.

Table script:

CREATE OR REPLACE TABLE agg_sessions 
AS SELECT
  REQUESTID REQUESTID,
  LATEST_BY_OFFSET(CUSTID) CUSTID,
  LATEST_BY_OFFSET(CHANNELCODE) CHANNELCODE,
  COLLECT_LIST(TYPE, ',') TYPES
FROM sessions_stream
WINDOW TUMBLING (SIZE 1 MINUTES) 
GROUP BY agg_sessions.REQUESTID
EMIT CHANGES;

These are the records I produced to the sessions topic:

{"requestId": "232", "type": "trial1", "custId": "1234", "channelCode": "branch1"}
{"requestId": "232", "type": "trial2", "custId": "1234", "channelCode": "branch1"}
{"requestId": "232", "type": "trial3", "custId": "1234", "channelCode": "branch1"}
{"requestId": "232", "type": "trial4", "custId": "1234", "channelCode": "branch1"}

I get these results from agg_sessions, in this order:

 { "CUSTID": "1234", "CHANNELCODE": "branch1", "TYPES": [ "trial1," ]}
 { "CUSTID": "1234", "CHANNELCODE": "branch1", "TYPES": [ "trial1,","trial2," ]}
 { "CUSTID": "1234", "CHANNELCODE": "branch1", "TYPES": [ "trial1,","trial2,","trial3," ]}
 { "CUSTID": "1234", "CHANNELCODE": "branch1", "TYPES": [ "trial1,","trial2,","trial3,","trial4," ]}
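The four rows above are successive updates to a single key (requestId 232 in one 1-minute window). With EMIT CHANGES, every change to the aggregate is written to the table's changelog topic, which is why the output looks like a changelog. If only the latest value is needed, a pull query (a SELECT with no EMIT clause) reads the table's current state instead. A minimal sketch, assuming agg_sessions has been created successfully (for example with the corrected query in the answer) and a ksqlDB version that allows pull queries on windowed tables by key alone; older versions may also require a WINDOWSTART range in the WHERE clause:

```sql
-- Pull query: returns the current aggregate for requestId '232',
-- one row per window, rather than every intermediate update.
-- Assumes agg_sessions exists and is keyed by REQUESTID (from GROUP BY requestId).
SELECT WINDOWSTART, WINDOWEND, REQUESTID, CUSTID, CHANNELCODE, TYPES
FROM agg_sessions
WHERE REQUESTID = '232';
```

This only changes how the table is read; consumers of the underlying topic still see every update.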

But I want a single record per REQUESTID in the output table/topic "agg_sessions", just this:

{ "CUSTID": "1234", "CHANNELCODE": "branch1", "TYPES": [ "trial1,","trial2,","trial3,","trial4," ]}

How can I do this? Are there other approaches to solving it? Thanks for your answers.

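Answer: the query below drops the second argument to COLLECT_LIST (it takes a single column) and groups by the source column requestId instead of agg_sessions.REQUESTID, which refers to the table being created.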
CREATE OR REPLACE TABLE agg_sessions 
AS SELECT
   requestId,
   LATEST_BY_OFFSET(custid) CUSTID,
   LATEST_BY_OFFSET(channelCode) CHANNELCODE,
   COLLECT_LIST(type) TYPES
FROM sessions_stream
WINDOW TUMBLING (SIZE 1 MINUTES) 
GROUP BY requestId
EMIT CHANGES; 
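The query above still uses EMIT CHANGES, so each new record in a window produces an updated row. If the goal is exactly one output row per requestId per window, emitted once the window closes, ksqlDB's EMIT FINAL is one option for windowed aggregations. A minimal sketch, assuming a ksqlDB version that supports EMIT FINAL (0.28 and later, as far as I know); the table name agg_sessions_final and the 10-second grace period are hypothetical choices for illustration:

```sql
-- Emits one row per requestId per 1-minute window, only after the window
-- plus its grace period has closed.
-- Assumption: ksqlDB version with EMIT FINAL support.
-- agg_sessions_final is a hypothetical name for this example.
CREATE TABLE agg_sessions_final AS
  SELECT
    requestId,
    LATEST_BY_OFFSET(custId) AS CUSTID,
    LATEST_BY_OFFSET(channelCode) AS CHANNELCODE,
    COLLECT_LIST(type) AS TYPES
  FROM sessions_stream
  -- Records arriving more than 10 seconds after the window ends are dropped.
  WINDOW TUMBLING (SIZE 1 MINUTE, GRACE PERIOD 10 SECONDS)
  GROUP BY requestId
  EMIT FINAL;
```

Windows close based on stream time, which advances only as new records with later timestamps arrive. So the final row for the most recent window appears only after a later record reaches the same input partition, not simply after a minute of wall-clock time.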
