How to create a bulk consumer in Spring WebFlux and Kafka

I need to poll Kafka and process events in bulk. With Reactor Kafka, since it's a streaming API, I get events as a stream. Is there a way to combine them and receive batches with a fixed maximum size?

This is what I'm doing currently:

final Flux<Flux<ConsumerRecord<String, String>>> receive = KafkaReceiver.create(eventReceiverOptions)
    .receiveAutoAck();
receive
    .concatMap(r -> r)
    .doOnEach(listSignal -> log.info("got one message"))
    .map(consumerRecords -> consumerRecords.value())
    .collectList()
    .flatMap(strings -> {
      log.info("Read messages of size {}", strings.size());
      return processBulkMessage(strings)
          .doOnSuccess(aBoolean -> log.info("Processed records"))
          .thenReturn(strings);
    }).subscribe();

But the code just hangs after collectList and never reaches the final flatMap.

Thanks in advance.

1 Answer

Your plain .concatMap(r -> r) just flattens the stream, so you throw away the batching that receiveAutoAck() originally builds: each inner Flux it emits is one poll's worth of records. Your version hangs because collectList() only emits once its upstream completes, and the flattened Kafka stream never completes. To get a stream of lists for your processBulkMessage() to work on, move all the batch logic into that concatMap(), where each inner Flux does complete:

receive
    .concatMap(batch -> batch
        .doOnEach(listSignal -> log.info("got one message"))
        .map(ConsumerRecord::value)
        .collectList())          // completes per poll batch, so a list is emitted
    .flatMap(strings -> processBulkMessage(strings)
        .doOnSuccess(aBoolean -> log.info("Processed {} records", strings.size()))
        .thenReturn(strings))
    .subscribe();
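
If you additionally need batches capped at a fixed maximum size, independent of how many records each poll happens to return, Reactor's built-in bufferTimeout can re-batch the flattened stream. A minimal sketch, reusing eventReceiverOptions, processBulkMessage and log from the question; the size of 500 and the one-second timeout are illustrative values, not recommendations:

import java.time.Duration;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;

// Flatten the per-poll batches, then re-batch into lists of at most 500
// records, flushing a partially filled list after one second so slow
// topics still make progress.
Flux<List<String>> batches = KafkaReceiver.<String, String>create(eventReceiverOptions)
    .receiveAutoAck()
    .concatMap(r -> r)
    .map(ConsumerRecord::value)
    .bufferTimeout(500, Duration.ofSeconds(1));

batches
    .concatMap(strings -> processBulkMessage(strings)
        .doOnSuccess(ok -> log.info("Processed {} records", strings.size())))
    .subscribe();

concatMap keeps the batches in order and processes them one at a time. One caveat: with receiveAutoAck(), offsets are committed when each polled batch's inner Flux terminates, which here happens as records enter the buffer, so records can be acknowledged before processBulkMessage finishes; if that matters for your delivery guarantees, look at receive() with manual acknowledgement instead.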