I'm developing a Springboot application that uses Spring-kafka to implement a Kafka Producer to sent messages to a Kafka cluster.
In the first iteration we've deployed the Spring application in a AKS cluster. The application connects to a "Kafka Cluster" based on Azure Event Hub.
This part worked well. We achieve the integration and we were able to send messages to a topic an consume the using a JSON format.
In the next iteration we want to introduce AVRO serializers and deserializers + a Schema Registry (with Azure Schema Registry). We did the changes in the code and we tested the changes successfuly with test containers. However, when we try to execute the application in the Azure environment is failing.
You have an example of the application here: https://github.com/alvNa/spring-kafka-demo
Here it is a chunk of the configuration:
spring:
kafka:
bootstrap-servers: <MY-NAMESPACE>.servicebus.windows.net:9093
security:
protocol: SASL_SSL
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
consumer:
group-id: kafka-example-application
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
sasl.mechanism: PLAIN
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<MY-NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<MY-KEYNAME>;SharedAccessKey=<MY-ACCESS-KEY>";
schema.registry.url: https://<MY-NAMESPACE>.servicebus.windows.net
schema.group: <MY-SCHEMA-GROUP>
specific.avro.reader: true
auto.register.schemas: false
use.latest.version: true
I have a basic avro schema for the example:
{
"namespace": "com.atradius.examples",
"type": "record",
"name": "Message",
"version": "1",
"fields": [
{
"name": "number",
"type": "int"
},
{
"name": "description",
"type": "string"
}
]
}
With the Avro Maven/Gradle plugin a Java Pojo is generated:
package com.atradius.examples;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;
@org.apache.avro.specific.AvroGenerated
public class Message extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = -90117537915021226L;
...
}
Using this model I create a Kafka Producer using the Spring Kafka Template.
@Service
@RequiredArgsConstructor
public class KafkaAvroMessageService {
@Value("${example-app.kafka.producer.topic}")
private String topicName;
private final KafkaTemplate<String, Message> kafkaTemplate;
public void sendEvent(final String key, final String body) {
final Message message = new Message(key, body);
kafkaTemplate.send(topicName, key, message);
}
}
When I run the code and I try to send a message I have this error.
2024-02-08T17:19:18.807+01:00 ERROR 25180 --- [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false')
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:750) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:674) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2790) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:908) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794) ~[jackson-core-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4912) ~[jackson-databind-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4818) ~[jackson-databind-2.15.2.jar:2.15.2]
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3817) ~[jackson-databind-2.15.2.jar:2.15.2]
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297) ~[kafka-schema-registry-client-7.3.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384) ~[kafka-schema-registry-client-7.3.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:927) ~[kafka-schema-registry-client-7.3.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:918) ~[kafka-schema-registry-client-7.3.1.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:496) ~[kafka-schema-registry-client-7.3.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:216) ~[kafka-schema-serializer-7.3.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:200) ~[kafka-schema-serializer-7.3.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:133) ~[kafka-avro-serializer-7.3.1.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:61) ~[kafka-avro-serializer-7.3.1.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-7.3.1-ccs.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1002) ~[kafka-clients-7.3.1-ccs.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:949) ~[kafka-clients-7.3.1-ccs.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1016) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:783) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:754) ~[spring-kafka-3.0.9.jar:3.0.9]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:544) ~[spring-kafka-3.0.9.jar:3.0.9]
at com.atradius.example.kafka.service.KafkaAvroMessageService.sendEvent(KafkaAvroMessageService.java:31) ~[main/:na]
at com.atradius.example.kafka.api.KafkaAvroExampleController.postNewEvent(KafkaAvroExampleController.java:38) ~[main/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:884) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1081) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:974) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1011) ~[spring-webmvc-6.0.11.jar:6.0.11]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.0.11.jar:6.0.11]
at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:590) ~[tomcat-embed-core-10.1.11.jar:6.0]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.0.11.jar:6.0.11]
at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.11.jar:6.0]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.0.11.jar:6.0.11]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.11.jar:6.0.11]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:166) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:341) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:391) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:894) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1740) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-10.1.11.jar:10.1.11]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
After debuging it I realised the problem is the Azure Schema Registry is not returning what I'm expecting. The REST call to retrieve the schema it is currently returning an XML.
curl --location 'https://<MY-NAMESPACE>.servicebus.windows.net/subjects/<MY-TOPIC>/versions/latest' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json'
Like this
<feed xmlns="http://www.w3.org/2005/Atom">
<title type="text">Publicly Listed Services</title>
<subtitle type="text">This is the list of publicly-listed services currently available.</subtitle>
<id>uuid:81b665e5-f924-4fdc-918e-346ccadd6fdb;id=90772</id>
<updated>2024-02-08T16:20:47Z</updated>
<generator>Service Bus 1.1</generator>
</feed>
Instead of returning a JSON schema like this.
{
"namespace": "com.atradius.examples",
"type": "record",
"name": "Message",
"version": "1",
"fields": [
{
"name": "number",
"type": "int"
},
{
"name": "description",
"type": "string"
}
]
}
If someone have experiece in the integration with Azure Schema Registry. Do you know what could be the reason of this behaviour?
Do I need some kind of configuration in the Azure Schema Registry ?
Thanks.