I'm trying to write an integration test (IT) for my Kafka consumer using Embedded Kafka, and as part of it I want to use a custom MockSchemaRegistryClient to supply the Avro schema for the test. However, the test always ends up calling the default MockSchemaRegistryClient implementation from the Kafka Schema Registry client library instead of my custom implementation.
Also, this happens only when I use kafka-avro-serializer 7.4.0 or above; it works fine with kafka-avro-serializer 7.3.5 or below.
Any idea where I am going wrong?
/**
 * Test-only {@link KafkaAvroSerializer} that works against an in-memory schema registry.
 *
 * <p>The no-arg constructor always installs a fresh {@link MockSchemaRegistryClient}. The
 * other constructors use the client handed in by the caller and only fall back to a fresh
 * mock when that client is {@code null}.
 */
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {

    /**
     * Creates a serializer backed by a fresh {@link MockSchemaRegistryClient}.
     */
    public CustomKafkaAvroSerializer() {
        super();
        this.schemaRegistry = new MockSchemaRegistryClient();
    }

    /**
     * Creates a serializer backed by the given client.
     *
     * @param client schema registry client to use; a fresh {@link MockSchemaRegistryClient}
     *               is substituted when this is {@code null}
     */
    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(orMock(client));
    }

    /**
     * Creates a serializer backed by the given client and configured with {@code props}.
     *
     * @param client schema registry client to use; a fresh {@link MockSchemaRegistryClient}
     *               is substituted when this is {@code null}
     * @param props  serializer configuration passed through to the parent class
     */
    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(orMock(client), props);
    }

    /** Returns {@code client} unless it is {@code null}, in which case a new mock is created. */
    private static SchemaRegistryClient orMock(SchemaRegistryClient client) {
        return client != null ? client : new MockSchemaRegistryClient();
    }
}
/**
 * Test-only key deserializer whose schema registry client answers
 * {@code getSchemaBySubjectAndId} with the {@code SOMEKEY} Avro schema, whatever subject
 * and id are requested.
 */
public class KafkaAvroKeyDeserializer extends KafkaAvroDeserializer {

    /**
     * Installs a new fixed-schema registry client and then delegates to the parent
     * deserializer.
     *
     * @param topic topic the record was read from
     * @param bytes serialized record key
     * @return whatever the parent deserializer produces for {@code bytes}
     */
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        SchemaRegistryClient keySchemaClient = new KeySchemaRegistryClient();
        this.schemaRegistry = keySchemaClient;
        return super.deserialize(topic, bytes);
    }

    /** Mock registry client that always hands back the {@code SOMEKEY} schema. */
    private static final class KeySchemaRegistryClient extends MockSchemaRegistryClient {
        @Override
        public ParsedSchema getSchemaBySubjectAndId(String subject, int id) {
            return new AvroSchema(SOMEKEY.SCHEMA$);
        }
    }
}
/**
 * Test-only value deserializer whose schema registry client answers
 * {@code getSchemaBySubjectAndId} with the {@code SOMEVALUE} Avro schema, whatever subject
 * and id are requested.
 */
public class KafkaAvroValueDeserializer extends KafkaAvroDeserializer {

    /**
     * Installs a new fixed-schema registry client and then delegates to the parent
     * deserializer.
     *
     * @param topic topic the record was read from
     * @param bytes serialized record value
     * @return whatever the parent deserializer produces for {@code bytes}
     */
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        SchemaRegistryClient valueSchemaClient = new ValueSchemaRegistryClient();
        this.schemaRegistry = valueSchemaClient;
        return super.deserialize(topic, bytes);
    }

    /** Mock registry client that always hands back the {@code SOMEVALUE} schema. */
    private static final class ValueSchemaRegistryClient extends MockSchemaRegistryClient {
        @Override
        public ParsedSchema getSchemaBySubjectAndId(String subject, int id) {
            return new AvroSchema(SOMEVALUE.SCHEMA$);
        }
    }
}
@EmbeddedKafka()
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
"spring.main.allow-bean-definition-overriding=true",
"spring.kafka.properties.auto.register.schemas=true",
})
public class KafkaConsumerIT {
private static final String EVENT_TOPIC = "event-topic";
@Autowired
private EmbeddedKafkaBroker kafkaEmbedded;
protected Producer<SOMEKEY, SOMEVALUE> eventProducer;
protected Consumer<SOMEKEY, SOMEVALUE> eventConsumer;
@BeforeEach
void setUp() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded.getBrokersAsString());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomKafkaAvroSerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CustomKafkaAvroSerializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "in-test-consumer");
props.put("schema.registry.url", "mock://testurl");
ProducerFactory<SOMEKEY, SOMEVALUE> producerFactory = new DefaultKafkaProducerFactory<>(props);
eventProducer = producerFactory.createProducer();
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("in-test-consumer", "false", kafkaEmbedded));
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroKeyDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroValueDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configs.put("schema.registry.url", "mock://testurl");
eventConsumer = new DefaultKafkaConsumerFactory<SOMEKEY, SOMEVALUE>(configs).createConsumer("in-test-consumer", "10");
kafkaProperties.buildConsumerProperties();
eventConsumer.subscribe(Lists.newArrayList(EVENT_TOPIC));
}
@AfterEach
void cleanUp () {
eventConsumer.close();
}
@Test
public void test_receive_event() {
eventProducer.send(new ProducerRecord<>(EVENT_TOPIC, new SomeKey(), new SomeValue());
ConsumerRecord<SOMEKEY, SOMEVALUE> consumerRecord = KafkaTestUtils.getSingleRecord(eventConsumer, EVENT_TOPIC);
assertThat(consumerRecord).isNotNull();
}
}