WebFlux Reactor Kafka consumer error handling with manual acknowledgement


In my Kafka consumer, I want circuit-breaker-like behaviour when a transient error happens, so I retry indefinitely.

That way I can fix the problem and start consuming again. The problem is that I consume events in batches of 500, and because the processing is reactive, an event with a higher offset can be committed before an event with a lower offset throws its error.

So when I start consuming again after the error, the consumer resumes from the latest committed offset and not from the offset where the error happened, which causes me to lose events.

My code:

    @Slf4j
    public class EventConsumer {
        // Class details omitted
        public Disposable consumeMessage() {
            return processRecord().onBackpressureBuffer(BUFFER_SIZE)
                                  .limitRate(500)
                                  .retryWhen(Retry.indefinitely())
                                  .subscribe(record -> {}, error -> log.error("Error while consuming event: {}", error.getMessage()));
        }

        public Flux<EventWrapper> processRecord() {
            Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(inputEventReceiver::receive);
            return receiverRecord.flatMap(this::processMessage);
        }

        // Additional methods omitted
    }

I could not find a solution to this problem: a Flux is a continuous stream and events keep arriving, which makes it difficult to combine synchronous, in-order commits with asynchronous processing.
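To make the failure mode concrete: with `flatMap`, records complete in any order, so committing each offset as soon as its record finishes can move the committed position past a record that later fails. One possible way around this (a minimal sketch, not part of my code and not a Reactor Kafka API) is to track completions per partition and only ever commit the lowest contiguous position. The class name `ContiguousOffsetTracker` and its methods are hypothetical, and how it would be called from the omitted `processMessage` is an assumption.

```java
import java.util.TreeSet;

/**
 * Hypothetical helper: tracks completed offsets for ONE partition and
 * exposes only the position up to which every record has finished.
 */
final class ContiguousOffsetTracker {
    // Offsets that finished out of order and are still waiting for a gap to close.
    private final TreeSet<Long> completed = new TreeSet<>();
    // Lowest offset that has not completed yet; this is the safe commit position.
    private long nextExpected;

    ContiguousOffsetTracker(long firstOffset) {
        this.nextExpected = firstOffset;
    }

    /**
     * Marks an offset as successfully processed.
     * Returns the new safe commit position (next offset to read),
     * or -1 if the safe position did not advance.
     */
    synchronized long complete(long offset) {
        if (offset < nextExpected) {
            return -1; // already covered by an earlier advance
        }
        completed.add(offset);
        long before = nextExpected;
        // Advance over every offset that is now contiguous with the watermark.
        while (completed.remove(nextExpected)) {
            nextExpected++;
        }
        return nextExpected > before ? nextExpected : -1;
    }

    synchronized long committablePosition() {
        return nextExpected;
    }

    public static void main(String[] args) {
        ContiguousOffsetTracker tracker = new ContiguousOffsetTracker(100L);
        System.out.println(tracker.complete(102L)); // 100 and 101 still pending
        System.out.println(tracker.complete(101L)); // 100 still pending
        System.out.println(tracker.complete(100L)); // gap closed
    }
}
```

With something like this, a record at offset 100 that fails and is retried keeps the commit position at 100 even though 101 and 102 already finished, so a restart would re-read from 100 instead of skipping it. A simpler alternative, at the cost of concurrency, would be to replace `flatMap` with `concatMap` so that records are processed strictly in order.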
