Watermark strategy on Flink Kinesis Consumer

I am investigating using Flink with a Kinesis stream as a source, and I would like to use event-time watermarking. I plan to run this on the AWS managed Flink (Kinesis Analytics) platform.

Both the AWS documentation and the Flink documentation recommend using the FlinkKinesisConsumer.
To enable event time on this consumer, the recommendation I see is to write a custom AssignerWithPeriodicWatermarks and set it on the consumer with setPeriodicWatermarkAssigner.

However, I also read in the Flink documentation that this API is deprecated and that it is advised to use WatermarkStrategy instead.

My questions:

  • Is it possible to use a WatermarkStrategy on the Kinesis consumer, or must it be applied after a non-source operation on the DataStream itself (which is discouraged in the Flink docs)?
  • If it is not possible and it must be applied after a non-source operation, what does this mean? Why is it discouraged? How does it affect the performance of the workload?
  • Or is it recommended to continue using the deprecated API?
  • Or is there another Kinesis consumer for Flink that can be recommended?

Thanks in advance for any suggestions

Alexis

There is 1 answer below

It's not possible to set a WatermarkStrategy directly on FlinkKinesisConsumer (except, as you pointed out, by using the deprecated AssignerWithPeriodicWatermarks interface), or on any other implementation of SourceFunction, but you can assign the watermarks as soon as you get the DataStream.

Judging from the comments I think you've already figured it out, but this is what I'm doing.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

// Set up
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var consumerProperties = ...;
var deserializer = ...;
var kinesisConsumer = new FlinkKinesisConsumer<MyDataType>(kinesisStreamName, deserializer, consumerProperties);

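// Pick a watermark strategy.
// Use some timestamp field from your event object (getTimestamp() should return epoch milliseconds).
// Note: in Java the explicit type argument goes before the method name.
var strategy = WatermarkStrategy.<MyDataType>forMonotonousTimestamps()
                                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());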

// Start reading from Kinesis
var dataStream = env.addSource(kinesisConsumer)
                    .assignTimestampsAndWatermarks(strategy);

You'll have to tweak this to your use case. Obviously replace MyDataType with whatever is actually in your stream, but also note that forMonotonousTimestamps may not be suited to your use case, and forBoundedOutOfOrderness may work better.
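
To make the difference between the two strategies concrete, here is a minimal plain-Java sketch of the arithmetic behind a bounded-out-of-orderness watermark. It is a simplified model for illustration, not Flink code: the class BoundedOutOfOrdernessSketch and its methods are hypothetical names, and it only mirrors the idea that the watermark trails the largest timestamp seen so far by the allowed bound. In Flink itself you would swap the strategy in the snippet above for WatermarkStrategy.<MyDataType>forBoundedOutOfOrderness(Duration.ofSeconds(5)), where the 5 seconds is an assumed value you would tune. A bound of zero behaves like forMonotonousTimestamps.

```java
import java.time.Duration;

// Hypothetical, simplified model of a bounded-out-of-orderness watermark.
// Not part of Flink; it only illustrates the arithmetic.
final class BoundedOutOfOrdernessSketch {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.boundMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Called for every event: remember the largest event timestamp seen so far.
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    // The watermark trails the largest timestamp by the bound (minus 1 ms).
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    // An event at or below the current watermark would be treated as late.
    boolean isLate(long eventTimestampMillis) {
        return eventTimestampMillis <= currentWatermark();
    }
}
```

With a 5 second bound, an event that arrives 3 seconds behind the newest one is still on time, whereas with a zero bound (the monotonous case) the same event is already behind the watermark. That is why out-of-order input, for example records read from several shards, usually calls for a non-zero bound.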