Here is my current scenario:
- a Kafka source datastream
- convert the source datastream into a Table with a primary key, a rowtime column, and a watermark
tableEnvironment.fromDataStream(
sourceKafkaDataStream,
Schema.newBuilder()
.primaryKey("id")
.columnByExpression("proctime", "PROCTIME()")
.columnByExpression("rowtime", "TO_TIMESTAMP_LTZ(eventtime, 3)")
.watermark("rowtime", "rowtime - INTERVAL '60' SECOND")
.build()
)
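For reference, the equivalent Flink SQL DDL for this source would look roughly like the sketch below; the column names and types other than id, eventtime, and rowtime are assumptions inferred from the queries further down (key1, key2, category, productId):

```sql
CREATE TABLE sourceKafkaTable (
  id STRING,
  eventtime BIGINT,      -- epoch millis, converted to rowtime below
  key1 STRING,
  key2 STRING,
  category STRING,
  productId STRING,
  proctime AS PROCTIME(),
  rowtime AS TO_TIMESTAMP_LTZ(eventtime, 3),
  WATERMARK FOR rowtime AS rowtime - INTERVAL '60' SECOND,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka'
  -- remaining connector options omitted
);
```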
- create multiple temporary views that compute over aggregations (distinct counts) partitioned by different keys
select
id,
rowtime,
key1,
count(*) over last_hour AS cnt,
count(distinct category) over last_hour as distinct_categories
from sourceKafkaTable
window last_hour AS (
PARTITION BY key1 -- different key
ORDER BY rowtime ASC
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
select
id,
rowtime,
key2,
count(*) over last_hour AS cnt,
count(distinct category) over last_hour as distinct_categories
from sourceKafkaTable
where productId is not null
window last_hour AS (
PARTITION BY key2 -- different key
ORDER BY rowtime ASC
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
tableEnvironment.createTemporaryView("Table2", categoryOverAggregation);
tableEnvironment.createTemporaryView("Table3", productOverAggregation);
- left join the over-aggregation views to merge all columns per row and sink to HDFS
select A.id, B.key1, C.key2, B.distinct_categories AS distinct_by_key1, C.distinct_categories AS distinct_by_key2
from sourceKafkaTable A
left join Table2 FOR SYSTEM_TIME AS OF A.rowtime AS B ON A.id = B.id
left join Table3 FOR SYSTEM_TIME AS OF A.rowtime AS C ON A.id = C.id
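For comparison, dropping the FOR SYSTEM_TIME AS OF clauses turns this into a regular left join, which does not require a primary key on either side (the column aliases here are my own):

```sql
select
  A.id,
  B.key1,
  C.key2,
  B.distinct_categories AS distinct_by_key1,
  C.distinct_categories AS distinct_by_key2
from sourceKafkaTable A
left join Table2 B ON A.id = B.id
left join Table3 C ON A.id = C.id
```

Note that on an unbounded stream a regular join keeps both sides in state indefinitely unless a state TTL is configured, which is why the temporal join looked preferable.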
However, the following error is thrown:
Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:
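For context, the Flink documentation infers a primary key for a versioned view from a deduplication query of the shape below; it is unclear to me whether something similar can be applied on top of the over-aggregation views above (a sketch, view and column names assumed):

```sql
CREATE TEMPORARY VIEW versioned_table2 AS
SELECT id, rowtime, key1, cnt, distinct_categories
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY rowtime DESC) AS rownum
  FROM Table2
)
WHERE rownum = 1;
```

From this pattern the planner infers id as the primary key and rowtime as the version attribute.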
Is this the right way to join over-aggregation queries derived from a single source table? If so, how can I define primary keys on the temporary views?