KafkaJsonSchemaSerializer adds initial null bytes (00 00 00 00 0C 7B) to the record value

45 Views Asked by At

Using the following Spring-Boot properties

spring:
  profiles:
    active: "ssl"
  kafka:
    producer:
      client-id: ${SPRING_KAFKA_PRODUCER_CLIENT_ID:kafka-producer}
      bootstrap-servers: ${SPRING_KAFKA_PRODUCER_BOOTSTRAP_SERVERS:localhost:9092}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer

the published JSON values starts with null bytes (00 00 00 00 0C 7B in Hex). This results in an invalid JSON string (e.g. when displayed in Offset-Explorer).

The @Configuration class I'm using is as simple as

@Configuration
@EnableAutoConfiguration
@RequiredArgsConstructor
@PropertySources({
        @PropertySource("classpath:kafka.yaml"),
        @PropertySource("classpath:kafka-ssl.yaml"),
})
public class KafkaProducerConfiguration {
    private final KafkaProperties kafkaProperties;

    @Bean("producer-factory")
    public <T> ProducerFactory<String, T> producerFactory() {
        return new DefaultKafkaProducerFactory<>(this.kafkaProperties.getProducer().buildProperties(null));
    }

    @Bean("kafka-template")
    public <T> KafkaTemplate<String, T> kafkaTemplate(ProducerFactory<String, T> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

and then I use the private final KafkaTemplate<String, T> kafkaTemplate; to publish a Java object on a topic.

@Service
@RequiredArgsConstructor
public class KafkaProducerServiceImpl<T> implements KafkaProducerService<T> {

    private final KafkaTemplate<String, T> kafkaTemplate;

    @Override
    public void send(String topic, Function<T, String> key, T message) throws KafkaException {
        this.kafkaTemplate.send(topic, key.apply(message), message);
    }
}

Am I missing something in the configuration properties?

1

There are 1 best solutions below

2
OneCricketeer On BEST ANSWER

This is expected for that Serializer. Confluent docs

results in an invalid JSON string (e.g. when displayed in Offset-Explorer

Offset Explorer doesn't have JSONSchema support in its Registry integration, last I checked. Spring-Kafka has its own JSONSerializer class that you would want to use instead of anything from Confluent, assuming you want JSON data without using the Schema Registry.

Using the Schema Registry will always send binary encoded data with a starting null byte