I’m quite new to Kafka. After successfully running some HTTP connectors without schemas ("value.converter.schemas.enable": "false"), I’m struggling to get schemas working.
I’ve added the JSON schema for my messages ("schemaType": "JSON") to the schema registry and then modified my connector configuration to use it:
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "https://kafka-schemaregistry:8081/",
The problem is that I get the (in)famous “Unknown magic byte!” error:
ERROR Error encountered in task test-withschema-0. Executing stage ‘VALUE_CONVERTER’ with class ‘io.confluent.connect.json.JsonSchemaConverter’, where consumed record is {topic=‘mytopic’, partition=0, offset=34, timestamp=1708945734273, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Converting byte to Kafka Connect data failed due to serialization error of topic visitor:
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:236)
Caused by: org.apache.kafka.common.errors.SerializationException: **Unknown magic byte**!
I’ve read plenty of posts (including the great “How to Fix Unknown Magic Byte Errors in Apache Kafka”), but I’m still struggling to fix it.
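For context, the check behind this error can be sketched in a few lines. Messages written by a Schema Registry aware serializer start with a magic byte of 0, followed by a 4-byte big-endian schema id, and only then the JSON payload; plain JSON starts directly with `{`. The helper name `inspect_value` and the sample payload below are made up for illustration, and this is only a rough sketch of the framing check, not the converter's actual code:

```python
import struct

MAGIC_BYTE = 0  # first byte of a Schema Registry framed message


def inspect_value(raw: bytes):
    """Return (schema_id, payload) if raw carries the 5-byte header, else None."""
    if len(raw) < 5 or raw[0] != MAGIC_BYTE:
        return None
    # Bytes 1..4 hold the schema id as a big-endian unsigned integer.
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]


# Hypothetical record values: plain JSON vs. the same JSON framed with schema id 2.
plain = b'{"VISIT_ID": "42"}'
framed = bytes([MAGIC_BYTE]) + struct.pack(">I", 2) + plain

print(inspect_value(plain))   # None, because the first byte is '{' (0x7b), not 0
print(inspect_value(framed))  # (2, b'{"VISIT_ID": "42"}')
```

Running this kind of check against the raw bytes of a record from the topic would show whether the producer wrote plain JSON or schema-framed JSON.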
I have a couple of questions:
I manipulate the JSON in my Kafka topic with some transformations so that it matches what I expect at the other end of the connector. Should the schema validate the ***resulting*** JSON (after the transformations), or the ***initial*** JSON (before the transformations)?
I currently have only one schema in my schema registry, but do I need to somehow tell my connector which schema it should use? If so, how do I do that?
Thanks for any pointers that would help me troubleshoot this issue.
Here you can find the schema as I defined it on the schema registry:
curl -s https://kafka-schemaregistry:8081/schemas/ | jq
[
{
"subject": "visitors",
"version": 1,
"id": 2,
"schemaType": "JSON",
"schema": "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Visitor\",\"type\":\"object\",\"properties\":{\"BADGE_PRINT_ENTITY\":{\"type\":\"string\"},\"VISITOR_REGISTRATION_STATUS\":{\"type\":\"string\"},\"VISITOR_PERSON_ID\":{\"type\":\"string\"},\"VISIT_BUILDING\":{\"type\":\"string\"},\"VISITOR_DOC_EXPIRY_DATE\":{\"type\":\"string\"},\"VISITOR_EMAIL\":{\"type\":\"string\"},\"VISIT_SITE\":{\"type\":\"string\"},\"HOST_FIRST_NAME\":{\"type\":\"string\"},\"VISIT_NAME\":{\"type\":\"string\"},\"VISITOR_DOC_NUMBER\":{\"type\":\"string\"},\"VISITOR_APPROVAL_STATUS\":{\"type\":\"string\"},\"VISITOR_DOCUMENT_TYPE\":{\"type\":\"string\"},\"VISITOR_LAST_NAME\":{\"type\":\"string\"},\"VISITOR_NATIONALITY\":{\"type\":\"string\"},\"HOST_ID\":{\"type\":\"string\"},\"VISITOR_DATE_OF_BIRTH\":{\"type\":\"string\"},\"PERSONAL_NUMBER\":{\"type\":\"string\"},\"VISIT_TYPE\":{\"type\":\"string\"},\"BADGE_NUMBER\":{\"type\":\"string\"},\"VISITOR_BADGE_PRINT_STATUS\":{\"type\":\"string\"},\"VISITOR_SOCIAL_NAME\":{\"type\":\"string\"},\"HOST_LAST_NAME\":{\"type\":\"string\"},\"VISITOR_ORGANISATION\":{\"type\":\"string\"},\"VISIT_ID\":{\"type\":\"string\"},\"VIST_END_DATE\":{\"type\":\"string\"},\"VISITOR_FIRST_NAME\":{\"type\":\"string\"},\"HOST_GROUP\":{\"type\":\"string\"},\"BADGE_PRINT_DATE_AND_TIME\":{\"type\":\"string\"},\"HOST_EMAIL\":{\"type\":\"string\"},\"VISIT_ROOM\":{\"type\":\"string\"},\"VISIT_START_DATE\":{\"type\":\"string\"}},\"required\":[\"BADGE_PRINT_ENTITY\",\"VISITOR_REGISTRATION_STATUS\",\"VISITOR_PERSON_ID\",\"VISIT_BUILDING\",\"VISITOR_DOC_EXPIRY_DATE\",\"VISITOR_EMAIL\",\"VISIT_SITE\",\"HOST_FIRST_NAME\",\"VISIT_NAME\",\"VISITOR_DOC_NUMBER\",\"VISITOR_APPROVAL_STATUS\",\"VISITOR_DOCUMENT_TYPE\",\"VISITOR_LAST_NAME\",\"VISITOR_NATIONALITY\",\"HOST_ID\",\"VISITOR_DATE_OF_BIRTH\",\"PERSONAL_NUMBER\",\"VISIT_TYPE\",\"BADGE_NUMBER\",\"VISITOR_BADGE_PRINT_STATUS\",\"VISITOR_SOCIAL_NAME\",\"HOST_LAST_NAME\",\"VISITOR_ORGANISATION\",\"VISIT_ID\",\"VIST_END_DATE\",\"VISITOR_FIRST_NAME\",\"HOST_GROUP\",\"BADGE_PRINT_DATE_AND_TIME\",\"HOST_EMAIL\",\"VISIT_ROOM\",\"VISIT_START_DATE\"]}"
}
]
This is how I tell the connector to use a schema:
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "https://kafka-schemaregistry:8081/",
Any help would be appreciated :)