I have a TABLE created from a KSQL query and an input STREAM backed by a Kafka topic.
This topic is sinked to S3 using Kafka Connect.
The topic receives around 1k msgs/sec.
The topic has 6 partitions and 3 replicas.
The output ratio looks strange: the sink does not behave the way I expect.
Here is my monitoring:
[monitoring screenshot]
The first chart shows the input ratio in B/s, the second the output ratio, and the third the consumer lag computed with Burrow.
Here is my s3-sink properties file :
{
"name": "sink-feature-static",
"config": {
"topics": "FEATURE_APP_STATIC",
"topics.dir": "users-features-stream",
"tasks.max": "6",
"consumer.override.auto.offset.reset": "latest",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'\'part_date\''=YYYY-MM-dd/'\'part_hour\''=HH",
"partition.duration.ms": "3600000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://cp-schema-registry.schema-registry.svc.cluster.local:8081",
"flush.size": 1000000,
"s3.part.size": 5242880,
"rotate.interval.ms": "600000",
"rotate.schedule.interval.ms": "600000",
"locale": "fr-FR",
"timezone": "UTC",
"timestamp.extractor": "Record",
"schema.compatibility": "NONE",
"aws.secret.access.key": "secretkey",
"aws.access.key.id": "accesskey",
"s3.bucket.name": "feature-store-prod-kafka-test",
"s3.region": "eu-west-1"
}
}
Here is what I'm observing in the S3 bucket:
[s3 bucket screenshot]
These parquet.snappy files each contain only a small number of messages (sometimes just 1, sometimes a few more), at roughly 2 files per second per partition. (Since I'm using the Record timestamp extractor, I think this happens while the connector is catching up on the lag.)
What I was expecting is:
A file commit every 1,000,000 messages (flush.size) or every 10 minutes (rotate.schedule.interval.ms).
So, since 1M messages > 10 min * 1k msgs/s, I'm expecting either:
1/ 6 (one every 10 min) * 6 (number of partitions) = 36 parquet files every hour,
2/ or, if I'm wrong about that, at least files with 1M messages inside.
But neither 1/ nor 2/ is what I observe.
Instead I have a huge lag, and a flush/commit to S3 only once per hour (see monitoring).
Does "partition.duration.ms": "3600000" leads to that observation ?
Where am I wrong ?
Why I do not see a continuous Output flush of data but such spikes ?
Thanks ! Rémy
So yes: first, set partition.duration.ms to 10 minutes if you want one S3 object per 10 minutes. Second, if you really don't want small files, set rotate.interval.ms=-1 and rotate.schedule.interval.ms to 10 minutes (however, you lose the exactly-once delivery guarantee). When using rotate.interval.ms, what happens is that each time the connector receives a record timestamp earlier than the timestamp that opened the current file, Kafka Connect flushes, leading to very small files at the beginning and end of each hour; on the other hand, it ensures exactly-once delivery in all failure cases.
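Putting these recommendations together, the relevant properties of the connector config would look something like the following (a sketch for the 10-minute case; all other settings stay as in the question):

```json
{
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "partition.duration.ms": "600000",
  "rotate.interval.ms": "-1",
  "rotate.schedule.interval.ms": "600000",
  "flush.size": 1000000
}
```

With rotate.interval.ms disabled, files are committed on the wall-clock schedule (every 10 minutes) or when flush.size is reached, whichever comes first.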
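The rotation behavior described above can be sketched roughly like this (a simplification to illustrate the idea, not the connector's actual code; the function name and values are illustrative):

```python
def should_rotate(record_ts_ms, file_base_ts_ms, rotate_interval_ms):
    """Rough sketch of the rotate.interval.ms check in a timestamp-based sink.

    A file is rotated when the incoming record's timestamp moves past the
    rotation window, OR when it falls before the timestamp that opened the
    file -- which is why out-of-order records around hour boundaries
    produce many tiny files while catching up on lag.
    """
    if rotate_interval_ms <= 0:
        # Time-based rotation disabled (rotate.interval.ms=-1).
        return False
    # Record timestamp moved forward past the rotation window: rotate.
    if record_ts_ms >= file_base_ts_ms + rotate_interval_ms:
        return True
    # Record timestamp is earlier than the file's base timestamp: rotate too.
    if record_ts_ms < file_base_ts_ms:
        return True
    return False

# With a 10-minute window opened at t=1_000_000_000 ms:
print(should_rotate(1_000_600_000, 1_000_000_000, 600_000))  # True: past the window
print(should_rotate(999_999_999, 1_000_000_000, 600_000))    # True: earlier record
print(should_rotate(1_000_300_000, 1_000_000_000, 600_000))  # False: inside the window
```

This is why disabling rotate.interval.ms (setting it to -1) and relying on rotate.schedule.interval.ms, which rotates on wall-clock time instead of record timestamps, avoids the tiny files at the cost of exactly-once delivery.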