• We have a topic "customer_events" in Kafka. An example value:
{
"CUSTOMERID": "198fa518-1031-4fe8-8abd-ca29bd120259"
}
• We created a persistent stream over the topic in a ksqlDB cluster in Confluent.
CREATE STREAM TEST_STREAM
(SESSIONID STRING KEY, CUSTOMERID STRING) WITH
(KAFKA_TOPIC='customer_events', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');
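For illustration, once the stream exists an event like the one above can also be written through ksqlDB itself; the IDs below are the ones that appear in the results table further down, used here only as sample values.
-- Writes one record to the stream's underlying customer_events topic.
INSERT INTO TEST_STREAM (SESSIONID, CUSTOMERID)
  VALUES ('3e45e7ac-781b-4213-b288-b3f95836487c', '198fa518-1031-4fe8-8abd-ca29bd120259');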
• We created a derived table over the stream in the ksqlDB cluster in Confluent. The table aggregates customers by SESSIONID.
CREATE TABLE QUERYABLE_TESTTABLE AS SELECT
SRC.SESSIONID SESSIONID,
COLLECT_LIST(SRC.CUSTOMERID) CUSTOMERS
FROM TEST_STREAM SRC
GROUP BY SRC.SESSIONID
EMIT CHANGES;
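Besides the pull query in the next bullet, the aggregation can also be watched live with a push query (note the EMIT CHANGES); this is just an illustrative sketch.
-- Push query: emits a new row every time a session's customer list changes.
SELECT SESSIONID, CUSTOMERS FROM QUERYABLE_TESTTABLE EMIT CHANGES;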
• We then query the table (pull query):
SELECT * FROM QUERYABLE_TESTTABLE;
• The whole flow works fine (INSERT and UPDATE), and the results are as expected.
SessionId | customers |
---|---|
"3e45e7ac-781b-4213-b288-b3f95836487c" | [ "198fa518-1031-4fe8-8abd-ca29bd120259", "bb1494de-bc1a-429b-a2b0-68684ed01d17"] |
"88db0272-db35-48e9-b7ec-b326a9cde106" | [ "bc4ab46c-5e79-4ca6-af67-74688105a5c0"] |
... | ... |
But how do we remove items from the QUERYABLE_TESTTABLE table?
We tried inserting a tombstone into the customer_events topic. We also tried inserting a tombstone into the underlying topic of the QUERYABLE_TESTTABLE table, which I know is not the best idea. We have searched the internet, but there is no clear description of how to do it.
You are using a STREAM, which does not read tombstone (null-value) events in the first place. The solution is more of a redesign; I can't think of any other way to solve this.
If you have control over what you are publishing, then instead of publishing a tombstone event to the customer_events topic, do the following (a sketch follows this list):
• Publish a __deleted field in every event, set to false; when you want to delete the key, just set it to true.
• Filter those events out of the aggregation with WHERE __deleted != 'true'.
• Add a HAVING clause at the end that checks the array size is greater than 0, so that an empty result creates a tombstone event:
HAVING ARRAY_LENGTH(COLLECT_LIST(SRC.CUSTOMERID)) > 0
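A minimal sketch of that redesign, re-declaring the stream and table from the question with the producer-supplied __deleted flag (the flag is a convention of this approach, not a column that exists in the topic today):
-- The stream now also reads the producer-supplied `__deleted` flag from the JSON value.
CREATE STREAM TEST_STREAM
  (SESSIONID STRING KEY, CUSTOMERID STRING, `__deleted` STRING) WITH
  (KAFKA_TOPIC='customer_events', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');

-- WHERE drops events flagged as deleted before they are collected;
-- HAVING suppresses the output row when the collected list is empty,
-- which is what writes a tombstone into the table's changelog topic.
CREATE TABLE QUERYABLE_TESTTABLE AS SELECT
    SRC.SESSIONID SESSIONID,
    COLLECT_LIST(SRC.CUSTOMERID) CUSTOMERS
  FROM TEST_STREAM SRC
  WHERE SRC.`__deleted` != 'true'
  GROUP BY SRC.SESSIONID
  HAVING ARRAY_LENGTH(COLLECT_LIST(SRC.CUSTOMERID)) > 0
  EMIT CHANGES;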
Note: You can easily do this even if your source connector is Debezium; it provides an out-of-the-box class to do it.
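If Debezium is the source, the out-of-the-box piece referred to above is, to the best of my knowledge, the ExtractNewRecordState single message transform: with delete handling set to rewrite it flattens the change events and adds a "__deleted" field to the value. Roughly, the relevant connector properties look like this (the transform alias "unwrap" is just an arbitrary name):
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite"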