AWS MSK S3 Sink Connector to Deserialize AVRO without Schema Registry

The Kafka topic contains Avro Flume events. I understand that each event in the topic carries both the schema and the payload. I have provided the Kafka Connect sink properties below:

"connector_configuration": {
              "connector.class": "io.confluent.connect.s3.S3SinkConnector",
              "flush.size": "15",
              "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
              "key.converter": "org.apache.kafka.connect.storage.StringConverter",
              "locale": "en-US",
              "partition.duration.ms": "86400000",
              "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
              "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
              "rotate.interval.ms": "900000",
              "s3.bucket.name": "s3-destination",
              "s3.region": "eu-west-2",
              "schema.compatibility": "NONE",
              "storage.class": "io.confluent.connect.s3.storage.S3Storage",
              "tasks.max": "1",
              "timestamp.extractor": "Record",
              "timezone": "UTC",
              "topics": "XXXXX",
              "topics.dir": "topic",
            },
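For context, this is a minimal sketch of the event layout I am assuming the topic carries: the standard org.apache.flume.source.avro.AvroFlumeEvent record (a headers map plus a body of bytes). The schema string and class name here are only illustrative; the actual schema on the topic may differ.

import org.apache.avro.Schema;

public class FlumeEventSchema {
    // Assumed schema: the standard AvroFlumeEvent record used by Flume's Avro
    // source/sink; the real schema on the topic may differ from this.
    private static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"AvroFlumeEvent\","
        + "\"namespace\":\"org.apache.flume.source.avro\",\"fields\":["
        + "{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},"
        + "{\"name\":\"body\",\"type\":\"bytes\"}]}";

    public static void main(String[] args) {
        // Parse and pretty-print the schema so it can be compared with whatever
        // the S3 sink ends up embedding in the output files.
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        System.out.println(schema.toString(true));
    }
}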

The output Avro file does get created, but the application is not able to parse it and throws an error. I downloaded the output Avro file from the S3 bucket and tried to parse it myself, and I get an error as well.
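This is roughly how I tried to inspect one of the output files after downloading it from the bucket (a minimal sketch; the file name is just a placeholder for one of the objects the connector wrote):

import java.io.File;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;

public class ReadSinkOutput {
    public static void main(String[] args) throws Exception {
        // Placeholder path: one of the Avro objects downloaded from the S3 bucket.
        File avroFile = new File(args.length > 0 ? args[0] : "downloaded-from-s3.avro");

        GenericDatumReader<Object> datumReader = new GenericDatumReader<>();
        try (DataFileReader<Object> fileReader = new DataFileReader<>(avroFile, datumReader)) {
            // The writer schema is read from the container file header (avro.schema);
            // this is where I see null/bytes instead of the expected record schema.
            System.out.println("Embedded schema: " + fileReader.getSchema());

            // Print each datum so the payload can be checked by eye.
            for (Object record : fileReader) {
                System.out.println(record);
            }
        }
    }
}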

It seems there is an issue with the Avro schema embedded in the file. Please find below a screenshot of the first line of the file:

[Screenshot: Avro file details]

As seen in the image, avro.schema shows null and bytes. I am using the Confluent Kafka S3 Sink Connector: I packaged it as a custom plugin in AWS and then referenced that plugin in AWS MSK Connect.

Can someone please help and let me know what exactly I am doing wrong?
