I have some Reactor Kafka code that reads in events via a KafkaReceiver and writes one-to-many downstream messages via one or more KafkaSenders concatenated into a single Publisher. Everything is working great, but what I'd like to do is have this concatenated senders Flux emit only when it is complete (i.e., once it's done writing to all downstream topics for a given event, so it does not emit an element for each individual downstream Kafka write). That way I could sample() and periodically commit offsets, knowing that whenever sample() happens to trigger and I commit the offset for an incoming event, I've processed all downstream messages for every event whose offset I'm committing. It seems like I could use either pauseUntilOther() or then() somehow, but I don't quite see exactly how given my code and specific use case. Any thoughts or suggestions appreciated, thanks.
Main Publisher code:
this.kafkaReceiver.receive()
    .groupBy(m -> m.receiverOffset().topicPartition())
    .flatMap(partitionFlux ->
        partitionFlux.publishOn(this.scheduler)
            .flatMap(this::processEvent)
            .sample(Duration.ofMillis(10))
            .concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
    .subscribe();
Concatenated KafkaSenders returned by the call to processEvent():
return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
    .doOnComplete(() -> LOG.info("Finished writing all downstream messages for incoming event"));
Sounds like Flux.last() is what you are looking for. Then your .sample(Duration.ofMillis(10)) would pick up whatever is available as the last item from one or several batches sent to those brokers, and in the end your commitReceiverOffset() would properly commit whatever was last. See its JavaDocs for more info, and the marble diagram: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/doc-files/marbles/last.svg
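To make the suggestion concrete, here is a minimal sketch of last() applied to the concatenated senders. The Flux.just(...) sources are hypothetical stand-ins for kafkaSenderFluxA and kafkaSenderFluxB (plain strings instead of SenderResult elements); the point is that last() suppresses everything except the final element, so downstream sees exactly one value per incoming event, emitted only on completion:

```java
import reactor.core.publisher.Flux;

public class LastDemo {
    public static void main(String[] args) {
        // Hypothetical stand-ins for kafkaSenderFluxA / kafkaSenderFluxB:
        Flux<String> kafkaSenderFluxA = Flux.just("a1", "a2");
        Flux<String> kafkaSenderFluxB = Flux.just("b1");

        // last() drops all but the final element, so the concatenated
        // senders Flux emits a single value once all writes are done.
        String last = Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
                .last()
                .block();

        System.out.println(last); // b1
    }
}
```

In processEvent() this would mean returning Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB).last(), so the outer sample()/concatMap commit logic only ever sees fully processed events.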