Unable to invoke custom MockSchemaRegistryClient for Testing using EmbeddedKafka


I'm trying to write an integration test for my Kafka consumer using EmbeddedKafka, and as part of it I want to use a MockSchemaRegistryClient to supply the Avro schemas for the test. However, it always ends up calling the default MockSchemaRegistryClient from the Confluent Schema Registry client library instead of my custom implementation.

Also, this happens only when I use kafka-avro-serializer 7.4.0 or above; it works fine with kafka-avro-serializer 7.3.5 or earlier.

Any idea where I am going wrong?
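For context, my (possibly wrong) understanding is that with a mock:// URL the serializers resolve their registry client from a shared scope in MockSchemaRegistry rather than from the schemaRegistry field I set, so the schemas could in principle be pre-registered against that scope instead. A hypothetical sketch of what I mean (the helper method and the subject names are mine, based on how I read the testutil API and the default TopicNameStrategy):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;

// Hypothetical helper, not part of my code below: pre-registers the test schemas
// against the "testurl" scope that schema.registry.url = "mock://testurl" refers to.
static void preRegisterTestSchemas() throws Exception {
    SchemaRegistryClient scopedClient = MockSchemaRegistry.getClientForScope("testurl");
    scopedClient.register("event-topic-key", new AvroSchema(SOMEKEY.SCHEMA$));
    scopedClient.register("event-topic-value", new AvroSchema(SOMEVALUE.SCHEMA$));
}

What I'd like to understand, though, is why the custom classes below are bypassed. Here is my code: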

import java.util.Map;

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    // The supplied client is intentionally ignored; the mock is forced in every constructor.
    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class KafkaAvroKeyDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        // Swap in a mock client that always returns the key schema before delegating.
        this.schemaRegistry = getMockClient(topic);
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(String topic) {
        return new MockSchemaRegistryClient() {
            @Override
            public ParsedSchema getSchemaBySubjectAndId(String subject, int id) {
                return new AvroSchema(SOMEKEY.SCHEMA$);
            }
        };
    }
}

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class KafkaAvroValueDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        // Swap in a mock client that always returns the value schema before delegating.
        this.schemaRegistry = getMockClient(topic);
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(String topic) {
        return new MockSchemaRegistryClient() {
            @Override
            public ParsedSchema getSchemaBySubjectAndId(String subject, int id) {
                return new AvroSchema(SOMEVALUE.SCHEMA$);
            }
        };
    }
}
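For reference, the plain wiring I'm comparing against (hypothetical, not what I currently run) would point the stock KafkaAvroDeserializer at the same mock scope and rely on the schemas already being registered there; specific.avro.reader is the standard Confluent setting for deserializing into generated specific-record classes:

// Hypothetical alternative to the custom deserializers above.
Map<String, Object> plainConfigs = new HashMap<>();
plainConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
plainConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
plainConfigs.put("schema.registry.url", "mock://testurl");
plainConfigs.put("specific.avro.reader", true);

My actual test setup is the following: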

@EmbeddedKafka()
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
        "spring.main.allow-bean-definition-overriding=true",
        "spring.kafka.properties.auto.register.schemas=true",
})
public class KafkaConsumerIT {
    private static final String EVENT_TOPIC = "event-topic";

    @Autowired
    private EmbeddedKafkaBroker kafkaEmbedded;
    protected Producer<SOMEKEY, SOMEVALUE> eventProducer;

    protected Consumer<SOMEKEY, SOMEVALUE> eventConsumer;

    @BeforeEach
    void setUp() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded.getBrokersAsString());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomKafkaAvroSerializer.class);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CustomKafkaAvroSerializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "in-test-consumer");
        props.put("schema.registry.url", "mock://testurl");

        ProducerFactory<SOMEKEY, SOMEVALUE> producerFactory = new DefaultKafkaProducerFactory<>(props);
        eventProducer = producerFactory.createProducer();

        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("in-test-consumer", "false", kafkaEmbedded));
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroKeyDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroValueDeserializer.class);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        configs.put("schema.registry.url", "mock://testurl");

        eventConsumer = new DefaultKafkaConsumerFactory<SOMEKEY, SOMEVALUE>(configs).createConsumer("in-test-consumer", "10");

        eventConsumer.subscribe(Lists.newArrayList(EVENT_TOPIC));
    }

    @AfterEach
    void cleanUp () {
        eventConsumer.close();
    }

    @Test
    public void test_receive_event() {
        eventProducer.send(new ProducerRecord<>(EVENT_TOPIC, new SOMEKEY(), new SOMEVALUE()));

        ConsumerRecord<SOMEKEY, SOMEVALUE> consumerRecord = KafkaTestUtils.getSingleRecord(eventConsumer, EVENT_TOPIC);
        assertThat(consumerRecord).isNotNull();
    }


}