I have an Apache Flink application that I have deployed on Kinesis Data Analytics (KDA).
Payload schema processed by the application (simplified version):
```
{
  id: String = uuid (each request gets one),
  category: String = uuid (we have 10 of these),
  org_id: String = uuid (we have 1000 of these),
  count: Integer (some integer)
}
```
This application is doing the following:
- Source: Consume from a single Kafka topic (128 partitions)
- Filter: Do some filtering for invalid records (nothing fancy here)
- Key-by: based on 2 fields in the input, Tuple.of(org_id, category)
- Flatmap (de-duplication): Maintains a Guava cache (with size 30k and expiration 5 mins). A single String ID (the id field in the payload) is stored in the cache. Each time a record comes in, we check whether its id is present in the cache. If it is present, the record is skipped. Otherwise, the id is added to the cache and the record is forwarded. (A simplified sketch of the wiring follows this list.)
- Rebalance: Just to make sure some sinks don't remain idle while the others are taking all the load.
- Sink: Writes to S3 (and this S3 has versioning enabled).
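To make the above concrete, this is roughly how the job is wired (a simplified sketch, not the exact production code: Payload is a stand-in POJO for the simplified schema above, the filter predicate is a placeholder, and the DeduplicatingFlatmap itself is sketched further down under the edit about `transient`):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class PipelineSketch {

    // Stand-in POJO for the simplified payload schema above.
    public static class Payload {
        public String id;
        public String category;
        public String orgId;
        public Integer count;
    }

    // filter -> keyBy(org_id, category) -> de-duplication -> rebalance.
    // `fromKafka` is the stream coming out of the Kafka source (128 partitions),
    // and the StreamingFileSink from Edit 2 is attached to the returned stream.
    public static DataStream<Payload> wire(DataStream<Payload> fromKafka) {
        return fromKafka
                .filter(p -> p.id != null)                      // drop invalid records (placeholder check)
                .keyBy(new KeySelector<Payload, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> getKey(Payload p) {
                        return Tuple2.of(p.orgId, p.category);  // key by (org_id, category)
                    }
                })
                .flatMap(new DeduplicatingFlatmap())            // de-duplication, sketched further down
                .rebalance();                                   // keep all sink subtasks busy
    }
}
```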
This is deployed with, in KDA terms, a parallelism of 64 and a parallelism per KPU of 2. That means we will have a cluster of 32 nodes (64 / 2), and each node has 1 CPU core and 4GB of RAM.
All of the issues mentioned below happen at 2000 rps.
Now to the issue I am facing:
- My lastCheckPointSize seems to be 471MB. This seems very high given that we are not using any state (note: the Guava cache is not stored in Flink state; see the Gist with just the interesting parts). I am unable to understand why my checkpoint size is so huge when I am not using any state. I was thinking it would just be the Kafka offsets processed by each of these stages, but 471MB seems too big for that.
Is this huge checkpoint responsible for the backpressure I am facing? When I look at the S3 metrics, it looks like about 20ms per write, which I assume is not too much.
I am seeing a few rate limits on S3, but from my understanding this seems pretty low compared to the number of writes I am making.
Any idea why I am facing this backpressure and also why my checkpoints are so huge?
(Edit, added as an afterthought) Now that I think about it, is it possible that not marking the LoadingCache as `transient` in my DeduplicatingFlatmap is playing a role in the huge checkpoint size?
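For context, the operator in question looks roughly like this (a simplified sketch, not the exact code from the gist; Payload is the stand-in POJO from the wiring sketch above):

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

public class DeduplicatingFlatmap extends RichFlatMapFunction<Payload, Payload> {

    // Declared WITHOUT transient today; the question is whether it should instead be
    //   private transient LoadingCache<String, String> cache;
    private LoadingCache<String, String> cache;

    @Override
    public void open(Configuration parameters) {
        // Guava cache of recently seen ids: max 30k entries, 5 minute expiry.
        cache = CacheBuilder.newBuilder()
                .maximumSize(30_000)
                .expireAfterWrite(5, TimeUnit.MINUTES)
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String key) {
                        return key;
                    }
                });
    }

    @Override
    public void flatMap(Payload value, Collector<Payload> out) {
        // Forward the record only if this id has not been seen in the last 5 minutes.
        if (cache.getIfPresent(value.id) == null) {
            cache.put(value.id, value.id);
            out.collect(value);
        }
    }
}
```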
Edit 2: Adding details related to my sink:
I am using a StreamingFileSink:
```java
StreamingFileSink
    .forRowFormat(new Path(s3Bucket), new JsonEncoder<>())
    .withBucketAssigner(bucketAssigner)
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(60000)
        .build())
    .build()
```
The JsonEncoder takes the object, converts it to JSON, and writes out the bytes, like this: https://gist.github.com/vmohanan1/3ba3feeb6f22a5e34f9ac9bce20ca7bf
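In essence the encoder does something like this (a simplified sketch using Jackson, not the exact code from the gist): each record is written out as one line of JSON.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.Encoder;

import java.io.IOException;
import java.io.OutputStream;

// Serializes each record as a single JSON line in the part file.
public class JsonEncoder<T> implements Encoder<T> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void encode(T element, OutputStream stream) throws IOException {
        stream.write(MAPPER.writeValueAsBytes(element));
        stream.write('\n');
    }
}
```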
The BucketAssigner gets the product and org from the schema and combines them with the processing time from the context, like this: https://gist.github.com/vmohanan1/8d443a419cfeb4cb1a4284ecec48fe73
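And the bucket assigner is roughly this (a simplified sketch, not the exact code from the gist; the class name is a placeholder, the field names follow the simplified schema above, and Payload is the stand-in POJO from the wiring sketch):

```java
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Buckets each record by org and category, combined with the current processing time.
public class OrgCategoryTimeBucketAssigner implements BucketAssigner<Payload, String> {

    @Override
    public String getBucketId(Payload element, Context context) {
        return element.orgId + "/" + element.category + "/" + context.currentProcessingTime();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```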