ksqlDB deleting records from KTable


• We have a topic "customer_events" in Kafka. Example value:

{ 
  "CUSTOMERID": "198fa518-1031-4fe8-8abd-ca29bd120259"
}

• We created a persistent stream over the topic in a ksqlDB cluster on Confluent.

CREATE STREAM TEST_STREAM 
(SESSIONID STRING KEY, CUSTOMERID STRING) WITH 
(KAFKA_TOPIC='customer_events', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');

• We created a derived table over the stream in the same ksqlDB cluster. The table aggregates customers by SESSIONID.

CREATE TABLE QUERYABLE_TESTTABLE AS SELECT
   SRC.SESSIONID SESSIONID,
   COLLECT_LIST(SRC.CUSTOMERID) CUSTOMERS
FROM TEST_STREAM SRC
GROUP BY SRC.SESSIONID
EMIT CHANGES;

• We then query the table (pull query):

SELECT * FROM QUERYABLE_TESTTABLE;

• The whole flow works fine for INSERT and UPDATE. The results are as expected.

SessionId customers
"3e45e7ac-781b-4213-b288-b3f95836487c" [ "198fa518-1031-4fe8-8abd-ca29bd120259", "bb1494de-bc1a-429b-a2b0-68684ed01d17"]
"88db0272-db35-48e9-b7ec-b326a9cde106" [ "bc4ab46c-5e79-4ca6-af67-74688105a5c0"]
... ...

But how do we remove items from the QUERYABLE_TESTTABLE table?

We tried inserting a tombstone into the customer_events topic. We also tried inserting a tombstone into the underlying topic of the QUERYABLE_TESTTABLE table, which I know is not the best idea. We searched the internet, but found no clear description of how to do it.


There is 1 solution below


You are using a STREAM, which ignores tombstone events (records with a null value) in the first place, so the tombstone never reaches the aggregation. The solution is more of a redesign. I can't think of any other solution to this problem.

If you have control over what you are publishing, then instead of publishing a tombstone event to the customer_events topic:

  • Add a new column to the event value. It can be called __deleted.
  • Populate it with 'false' by default; if you want to delete the key, set it to 'true'.
  • Add a simple WHERE clause to your derived table: WHERE __deleted != 'true'.
  • To also get a tombstone in the final table (otherwise you will see an empty array once all records for a given SESSIONID are deleted),
    add a HAVING clause at the end that checks the array size is greater than 0, so a tombstone event is created: HAVING ARRAY_LENGTH(COLLECT_LIST(SRC.CUSTOMERID)) > 0
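
Put together, the steps above might look roughly like the statements below. This is only a sketch under assumptions: TEST_STREAM_V2 and QUERYABLE_TESTTABLE_V2 are made-up names, and it assumes producers write __deleted as a string ('true' or 'false') in every JSON value. The backticks are there so ksqlDB keeps the lowercase column name.

```sql
-- Sketch only. TEST_STREAM_V2 and QUERYABLE_TESTTABLE_V2 are hypothetical names.
-- Assumes every value in customer_events now carries a string field "__deleted".
CREATE STREAM TEST_STREAM_V2 (
    SESSIONID STRING KEY,
    CUSTOMERID STRING,
    `__deleted` STRING   -- soft-delete marker: 'true' or 'false'
) WITH (
    KAFKA_TOPIC='customer_events',
    KEY_FORMAT='KAFKA',
    VALUE_FORMAT='JSON'
);

CREATE TABLE QUERYABLE_TESTTABLE_V2 AS SELECT
    SRC.SESSIONID SESSIONID,
    COLLECT_LIST(SRC.CUSTOMERID) CUSTOMERS
FROM TEST_STREAM_V2 SRC
-- Drop events flagged as deleted before they are aggregated.
-- Older records without the field have NULL here; check how they should be treated.
WHERE SRC.`__deleted` != 'true'
GROUP BY SRC.SESSIONID
-- Rows that fail this check are meant to be removed from the table rather than
-- shown with an empty array.
HAVING ARRAY_LENGTH(COLLECT_LIST(SRC.CUSTOMERID)) > 0
EMIT CHANGES;
```

One thing to verify against your own data: as far as I understand, COLLECT_LIST over a stream only ever appends, so the WHERE filter keeps delete-marker events out of the list but does not take an already collected CUSTOMERID back out.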

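Note: You can easily do this even if your source connector is Debezium. It provides an out-of-the-box transform for this (ExtractNewRecordState, which, as far as I know, can add a __deleted field when its delete handling is set to rewrite).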