ksqlDB: grouping a table by a per-minute timestamp string returns more than one row per key


I have a question about ksqlDB. I am quite new to ksqlDB but have some experience with MS SQL Server. I hope to get some help here, because I do not understand why ksqlDB currently behaves the way it does. For context: we are using ksqlDB in Confluent Cloud (Apache Kafka).

Here is our use case: my team and I are consuming cryptocurrency data (price, market share, percent change, etc.) from an open-source API, and we want to aggregate the data (especially the price and a few other fields) per minute.

Each message from the API contains a timestamp in milliseconds (BIGINT), which we convert to a string inside our ksqlDB streams.

In the last step of our cryptocurrency data stream, we want to group AVG(priceusd) by the timestamp string to get the average price for each timestamp (shown as date and time, to the minute). However, the table in our ksqlDB always produces more than one row per timestamp, even though the timestamp is in the GROUP BY clause. This happens especially when auto.offset.reset is set to "latest" while streaming live data. With auto.offset.reset set to "earliest", loading older data from the underlying topic works as expected (one row per timestamp, as the GROUP BY should produce).
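For readers unfamiliar with the setting mentioned above: in the ksqlDB CLI, auto.offset.reset is a session property that applies to queries started after it is set, and it only decides where a new query begins reading the source topic. A minimal example (the Confluent Cloud ksqlDB editor offers the same choice as a query property):

```sql
-- 'earliest': a newly started query first replays the records already in the topic.
-- 'latest':   a newly started query only sees records that arrive after it starts.
SET 'auto.offset.reset' = 'earliest';
```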

Here is the code for the final table, including the GROUP BY clause:

```sql
CREATE TABLE COINCAP_Table WITH (KAFKA_TOPIC='Coincap_Table', KEY_FORMAT='JSON', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON') AS SELECT
  data->symbol + ',' + TIMESTAMP_FORMATTED AS TIMESTAMP_SYMBOL_KEY,
  AVG(data->priceusd) AS AVG_priceusd,
  AVG(data->volumeusd24hr) AS AVG_volumeusd24hr,
  AVG(data->CHANGEPERCENT24HR) AS AVG_CHANGEPERCENT24HR,
  AVG(data->marketcapusd) AS AVG_marketCapUsd
FROM COINCAP_STREAM2
GROUP BY data->symbol + ',' + TIMESTAMP_FORMATTED
EMIT CHANGES;
```

PS: We have combined two columns (symbol and timestamp) into one so that we can use the combined attribute in a join later. But that is not the point here.
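One possible factor, offered as an assumption rather than a confirmed diagnosis: a ksqlDB table built with GROUP BY ... EMIT CHANGES publishes every update to an aggregate as a new changelog record, and record caching can merge several updates for the same key when a backlog is processed quickly (as with "earliest"), while live records arriving over time are more likely to surface as separate updates. A quick way to compare the changelog with the table's current state is a push query versus a pull query on the combined key. The key literal below is a hypothetical example.

```sql
-- Push query: streams every change to the table, so the same key can appear
-- several times as its running averages are updated.
SELECT TIMESTAMP_SYMBOL_KEY, AVG_priceusd
FROM COINCAP_Table
EMIT CHANGES;

-- Pull query: returns only the value currently stored for one key.
-- 'BTC,2021-05-10 at 14:32' is a hypothetical key; substitute a real one.
SELECT TIMESTAMP_SYMBOL_KEY, AVG_priceusd
FROM COINCAP_Table
WHERE TIMESTAMP_SYMBOL_KEY = 'BTC,2021-05-10 at 14:32';
```

If the pull query returns a single row per key while the push query shows several, the extra rows are intermediate updates rather than duplicate groups.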

TIMESTAMP_FORMATTED was converted from BIGINT (milliseconds) to STRING as follows:

```sql
TIMESTAMPTOSTRING(TIMESTAMP, 'yyyy-MM-dd ''at'' HH:mm') AS TIMESTAMP_FORMATTED,
```
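If the goal is one result per symbol and minute, a windowed aggregation is an alternative to grouping by a formatted string. The sketch below is untested against this setup and makes several assumptions: the table and topic names are hypothetical; COINCAP_STREAM2's ROWTIME is assumed to be the API's millisecond timestamp (for example because the source stream was declared with WITH (TIMESTAMP='TIMESTAMP')); and the ksqlDB version is assumed to support EMIT FINAL.

```sql
-- Hypothetical names; assumes ROWTIME carries the API's millisecond timestamp.
CREATE TABLE COINCAP_PER_MINUTE
  WITH (KAFKA_TOPIC='Coincap_Per_Minute', VALUE_FORMAT='JSON', PARTITIONS=1) AS
  SELECT
    data->symbol AS SYMBOL,
    AVG(data->priceusd) AS AVG_priceusd,
    AVG(data->volumeusd24hr) AS AVG_volumeusd24hr,
    AVG(data->CHANGEPERCENT24HR) AS AVG_CHANGEPERCENT24HR,
    AVG(data->marketcapusd) AS AVG_marketCapUsd
  FROM COINCAP_STREAM2
  -- One window per symbol per minute; records up to 10 seconds late are still counted.
  WINDOW TUMBLING (SIZE 1 MINUTE, GRACE PERIOD 10 SECONDS)
  GROUP BY data->symbol
  -- Emit a single result per window once the window (plus grace period) has closed.
  EMIT FINAL;
```

With EMIT CHANGES instead of EMIT FINAL, a windowed table would still emit intermediate updates. Note also that a window only closes once later records advance stream time, so the most recent minute appears only after newer data arrives.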

Does anyone know how to get only a single row per key in the GROUP BY clause? Why does ksqlDB produce more than one row (sometimes two or three) for each key in the GROUP BY clause?

Thanks for your help.

Best, Sebastian
