I am using the Confluent S3 Sink Connector to read from a Kafka Avro topic (with Schema Registry) and push the data to S3 in Parquet format. It works fine when compression is none, but when I enable gzip compression I get the following error:
{"source_host":"connector-avro-parquet-0","method":"put","level":"ERROR","ctx":{"stacktrace":"org.apache.kafka.connect.errors.RetriableException: org.apache.kafka.connect.errors.DataException: Multipart upload failed to complete.\n\tat io.confluent.connect.s3.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:524)\n\tat io.confluent.connect.s3.TopicPartitionWriter.commitOnTimeIfNoData(TopicPartitionWriter.java:303)\n\tat io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:194)\n\tat io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base\/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base\/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base\/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base\/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base\/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.DataException: Multipart upload failed to complete.\n\tat io.confluent.connect.s3.storage.S3OutputStream.commit(S3OutputStream.java:169)\n\tat io.confluent.connect.s3.storage.S3ParquetOutputStream.close(S3ParquetOutputStream.java:36)\n\tat org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:865)\n\tat org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)\n\tat org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)\n\tat io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.commit(ParquetRecordWriterProvider.java:108)\n\tat io.confluent.connect.s3.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:544)\n\tat io.confluent.connect.s3.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:514)\n\t... 14 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: Expected compressionFilter to be a DeflaterOutputStream, but was passed an instance that does not match that type.\n\tat io.confluent.connect.s3.storage.CompressionType$1.finalize(CompressionType.java:77)\n\tat io.confluent.connect.s3.storage.S3OutputStream.commit(S3OutputStream.java:161)\n\t... 21 more","exception_class":"org.apache.kafka.connect.errors.RetriableException","exception_message":"org.apache.kafka.connect.errors.DataException: Multipart upload failed to complete."}
The same issue happens when I use the Avro format class with compression.
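For reference, that Avro attempt was essentially the same configuration as below with only the format-related keys changed; roughly like this (a sketch from memory, not the exact config I ran):

  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "s3.compression.type": "gzip"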
Connector config:
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "errors.log.include.messages": "true",
  "s3.region": "us-east-1",
  "topics.dir": "parquet-parsed-data",
  "flush.size": "10000000",
  "s3.part.size": "62428800",
  "tasks.max": "10",
  "timezone": "UTC",
  "rotate.interval.ms": "-1",
  "locale": "en-US",
  "partition.fieldName": "day",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "format.bytearray.extension": "json",
  "errors.log.enable": "true",
  "s3.bucket.name": "data-lake-parquet",
  "partition.duration.ms": "6000000",
  "s3.ssea.name": "",
  "schema.compatibility": "NONE",
  "directory.delim": "/",
  "batch.size": "1000",
  "parquet.codec": "gzip",
  "store.url": "http://127.0.0.1:9000",
  "topics.regex": "input-topic-avro",
  "partition.field.name": "timestamp",
  "s3.compression.type": "gzip",
  "partitioner.class": "custom.WriterPartitioner",
  "name": "avro-parquet-writer",
  "partition.fields": "timestamp:no_timestamp",
  "errors.tolerance": "all",
  "connector.buffer.dir": "/data/",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "path.format": "YYYY-MM-dd",
  "rotate.schedule.interval.ms": "300000",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "timestamp"
}
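The worker/converter side is the usual Avro setup against Schema Registry; roughly like this (the key converter and the registry URL are placeholders, not my exact values):

  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"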
Any way to resolve this?
Thanks in advance!
What I am trying and what I expect: read Avro-serialized messages from a Kafka topic with Schema Registry enabled and push them to AWS S3 in compressed Parquet format using the Confluent S3 Sink Connector. The connector is consuming messages from the topic but failing with the "Multipart upload failed" error. I was expecting to see compressed Parquet files (*.parquet.gzip) in the S3 bucket.
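For comparison, the only settings I change between the run that works and the run that fails are the compression-related ones; schematically (the "working" values are from memory and may simply have been left unset):

  works (no compression):
    "s3.compression.type": "none"
    "parquet.codec": "none"

  fails with the error above (as in the config posted):
    "s3.compression.type": "gzip"
    "parquet.codec": "gzip"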