Heavy back pressure and huge checkpoint size

I have an Apache Flink application that I have deployed on Kinesis Data Analytics.

Payload schema processed by the application (simplified version):

{
  id: String = uuid (each request gets one),
  category: String = uuid (we have 10 of these),
  org_id: String = uuid (we have 1000 of these),
  count: Integer (some integer)
}
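
For reference, a minimal POJO sketch of this payload (the class and field names are my own assumption, not actual code from the application):

// Hypothetical POJO mirroring the payload above.
public class Event {
    public String id;        // uuid, unique per request
    public String category;  // uuid, ~10 distinct values
    public String orgId;     // uuid, ~1000 distinct values
    public int count;

    public Event() {}        // public no-arg constructor so Flink treats it as a POJO
}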

This application is doing the following:

  1. Source: Consume from a single Kafka topic (128 partitions).
  2. Filter: Do some filtering for invalid records (nothing fancy here).
  3. Key-by: based on 2 fields in the input, Tuple.of(org_id, category).
  4. FlatMap (de-duplication): Maintains a Guava cache (with size 30k and expiration of 5 minutes). A single String field (the id in the payload) is stored in the cache. Each time a record comes in, we check if the id is present in the cache. If it is present, the record is skipped. Otherwise the id is added to the cache and the record is emitted. (See the sketch after this list.)
  5. Rebalance: Just to make sure some sinks don't remain idle while the others take all the load.
  6. Sink: Writes to S3 (and this S3 bucket has versioning enabled).
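
The de-duplicating FlatMap looks roughly like this (a minimal sketch of what I described above; the real code is in the gist mentioned below, and Event is the hypothetical POJO from earlier):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

public class DeduplicatingFlatmap extends RichFlatMapFunction<Event, Event> {

    // Plain instance field: NOT Flink managed state, and currently NOT marked transient.
    // It is only populated in open(), so it is null when the function is serialized at job submission.
    private Cache<String, Boolean> seenIds;

    @Override
    public void open(Configuration parameters) {
        seenIds = CacheBuilder.newBuilder()
                .maximumSize(30_000)
                .expireAfterWrite(5, TimeUnit.MINUTES)
                .build();
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) {
        if (seenIds.getIfPresent(event.id) == null) {
            seenIds.put(event.id, Boolean.TRUE);  // remember the id for the next 5 minutes
            out.collect(event);                   // forward only the first occurrence
        }
    }
}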

This is deployed, in KDA terms, with a parallelism of 64 and a parallelism-per-KPU of 2. That means we get a cluster of 32 nodes, each with 1 vCPU and 4GB of RAM.

All of the issues mentioned below happen at around 2000 rps.

Now to the issues I am facing:

  1. My lastCheckpointSize seems to be 471MB. This seems very high given that we are not using any state (note: the Guava cache is not stored in Flink state; see the gist with just the interesting parts).


  2. I see heavy backpressure. Because of this, record_lag_max builds up.


I am unable to understand why my checkpoint size is so huge since I am not using any state. I was thinking it would just be the Kafka offsets processed by each of these stages, but 471MB seems too big for that.


Is this huge checkpoint responsible for the backpressure I am facing? When I look at the S3 metrics, it looks like about 20ms per write, which I assume is not too much.

I am seeing a few rate limits on S3, but from my understanding this seems to be pretty low compared to the number of writes I am making.

Any idea why I am facing this backpressure and also why my checkpoints are so huge?

(Edit, added as an afterthought) Now that I think about it, could not marking the LoadingCache as `transient` in my DeduplicatingFlatmap be playing any role in the huge checkpoint size?
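
Concretely, the change I am wondering about is just adding the keyword (sketch, using the hypothetical field from the earlier snippet):

// Current: plain field, assigned in open().
private Cache<String, Boolean> seenIds;

// The variant I am asking about: does adding transient make any difference to checkpoint size?
private transient Cache<String, Boolean> seenIds;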

Edit 2: Adding details related to my sink:

I am using a StreamingFileSink:

StreamingFileSink
    .forRowFormat(new Path(s3Bucket), new JsonEncoder<>())
    .withBucketAssigner(bucketAssigner)
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withRolloverInterval(60000)  // roll part files every 60 seconds
        .build())
    .build()

The JsonEncoder takes the object, converts it to JSON, and writes out the bytes, like this: https://gist.github.com/vmohanan1/3ba3feeb6f22a5e34f9ac9bce20ca7bf
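
In essence it does something like the following (a minimal sketch assuming Jackson; the gist above has the actual code):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.Encoder;

import java.io.IOException;
import java.io.OutputStream;

// Sketch of an Encoder that writes one JSON object per line.
public class JsonEncoder<T> implements Encoder<T> {

    private transient ObjectMapper mapper;

    @Override
    public void encode(T element, OutputStream stream) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();  // created lazily on first use
        }
        stream.write(mapper.writeValueAsBytes(element));
        stream.write('\n');
    }
}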

The BucketAssigner gets the product and org from the schema and combines them with the processing time from the context, like this: https://gist.github.com/vmohanan1/8d443a419cfeb4cb1a4284ecec48fe73
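
Roughly, it does something like this (a sketch only; the class name and exact bucket path layout are my assumptions, and the gist above has the actual code):

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Sketch: buckets records by org, category and processing time.
public class OrgCategoryTimeBucketAssigner implements BucketAssigner<Event, String> {

    @Override
    public String getBucketId(Event element, Context context) {
        // Exact path layout is an assumption; the idea is org + category + processing time.
        return element.orgId + "/" + element.category + "/" + context.currentProcessingTime();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}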
