How do I deserialize Avro from Kafka with Spring Boot 2.7.18?


I am trying to deserialize Avro data into an object, but I get an error whether I deserialize with KafkaAvroDeserializer or with a custom deserializer.

Custom deserializer:


    import lombok.RequiredArgsConstructor;
    import lombok.extern.log4j.Log4j2;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificRecordBase;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    
    import javax.xml.bind.DatatypeConverter;
    import java.util.Arrays;
    import java.util.Map;
    
    @Log4j2
    @RequiredArgsConstructor
    public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
    
        protected final Class<T> targetType;
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // No-op
        }
    
        @Override
        public T deserialize(String topic, byte[] data) {
            try {
                T result = null;
    
                if (data != null) {
                    log.debug("data='{}'", DatatypeConverter.printHexBinary(data));
    
                    DatumReader<T> datumReader =
                            new SpecificDatumReader<>(targetType.getDeclaredConstructor().newInstance().getSchema());
                    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    
                    result = datumReader.read(null, decoder);
                    log.debug("deserialized data='{}'", result);
                }
                return result;
            } catch (Exception ex) {
                throw new SerializationException(
                        "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
            }
        }
    
        @Override
        public void close() {
            // No-op
        }
    }
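
For reference, if the producer published with Confluent's KafkaAvroSerializer, the bytes on the topic are not raw Avro: they start with a 5-byte wire-format header (a magic byte 0x0 followed by the 4-byte schema ID), and only then the Avro binary payload. A variant of deserialize() that skips that header when present might look like this (just a sketch assuming Confluent framing, with the same fields as above):


    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // Skip the Confluent wire-format header (magic byte + 4-byte schema ID) if present.
            int offset = (data.length > 5 && data[0] == 0x0) ? 5 : 0;
            DatumReader<T> datumReader =
                    new SpecificDatumReader<>(targetType.getDeclaredConstructor().newInstance().getSchema());
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, offset, data.length - offset, null);
            return datumReader.read(null, decoder);
        } catch (Exception ex) {
            throw new SerializationException("Can't deserialize data from topic '" + topic + "'", ex);
        }
    }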

Consumer configuration:


    import com.kafka.dto.MyGeneratedAvroWithMavenPlugin;
    import com.utils.AvroDeserializer;
    import com.utils.CryptoUtils;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.log4j.Log4j2;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;
    
    
    @Configuration
    @EnableKafka
    @RequiredArgsConstructor
    public class KafkaConsumerConfiguration {
    
        private final KafkaProperties kafkaProperties;
    
    
        @Bean
        public Map<String, Object> consumerConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getAutoOffsetReset());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.isEnableAutoCommit());
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, kafkaProperties.getIsolationLevelConfig());
            props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaProperties.getHeartbeatIntervalMsConfig());
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaProperties.getSessionTimeoutMsConfig());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
            props.put("schema.registry.url", kafkaProperties.getSchemaRegistryUrl());
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
            if (Objects.nonNull(kafkaProperties.getSsl()) && kafkaProperties.getSsl().isEnabled()) {
                props.put("security.protocol", kafkaProperties.getSecurity().getProtocol());
                props.put("ssl.truststore.location", kafkaProperties.getSsl().getTrustStoreLocation());
                // Map.put takes two arguments; if the passwords are stored encrypted,
                // they need to be decrypted first (e.g. via CryptoUtils with kafkaProperties.getKey()).
                props.put("ssl.truststore.password", kafkaProperties.getSsl().getTrustStorePassword());
                props.put("ssl.keystore.location", kafkaProperties.getSsl().getKeyStoreLocation());
                props.put("ssl.keystore.password", kafkaProperties.getSsl().getKeyStorePassword());
                props.put("ssl.key.password", kafkaProperties.getSsl().getKeyPassword());
            }
            return props;
        }
    
        @Bean
        public ConsumerFactory<String, MyGeneratedAvroWithMavenPlugin> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfig(), new StringDeserializer(), new AvroDeserializer<>(MyGeneratedAvroWithMavenPlugin.class));
        }
    
    
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyGeneratedAvroWithMavenPlugin>> kafkaListenerContainerFactory(
                ConsumerFactory<String, MyGeneratedAvroWithMavenPlugin> consumerFactory
        ) {
            ConcurrentKafkaListenerContainerFactory<String, MyGeneratedAvroWithMavenPlugin> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            factory.setConcurrency(kafkaProperties.getConcurrency());
            return factory;
        }
    
    
    }
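
Since schema.registry.url and SPECIFIC_AVRO_READER_CONFIG are already in the config map, another option is to let Confluent's KafkaAvroDeserializer handle the wire format and schema lookup itself. A sketch of such a factory (it assumes the messages were produced with KafkaAvroSerializer against the same registry):


    // Sketch: delegate framing and schema resolution to Confluent's deserializer.
    @Bean
    public ConsumerFactory<String, MyGeneratedAvroWithMavenPlugin> confluentConsumerFactory() {
        Map<String, Object> props = new HashMap<>(consumerConfig());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        // With SPECIFIC_AVRO_READER_CONFIG=true it returns the generated class instead of GenericRecord.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }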

Kafka listener:


    import com.kafka.dto.MyGeneratedAvroWithMavenPlugin;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.log4j.Log4j2;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.context.event.EventListener;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.event.ListenerContainerIdleEvent;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Service;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    
    @Log4j2
    @Service
    @RequiredArgsConstructor
    public class KafkaConsumer {
        private final List<MyGeneratedAvroWithMavenPlugin> messages = new ArrayList<>();
    
    
        @KafkaListener(topicPattern = "${kafka.topic}", containerFactory = "kafkaListenerContainerFactory")
        public void receive(ConsumerRecord<String, MyGeneratedAvroWithMavenPlugin> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String groupID) {
           log.info("Received payload key: '{}'", consumerRecord.key());
           log.info("Received payload value: '{}'", consumerRecord.value().toString());
           messages.add(consumerRecord.value());
        }
    }
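
To rule the custom deserializer in or out, a round-trip test outside Kafka helps: serialize a record with a plain Avro binary encoder and feed the bytes back through AvroDeserializer. A sketch (which builder fields to set depends on the generated schema):


    import com.kafka.dto.MyGeneratedAvroWithMavenPlugin;
    import com.utils.AvroDeserializer;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumWriter;
    
    import java.io.ByteArrayOutputStream;
    
    public class RoundTripCheck {
        public static void main(String[] args) throws Exception {
            // Build a record; fill in whatever fields the generated schema requires.
            MyGeneratedAvroWithMavenPlugin record = MyGeneratedAvroWithMavenPlugin.newBuilder()
                    .build();
    
            // Plain Avro binary encoding, i.e. no Confluent wire-format header.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new SpecificDatumWriter<>(MyGeneratedAvroWithMavenPlugin.class).write(record, encoder);
            encoder.flush();
    
            AvroDeserializer<MyGeneratedAvroWithMavenPlugin> deserializer =
                    new AvroDeserializer<>(MyGeneratedAvroWithMavenPlugin.class);
            System.out.println(deserializer.deserialize("test-topic", out.toByteArray()));
        }
    }

If this round trip passes while consuming still fails, the bytes on the topic are framed differently from plain Avro binary.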

Neither the custom AvroDeserializer nor the Confluent KafkaAvroDeserializer can deserialize the Avro content. The custom one fails on:


    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    result = datumReader.read(null, decoder);

When deserializing, I get: java.lang.ArrayIndexOutOfBoundsException: Index -19 out of bounds for length 2. I need to understand why it is failing. Thanks.
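
A negative index like this is a known symptom of decoding Confluent-framed bytes with a raw Avro decoder: the 4-byte schema ID gets interpreted as zig-zag-encoded Avro data and yields a negative enum or union index. One way to check the framing is to consume a few records as raw bytes and inspect the header (a sketch, assuming the value deserializer is temporarily switched to ByteArrayDeserializer):


    // Sketch: inspect the first bytes of a payload for the Confluent wire format.
    // Byte 0 is the magic byte (0x0); bytes 1-4 are the schema ID, big-endian.
    byte[] data = consumerRecord.value();
    if (data != null && data.length > 5 && data[0] == 0x0) {
        int schemaId = java.nio.ByteBuffer.wrap(data, 1, 4).getInt();
        log.info("Confluent-framed payload, schema id = {}", schemaId);
    } else {
        log.info("No Confluent header; payload looks like plain Avro binary");
    }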
