Kafka Connect S3 Sink Flush data - Strange lag


I have a TABLE created from a KSQL query on an input STREAM, backed by a Kafka topic.
This topic is sunk to S3 using Kafka Connect. The topic receives around 1k msgs/sec.
It has 6 partitions and 3 replicas.

I'm seeing a strange output rate; the sink behaviour looks odd. Here is my monitoring (screenshot):
The first chart shows the input rate in B/s, the second the output rate, and the third the consumer lag computed with Burrow.

Here is my S3 sink connector configuration:

{
"name": "sink-feature-static",
"config": {
  "topics": "FEATURE_APP_STATIC",
  "topics.dir": "users-features-stream",
  "tasks.max": "6",
  "consumer.override.auto.offset.reset": "latest",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "snappy",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'\'part_date\''=YYYY-MM-dd/'\'part_hour\''=HH",
  "partition.duration.ms": "3600000",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://cp-schema-registry.schema-registry.svc.cluster.local:8081",
  "flush.size": 1000000,
  "s3.part.size": 5242880,
  "rotate.interval.ms": "600000",
  "rotate.schedule.interval.ms": "600000",
  "locale": "fr-FR",
  "timezone": "UTC",
  "timestamp.extractor": "Record",
  "schema.compatibility": "NONE",
  "aws.secret.access.key": "secretkey",
  "aws.access.key.id": "accesskey",
  "s3.bucket.name": "feature-store-prod-kafka-test",
  "s3.region": "eu-west-1"
  }
}

Here is what I'm observing in the S3 bucket (screenshot): the parquet.snappy files contain only a small number of messages (sometimes just 1, sometimes a few more), with roughly 2 files per second per partition. Since I'm using the Record timestamp extractor, I think this happens while the connector is catching up on the lag.

What I was expecting:
A file commit every 1,000,000 messages (flush.size) or every 10 minutes (rotate.schedule.interval.ms).
Since 1M messages > 10 min * 1k msg/s, I was expecting either:
1/ 6 (one every 10 min) * 6 (number of partitions) = 36 parquet files per hour, or
2/ if I'm wrong about that, at least files with 1M messages inside (a quick back-of-the-envelope check is sketched below).
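
A minimal sanity check of those numbers, assuming the ~1k msg/s is spread evenly over the 6 partitions (the rate and partition count come from the question; the arithmetic itself is just an illustration):

msgs_per_sec_total = 1_000          # ~1k msg/s into the topic
partitions = 6
rotate_interval_s = 600             # rotate.schedule.interval.ms = 10 minutes

msgs_per_file_per_partition = msgs_per_sec_total / partitions * rotate_interval_s
print(msgs_per_file_per_partition)  # 100000.0 -> far below flush.size = 1,000,000,
                                    # so the 10-minute rotation should be what closes files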


But neither 1/ nor 2/ is what I observe.
Instead I have a huge lag, and a flush/commit of S3 files only once per hour (see monitoring).
Does "partition.duration.ms": "3600000" lead to that behaviour?


Where am I wrong? Why do I not see a continuous output flush of data, but spikes instead?

Thanks! Rémy

There is 1 answer below.


So yes: first, set partition.duration.ms to 10 minutes if you want one S3 object per 10 minutes. Second, if you really don't want small files, set rotate.interval.ms=-1 and rotate.schedule.interval.ms to 10 minutes (however, you then lose the exactly-once delivery guarantee).
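
For illustration only, the keys that would change in the connector config above would look something like this (everything else stays as in the question; treat it as a sketch of the suggestion, not a tested setup):

{
  "partition.duration.ms": "600000",
  "rotate.interval.ms": "-1",
  "rotate.schedule.interval.ms": "600000"
}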

When using rotate.interval.ms, what happens is that each time the connector receives a record whose timestamp is earlier than the timestamp that opened the current file, Kafka Connect flushes that file. This leads to very small files at the beginning and end of each hour, but it does ensure exactly-once delivery in all failure cases.
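
A minimal sketch of the rotation decision described above (this is my own illustration of the behaviour, not the connector's actual code; the function name and signature are made up):

def should_rotate(record_ts_ms, first_record_ts_ms, rotate_interval_ms):
    """Decide whether the currently open file is closed before writing this record."""
    if rotate_interval_ms < 0:
        # rotate.interval.ms = -1 disables record-timestamp-based rotation.
        return False
    if record_ts_ms < first_record_ts_ms:
        # Out-of-order record: earlier than the record that opened the file,
        # so the file is flushed (this is what produces the tiny files).
        return True
    # Otherwise rotate once the interval since the file's first record has elapsed.
    return record_ts_ms - first_record_ts_ms >= rotate_interval_ms

# Records arriving slightly out of order around an hour boundary keep
# triggering rotations, hence many very small parquet files.
print(should_rotate(1_000_500, 1_000_000, 600_000))  # False: inside the 10-minute window
print(should_rotate(  999_000, 1_000_000, 600_000))  # True: timestamp went backwards

With rotate.interval.ms=-1 the first branch short-circuits everything, which is why the answer suggests it to avoid small files, at the cost of the exactly-once guarantee.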