Reactor Kafka: how do I stop the Kafka consumer from starting automatically?

Below is my Kafka consumer:

@ConditionalOnProperty(value = "com.demo.kafka.core.inbound.confluent.topic-name",
        matchIfMissing = false)
public KafkaReceiver<String, Object> kafkaInboundReceiver() {
    ReceiverOptions<String, Object> receiverOptions = ReceiverOptions.create(inboundConsumerConfigs());
    receiverOptions.schedulerSupplier(() -> Schedulers
            .fromExecutorService(applicationContext.getBean("inboundKafkaExecutorService", ExecutorService.class)));
    return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator));
}

My Kafka consumer starts automatically, but I want to prevent it from being auto-started.

I learned that in Spring Kafka we can do something like this:


However, I am not sure how to control the auto start/stop behavior in Reactor Kafka. I would like something like the following.

Introduce a property to control the auto start/stop behavior:

private boolean start;

Using the above property, I should be able to set an auto-start flag in Reactor Kafka, something like this:

return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)

Note: .setAutoStart(start) is the kind of call I am looking for.

Is this doable in Reactor Kafka? If so, how do I do it?


protected void inboundEventHubListener(String topicName, List<String> allowedValues) {
    Scheduler scheduler = Schedulers.fromExecutorService(kafkaExecutorService);
            .groupBy(receiverRecord -> {
                try {
                    return receiverRecord.receiverOffset().topicPartition();
                } catch (Throwable throwable) {
                    log.error("exception in groupby", throwable);
                    return Flux.empty();
            }).flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
            .map(record -> {
                processMessage(record, topicName, allowedValues).block(
                        Duration.ofSeconds(60L)); // block() waits up to 60 seconds for this message to be processed
                return record;
            }).concatMap(message -> {
                log.info("Received message after processing offset: {} partition: {} ",
                         message.offset(), message.partition());
                return message.receiverOffset()
                        .onErrorContinue((t, o) -> log.error(
                                String.format("exception raised while commit offset %s", o), t)
            })).onErrorContinue((t, o) -> {
        try {
            if (null != o) {
                ReceiverRecord<String, Object> record = (ReceiverRecord<String, Object>) o;
                ReceiverOffset offset = record.receiverOffset();
                log.debug("failed to process message: {} partition: {} and message: {} ",
                          offset.offset(), record.partition(), record.value());
            log.error(String.format("exception raised while processing message %s", o), t);
        } catch (Throwable inner) {
            log.error("encountered error in onErrorContinue", inner);

Can I do something like this?

kafkaEventHubInboundReceiverObj = kafkaEventHubInboundReceiver.....subscribeOn(scheduler);
if(consumer.autostart) {

With reactor-kafka there is no concept of "auto start"; you are in complete control.

The consumer is not "started" until you subscribe to the Flux returned from receiver.receive().

Simply delay the flux.subscribe() until you are ready to consume data.
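
To tie this back to the `start` property from the question, here is a minimal sketch of one way to gate the subscription. `ConsumerSwitch` is a hypothetical helper, not part of reactor-kafka; it uses only the JDK and assumes you hand it a function that subscribes to your pipeline and returns a handle that cancels it. The wiring shown in the trailing comment assumes the `kafkaInboundReceiver` bean and a boolean `start` property as in the question.

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a reactor-kafka class): holds a lazily subscribed
 * pipeline and only subscribes when start() is called.
 */
final class ConsumerSwitch {
    // Subscribes to the pipeline and returns a handle that cancels the subscription.
    private final Supplier<Runnable> subscribeAction;
    // Non-null only while a subscription is active.
    private Runnable cancelHandle;

    ConsumerSwitch(Supplier<Runnable> subscribeAction) {
        this.subscribeAction = Objects.requireNonNull(subscribeAction);
    }

    /** Subscribes if not already running; returns true if this call started the consumer. */
    synchronized boolean start() {
        if (cancelHandle != null) {
            return false; // already subscribed, nothing to do
        }
        cancelHandle = Objects.requireNonNull(subscribeAction.get());
        return true;
    }

    /** Cancels the active subscription; returns true if this call stopped the consumer. */
    synchronized boolean stop() {
        if (cancelHandle == null) {
            return false; // not running
        }
        cancelHandle.run();
        cancelHandle = null;
        return true;
    }

    synchronized boolean isRunning() {
        return cancelHandle != null;
    }
}

// Possible wiring with reactor-kafka (assumes reactor.core.Disposable on the classpath):
//
//   ConsumerSwitch consumer = new ConsumerSwitch(() -> {
//       Disposable d = kafkaInboundReceiver.receive()
//               /* ...your groupBy / flatMap pipeline... */
//               .subscribe();
//       return d::dispose;   // disposing the subscription stops consumption
//   });
//   if (start) {             // the property from the question
//       consumer.start();
//   }
```

Because nothing subscribes to `receive()` until `start()` runs, leaving the property set to false means the consumer is never started. Calling `stop()` disposes the subscription, and a later `start()` subscribes again.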