I'm working with data coming from Kafka in real time. Processing one input record produces several rows (because of JOIN and EXPLODE operations). From all of those rows I then need to select just one: the row with the highest value of a certain field.
The question is how to gather all the rows produced from one source record into a window and apply an aggregate function to them. In Spark Structured Streaming I managed to solve a similar problem with foreachBatch, but in Flink I'm stuck.
The closest I got to the desired result was with Top-N, but it isn't suitable for append mode, and I'm not entirely sure how to write its output to the target storage. Am I even going in the right direction?
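(For reference, by Top-N I mean Flink SQL's documented ROW_NUMBER pattern; a rough sketch of what I tried, where the table and column names `joined_msip`, `unique_id`, `start_time` match the rest of this post but may differ in your setup:)

```sql
-- Keep only the row with the highest start_time per source record.
SELECT *
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY unique_id
           ORDER BY start_time DESC) AS row_num
  FROM joined_msip
)
WHERE row_num = 1
```

Since a later row can displace the current top row for a key, this query produces an updating stream rather than an append-only one, which is exactly the sink problem I ran into.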
I'm new to Flink, so all answers are appreciated.
Update:
OK, I tried using session windows with an aggregate function to find the maximum:
DataStream<Row> joined = tableEnv.toDataStream(tableEnv.from("joined_msip"));
DataStream<Row> filtered = joined
        // group all rows spawned by the same source record
        .keyBy((KeySelector<Row, Object>) value -> value.getField("unique_id"))
        // close the group once no new row arrives for 1 second
        .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(1000)))
        // keep only the row with the maximum start time
        .aggregate(new MaxStartTimeAggregate());
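For completeness, MaxStartTimeAggregate is roughly the following (the field name `start_time` and its `Long` type are assumptions; the accumulator is simply the best row seen so far):

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.types.Row;

// Keeps the row with the largest "start_time" field within a window.
public class MaxStartTimeAggregate implements AggregateFunction<Row, Row, Row> {

    @Override
    public Row createAccumulator() {
        return null; // no row seen yet
    }

    @Override
    public Row add(Row value, Row acc) {
        if (acc == null) {
            return value;
        }
        Long current = (Long) value.getField("start_time");
        Long best = (Long) acc.getField("start_time");
        return current > best ? value : acc;
    }

    @Override
    public Row getResult(Row acc) {
        return acc;
    }

    @Override
    public Row merge(Row a, Row b) {
        return a == null ? b : add(b, a);
    }
}
```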
And it works, but I'm forced to pick a session gap, which adds latency compared to simply collecting exactly the records spawned by the original record.
This doesn't look like the best solution, so the question remains.