MongoSinkConnector expects to find a schema subject that doesn't exist


I'm trying to use the MongoDB Kafka sink connector (mongodb-kafka-connect-mongodb-1.7.0) to write Avro events from Kafka to MongoDB.

I have a Schema Registry set up that works with both the Kafka consumer example and my own custom consumer; both are able to deserialize the events and print them.

On the other hand, when I run the connector, I get the following exception:

Subject '<my-avro-schema-name>-value' not found.; error code: 40401

The higher-level stack trace messages are:

  • Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 11

  • Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic client-order-request to Avro:

And indeed, neither this subject nor this id exists in the Schema Registry; the highest id I have is 10, and I do have a subject named <my-avro-schema-name>-key.
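To double-check what the registry actually contains, here is a minimal sketch using Confluent's Java client (assuming io.confluent:kafka-schema-registry-client is on the classpath; the URL is a placeholder):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegistryCheck {
    public static void main(String[] args) throws Exception {
        // 100 is the client's identity-map (cache) capacity
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://<address>", 100);
        // Lists every registered subject; '<my-avro-schema-name>-value'
        // would have to appear here for the connector's lookup to succeed.
        for (String subject : client.getAllSubjects()) {
            System.out.println(subject);
        }
    }
}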

Why is the MongoSinkConnector trying to find a subject that doesn't exist?

Connect properties:

bootstrap.servers=<value>
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/git/1.libraries/kafka_2.12-2.2.0/plugins

MongoSink properties:

name=<my-avro-schema-name>-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=mongodb://<value>
database=Test
collection=test
topics=test
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://<address>

schema-registry.properties:

listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=<address>
kafkastore.topic=_schemas
debug=false
auto.register.schemas=false
use.latest.version=true
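Note that auto.register.schemas and use.latest.version are client-side serializer settings (per Confluent's serializer configuration), not Schema Registry server settings, so they likely have no effect in schema-registry.properties. A sketch of where they would normally go instead:

# producer/serializer config (or prefixed with value.converter. on a Connect worker)
auto.register.schemas=false
use.latest.version=true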

Kafka producer configuration:

import java.util.Calendar;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-address>");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");
props.put("schema.registry.url", "<schema-registry>");

KafkaProducer<String, AllEventsUnion> producerRequest = new KafkaProducer<>(props);

Calendar calendar = Calendar.getInstance();
AllEventsUnion clientOrderRequest = createClientOrderRequest();

final ProducerRecord<String, AllEventsUnion> producerOrderRequest = new ProducerRecord<>("all-events-union",
        "ClientOrderRequest-" + calendar.getTimeInMillis(), clientOrderRequest);

AllEventsUnion is an Avro union schema of multiple types. I'm using it to send different event types to the same Kafka topic, which is why I thought I needed to register it beforehand. But apparently you don't need to register schemas in the Schema Registry before using them?
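For what it's worth, with the Confluent serializer's default TopicNameStrategy the value schema is registered (and looked up) under the subject '<topic>-value' — for the producer above that would be 'all-events-union-value' — and, unless auto-registration is disabled, the serializer registers the schema itself on the first send. A hedged sketch of the relevant producer settings (config keys from Confluent's serializer; verify against your client version):

// auto.register.schemas defaults to true: the serializer registers the
// value schema under "<topic>-value" on the first send
props.put("auto.register.schemas", true);
// when several event types share one topic, the subject naming strategy
// can be changed, e.g. to one subject per record type:
// props.put("value.subject.name.strategy",
//         "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");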

1 answer:

I do have a subject named <my-avro-schema-name>-key

This would indicate you have Avro keys.

Then why use key.converter=org.apache.kafka.connect.storage.StringConverter rather than AvroConverter?
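If the keys really are Avro, the key converter would point at the registry as well — a minimal sketch, reusing the registry address from the value converter:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<address>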

neither this subject nor this id exists in the Schema Registry

Then your Avro producer, upstream from the connector (if it exists), is having a problem: it has either not registered the schema/subject, or it has not actually written Avro data through the Schema Registry.

Why is the MongoSinkConnector trying to find a subject that doesn't exist?

Because you've set this, and the data in the topic embeds an id for a schema/subject that doesn't exist.

value.converter=io.confluent.connect.avro.AvroConverter

Registering a schema after the data has been produced will not modify the records already in the topic; they still carry the Avro schema id that was embedded when they were written.
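Concretely, every record written with the Confluent Avro serializer starts with a magic byte (0) followed by the 4-byte big-endian schema id that was current at produce time — a sketch of extracting that id from a raw record value:

import java.nio.ByteBuffer;

// Decodes the schema id baked into a Confluent-framed Avro payload.
// Records keep whatever id was embedded when they were written,
// no matter what gets registered afterwards.
static int schemaIdOf(byte[] value) {
    ByteBuffer buf = ByteBuffer.wrap(value);
    if (buf.get() != 0) { // magic byte is always 0 in the Confluent wire format
        throw new IllegalArgumentException("Not Confluent-framed Avro");
    }
    return buf.getInt(); // 4-byte big-endian schema id
}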