I'm trying to use the MongoDB Kafka sink connector (mongodb-kafka-connect-mongodb-1.7.0) to write Avro events from Kafka to MongoDB.
I have a schema registry set up that works with both the Kafka consumer example and my own custom consumer; both are able to deserialize the events and print them.
However, when I run the connector, I get the following exception:
Subject '<my-avro-schema-name>-value' not found.; error code: 40401
The higher-level stack trace messages are:
- Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 11
- Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic client-order-request to Avro:
And indeed, neither that subject nor that id exists in the schema registry: the highest id I actually have is 10, and the subject I do have is named <my-avro-schema-name>-key.
Why is the MongoSinkConnector trying to find a subject that doesn't exist?
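One way to confirm what the registry actually holds is its REST API (/subjects and /schemas/ids/{id}); here is a minimal Java sketch, assuming the registry listens at http://<address>:8081 as in the listeners setting below:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegistryCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Lists every registered subject; in my case this includes
        // <my-avro-schema-name>-key but no <my-avro-schema-name>-value
        System.out.println(get(client, "http://<address>:8081/subjects"));
        // Looks up the schema the connector asks for; this should return
        // error 40403 ("Schema not found") since the highest registered id is 10
        System.out.println(get(client, "http://<address>:8081/schemas/ids/11"));
    }

    private static String get(HttpClient client, String url) throws Exception {
        HttpRequest req = HttpRequest.newBuilder().uri(URI.create(url)).build();
        return client.send(req, HttpResponse.BodyHandlers.ofString()).body();
    }
}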
Connect properties:
bootstrap.servers=<value>
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/git/1.libraries/kafka_2.12-2.2.0/plugins
MongoSink properties:
name=<my-avro-schema-name>-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=mongodb://<value>
database=Test
collection=test
topics=test
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://<address>
schema-registry.properties:
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=<address>
kafkastore.topic=_schemas
debug=false
auto.register.schemas=false
use.latest.version=true
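Side note: as far as I can tell, auto.register.schemas and use.latest.version are Confluent serializer (client) settings rather than Schema Registry server settings, so to take effect they would belong with the producer's serializer configuration below, roughly:

// Hypothetical placement: these are client-side serializer settings,
// so they would go into the producer's props, not the registry's config
props.put("auto.register.schemas", false);
props.put("use.latest.version", true);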
Kafka producer configuration:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-address>");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");
props.put("schema.registry.url", "<schema-registry>");

KafkaProducer<String, AllEventsUnion> producerRequest = new KafkaProducer<>(props);
// calendar and createClientOrderRequest() are defined elsewhere in my code
AllEventsUnion clientOrderRequest = createClientOrderRequest();
final ProducerRecord<String, AllEventsUnion> producerOrderRequest = new ProducerRecord<>("all-events-union",
        "ClientOrderRequest-" + calendar.getTimeInMillis(), clientOrderRequest);
producerRequest.send(producerOrderRequest);
AllEventsUnion is an Avro union schema of multiple event types. I'm using it to send different event types to the same Kafka topic, which is why I thought I needed to register it beforehand. But apparently you don't need to register schemas in the schema registry before using them?
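For what it's worth, this is roughly how I would have registered the union schema by hand; a sketch, assuming a recent io.confluent:kafka-schema-registry-client, where the AllEventsUnion.avsc path is a placeholder:

import java.nio.file.Files;
import java.nio.file.Paths;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class RegisterUnionSchema {
    public static void main(String[] args) throws Exception {
        // Placeholder path to the union schema definition
        String schemaJson = new String(Files.readAllBytes(Paths.get("AllEventsUnion.avsc")));
        CachedSchemaRegistryClient registry =
                new CachedSchemaRegistryClient("<schema-registry>", 100);
        // The default TopicNameStrategy names the value subject "<topic>-value"
        int id = registry.register("all-events-union-value", new AvroSchema(schemaJson));
        System.out.println("Registered under id " + id);
    }
}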
Having a subject named <my-avro-schema-name>-key would indicate you have Avro keys. Then why use
key.converter=org.apache.kafka.connect.storage.StringConverter
rather than AvroConverter? If the keys really are Avro, the sink would presumably mirror the value converter settings, as in the sketch below.
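For instance (a sketch; the registry address is a placeholder):

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<address>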
Beyond the keys, your Avro producer upstream of the connector (if one exists) is having a problem: it has not registered the schema/subject, or it has not actually written Avro data using the schema registry. Because you've set auto.register.schemas=false, the data in the topic contains an ID that points to a subject which doesn't exist.
Registering a schema after the data has been produced will not modify the records already in the topic you're consuming; they keep the Avro schema ID they were written with.
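Some background on why: every value the Confluent Avro serializer writes is framed with the schema ID at produce time, as one magic byte followed by a 4-byte big-endian ID. You can see which ID each record actually carries by consuming the raw bytes with a ByteArrayDeserializer and inspecting them; a minimal sketch:

import java.nio.ByteBuffer;

public class ConfluentWireFormat {
    // Returns the schema id embedded in a Confluent-framed Avro value:
    // byte 0 is a magic byte (always 0), bytes 1-4 are the schema id.
    static int embeddedSchemaId(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Not Confluent-framed Avro");
        }
        return buf.getInt(); // e.g. 11, the id the sink cannot resolve
    }
}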