Avro schema must be a record


I'm trying to use the S3SinkConnector with the following settings:

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "flush.size": 1,
  "s3.bucket.name": "*****",
  "s3.object.tagging": "true",
  "s3.region": "us-east-2",
  "aws.access.key.id": "*****",
  "aws.secret.access.key": "*****",
  "s3.part.retries": 5,
  "s3.retry.backoff.ms": 1000,
  "behavior.on.null.values": "ignore",
  "keys.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "headers.format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "store.kafka.headers": "true",
  "store.kafka.keys": "true",
  "topics": "***",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "topics.dir": "kafka-backup",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",

  "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
  "locale": "en-US",
  "timezone": "UTC",
  "timestamp.extractor": "Record"

}

The records in Kafka are stored in JSON format and were written via io.confluent.connect.json.JsonSchemaConverter, so every message has a strict schema.
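For illustration only (the topic and field names below are hypothetical), a value written through the JSON Schema converter has a schema registered under the topic's value subject, for example:

{
  "type": "object",
  "title": "Payment",
  "properties": {
    "id": { "type": "string" },
    "amount": { "type": "number" }
  }
}

and the message value itself is the plain JSON object, e.g. {"id": "42", "amount": 10.5}.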

When the sink connector tries to read records from Kafka, I get the exception "Avro schema must be a record." I don't understand why I get this error, because I'm not using any Avro format.

The full stack trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:631)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Avro schema must be a record.
    at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:124)
    at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:150)
    at org.apache.parquet.avro.AvroParquetWriter.access$200(AvroParquetWriter.java:36)
    at org.apache.parquet.avro.AvroParquetWriter$Builder.getWriteSupport(AvroParquetWriter.java:182)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:563)
    at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:102)
    at io.confluent.connect.s3.format.S3RetriableRecordWriter.write(S3RetriableRecordWriter.java:46)
    at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:107)
    at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:562)
    at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:311)
    at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:254)
    at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:205)
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)

Version of S3 connector: 10.3.0
Version of kafka-connect: 7.0.1


There is 1 answer below.


You need to either not use ParquetFormat or produce Avro records. ParquetFormat writes through the Avro Parquet writer, so it requires an Avro record schema (see the S3 sink connector source).
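For example, keeping the rest of the connector config above, a sketch of the first option is to swap the value format class so the sink writes JSON instead of Parquet (not a verified config):

{
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat"
}

Alternatively, to keep Parquet output, the topic values would need to be Avro, e.g. produced with the Avro serializer and read with the Avro converter (again a sketch, assuming the same Schema Registry URL):

{
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}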