Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema


I have a pipeline where I connect the Debezium MySQL CDC connector from Confluent Platform to Confluent Cloud (the cloud's built-in Debezium MySQL connector is still in preview). The connection is established successfully, and the messages from the topic are consumed by an S3 sink connector. Initially the flow was in JSON format, but I later wanted it in Avro, so I changed the key and value converters in the connector config file as shown below:

Debezium connector JSON:

{
    "name":"mysql_deb3",
    "config":{
       "connector.class":"io.debezium.connector.mysql.MySqlConnector",
       "tasks.max":"1",
       "database.hostname":"host_name",
       "database.port":"3306",
       "database.user":"user_name",
       "database.password":"password",
       "database.server.id":"123456789",
       "database.server.name": "server_name",
       "database.whitelist":"db_name",
       "database.history.kafka.topic":"dbhistory.db_name",
       "include.schema.changes": "true",
       "table.whitelist": "db_name.table_name",
       "tombstones.on.delete": "false",
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "value.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "cloud_schema_registry_endpoint",
       "value.converter.schema.registry.url": "cloud_schema_registry_endpoint",
       "key.converter.schema.registry.basic.auth.user.info":"schema_registry_api_key:schema_registry_api_secret",
       "value.converter.schema.registry.basic.auth.user.info":"schema_registry_api_key:schema_registry_api_secret",
       "decimal.handling.mode": "double",
       "transforms": "unwrap",
       "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
       "transforms.unwrap.drop.tombstones": "true",
       "transforms.unwrap.delete.handling.mode": "rewrite",
"database.history.kafka.bootstrap.servers":"confluent_cloud_kafka_server_endpoint:9092",
"database.history.consumer.security.protocol":"SASL_SSL",
"database.history.consumer.ssl.endpoint.identification.algorithm":"https",
"database.history.consumer.sasl.mechanism":"PLAIN",
"database.history.consumer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"cloud_kafka_api\" password=\"cloud_kafka_api_secret\";",
"database.history.producer.security.protocol":"SASL_SSL",
"database.history.producer.ssl.endpoint.identification.algorithm":"https",
"database.history.producer.sasl.mechanism":"PLAIN",
"database.history.producer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"cloud_kafka_api\" password=\"cloud_kafka_api_secret\";",
    }
 }
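
One thing I'm unsure about: do the converters also need the credentials source set? From my reading of the Confluent serializer docs, basic.auth.credentials.source defaults to URL, so schema.registry.basic.auth.user.info may be ignored unless the source is switched to USER_INFO. If that's right, I assume the connector config would also need something like:

       "key.converter.basic.auth.credentials.source": "USER_INFO",
       "value.converter.basic.auth.credentials.source": "USER_INFO",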

####################################################################

connect-distributed.properties:

bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
request.timeout.ms=20000
retry.backoff.ms=500

producer.bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
producer.ssl.endpoint.identification.algorithm=https
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500

consumer.bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
consumer.ssl.endpoint.identification.algorithm=https
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500

offset.flush.interval.ms=10000
group.id=connect-cluster
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3

schema.registry.url=https://cloud_schema_registry_endpoint
schema.registry.basic.auth.user.info=<schema_registry_api_key>:<schema_registry_api_secret>
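
I'm also not sure whether the two schema.registry.* lines above are actually read by Connect, since worker-level converter settings normally have to be prefixed with the converter name. If they were needed here, I assume they would look like:

key.converter.schema.registry.url=https://cloud_schema_registry_endpoint
value.converter.schema.registry.url=https://cloud_schema_registry_endpoint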

#################################################

-- I start up Kafka Connect with: bin/connect-distributed etc/connect-distributed.properties

-- Connect starts up fine, but after I load the Debezium connector with a curl command, the task fails with the "Unauthorized" error below, even though the API keys and secrets I provided are correct. I verified them manually with the CLI as well.
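
For example, a check along these lines against the Schema Registry's standard /subjects endpoint succeeds with the same key and secret (the endpoint and credentials below are placeholders):

curl -u "schema_registry_api_key:schema_registry_api_secret" https://cloud_schema_registry_endpoint/subjects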

Caused by: org.apache.kafka.connect.errors.DataException: staging-development-rds-cluster
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:78)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:266)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"SchemaChangeKey","namespace":"io.debezium.connector.mysql","fields":[{"name":"databaseName","type":"string"}],"connect.name":"io.debezium.connector.mysql.SchemaChangeKey"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:326)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:318)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:313)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:119)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:156)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:117)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:76)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:266)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:266)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-11-30 05:30:47,389] ERROR WorkerSourceTask{id=mysql_deb3-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2020-11-30 05:30:47,389] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask:187)
[2020-11-30 05:30:47,389] INFO Stopping MySQL connector task (io.debezium.connector.mysql.MySqlConnectorTask:458)

Can anyone please help me with this? Thanks in advance.
