How to create a separate Kafka listener for each topic dynamically in Spring Boot?

I am new to Spring and Kafka. I am working on a use case (using Spring Boot with Spring Kafka) in which users are allowed to create Kafka topics at runtime. The Spring application is expected to subscribe to these topics programmatically at runtime. As far as I know, Kafka listeners are defined at design time, so the topics need to be specified before startup. Is there a way to subscribe to Kafka topics dynamically with the Spring Boot Kafka integration?

I have looked at this issue: https://github.com/spring-projects/spring-kafka/issues/132

My current plan is to skip the Spring Kafka integration and implement the Kafka consumer myself in plain Java, as described in "spring boot kafka consumer - how to properly consume kafka messages from spring boot".

There is 1 best solution below

Kafka listeners are only "design time" if you declare them with annotations. Spring Kafka also lets you create them programmatically at runtime; see KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.

The simplest example of a Kafka listener created on the fly would be:

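// Imports used by this snippet. ImmutableMap comes from Guava; on Java 9+ Map.of(...) works as well.
// In Spring Kafka versions before 2.2, ContainerProperties lives in org.springframework.kafka.listener.config.
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;

// Minimal consumer configuration: broker address and consumer group
Map<String, Object> consumerConfig = ImmutableMap.of(
    BOOTSTRAP_SERVERS_CONFIG, "brokerAddress",
    GROUP_ID_CONFIG, "groupId"
);

DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
        new DefaultKafkaConsumerFactory<>(
                consumerConfig,
                new StringDeserializer(),
                new StringDeserializer());

// The topic name can be any value that is only known at runtime
ContainerProperties containerProperties = new ContainerProperties("topicName");
containerProperties.setMessageListener((MessageListener<String, String>) record -> {
    // do something with the received record
});

ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(
                kafkaConsumerFactory,
                containerProperties);

// Starts consuming; call container.stop() to unsubscribe again
container.start();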

For more explanation and code see this blog post: http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/
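
To get one listener per user-created topic, the container creation from the answer could be wrapped in a small registry that keeps one running container per topic name. The following is only a sketch of that bookkeeping, not part of Spring Kafka: `TopicListenerRegistry`, `subscribe`, `unsubscribe` and `topics` are hypothetical names, and the registry is generic so that it compiles without any Kafka dependency.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical helper (not a Spring Kafka class): tracks one listener
 * container per topic and starts/stops it on demand.
 */
class TopicListenerRegistry<C> {

    private final Map<String, C> containers = new ConcurrentHashMap<>();
    private final Function<String, C> containerFactory; // builds a container for a topic
    private final Consumer<C> starter;                  // e.g. container.start()
    private final Consumer<C> stopper;                  // e.g. container.stop()

    TopicListenerRegistry(Function<String, C> containerFactory,
                          Consumer<C> starter,
                          Consumer<C> stopper) {
        this.containerFactory = containerFactory;
        this.starter = starter;
        this.stopper = stopper;
    }

    /** Creates and starts a container for the topic; returns false if one already exists. */
    boolean subscribe(String topic) {
        boolean[] created = {false};
        // computeIfAbsent runs the factory at most once per topic, even under concurrency
        containers.computeIfAbsent(topic, t -> {
            C container = containerFactory.apply(t);
            starter.accept(container);
            created[0] = true;
            return container;
        });
        return created[0];
    }

    /** Stops and forgets the container for the topic; returns false if none was registered. */
    boolean unsubscribe(String topic) {
        C container = containers.remove(topic);
        if (container == null) {
            return false;
        }
        stopper.accept(container);
        return true;
    }

    /** Snapshot of the topics that currently have a listener. */
    Set<String> topics() {
        return new HashSet<>(containers.keySet());
    }
}
```

With Spring Kafka, the factory would build a `ConcurrentMessageListenerContainer` for the given topic as in the answer, and the start and stop callbacks would be `ConcurrentMessageListenerContainer::start` and `ConcurrentMessageListenerContainer::stop`. Calling `subscribe(topicName)` whenever a user creates a topic then yields one independent listener per topic.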