I have a Kafka source with a retention of 7 days, and a JDBC data store that contains the full history of the data. The data are "trades", and each trade has an amount.
I want to create a streaming job that keeps track of the "total overall sum of amounts" (totalVolume) over the full history: both the data already in the JDBC store and all new events arriving via Kafka.
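For context, the two source tables are registered roughly like this. The connector options and column types are trimmed and approximate, not the exact definitions:

-- Kafka-backed stream of new trades (options illustrative)
CREATE TABLE trades_stream (
  poolId STRING,
  amount DECIMAL(38, 18),
  tradeTimestamp BIGINT,
  procTime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'trades',
  'properties.bootstrap.servers' = '...',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- JDBC-backed table holding the full trade history
CREATE TABLE trades_store (
  poolId STRING,
  amountUSD DECIMAL(38, 18),
  `timestamp` BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://...',
  'table-name' = 'trades'
);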
Currently I have one batch job that calculates the total sum once and stores it in the database, and a streaming job that picks up where the batch job left off, relying on a field named totalVolumeTimestamp, and keeps adding the newly incoming amounts. Is this the correct way of doing this, or is there a more robust way in Flink SQL? I want to make sure no event is missed.
The batch job looks like this:
INSERT INTO sink
SELECT
  poolId,
  SUM(amountUSD) AS totalVolume,
  MAX(`timestamp`) AS totalVolumeTimestamp
FROM trades_store  -- JDBC-backed, full history
GROUP BY poolId
;
And the streaming job looks like this:
CREATE VIEW trades_agg AS
SELECT
  ss.poolId,
  SUM(ss.amount) AS totalVolume,
  MAX(ss.tradeTimestamp) AS totalVolumeTimestamp,
  COALESCE(LAST_VALUE(pl.volumeUSD), 0) AS currentTotalVolume
FROM trades_stream ss
LEFT JOIN pools_store FOR SYSTEM_TIME AS OF ss.procTime AS pl
  ON ss.poolId = pl.poolId
WHERE
  pl.totalVolumeTimestamp IS NULL OR
  ss.tradeTimestamp > pl.totalVolumeTimestamp
GROUP BY ss.poolId
;
INSERT INTO pools_sink
SELECT
  poolId,
  totalVolume + currentTotalVolume AS totalVolumeUSD,
  totalVolumeTimestamp
FROM trades_agg
WHERE
  totalVolumeTimestamp IS NOT NULL AND
  totalVolumeTimestamp > 0
;
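For completeness, pools_store (the lookup side of the temporal join) and pools_sink are both registered against the same physical JDBC pools table, roughly like this. The exact schema and options are approximate:

-- JDBC table with the running per-pool totals; read via the
-- temporal join as pools_store and written to as pools_sink.
CREATE TABLE pools_store (
  poolId STRING,
  volumeUSD DECIMAL(38, 18),
  totalVolumeTimestamp BIGINT,
  PRIMARY KEY (poolId) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://...',
  'table-name' = 'pools'
);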
The most immediate issue is that I have to stop the streaming job whenever I want to re-run the batch job (e.g. to fix some old data).
But the main question is: is this the correct way of using Flink aggregation functions for such a task?