I have a use case where I want to infinitely keep receiving records from Kafka and do some processing on the record using processRecord(String record)
which can throw a RuntimeException
. I want to retry multiple times (say 5) and if it is successful anytime before 5 retries want to commit the offset manually and continue with next records and if it is not then want to (log it --> commit offset) then continue with the next records. I have a code, but doesn't seem to work appropriately. Would appreciate some help.
public class MyClass {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
private final ReceiverOptions<String, String> receiverOptions = getReceiverOptions();
public void consumeRecords() {
RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext(record -> {
System.out.println(record.value());
processRecord(record.value());
})
.doOnError(e -> System.out.println(atomicInteger.incrementAndGet()))
.onErrorContinue((e, r) -> {
System.out.println(atomicInteger.incrementAndGet());
System.out.println("Record: " + r);
System.out.println("Error: " + e);
})
.retryWhen(retrySpec)
.repeat()
.subscribe();
}
public void processRecord(String record) {
// might throw an exception
throw new RuntimeException("Throwing exception!");
}
}
The output that I receive is :
some message
1
Record: ConsumerRecord(topic = my-topic, partition = 0, leaderEpoch = null, offset = 1, CreateTime = 1620062099518, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = some message)
Error: java.lang.RuntimeException: Throwing exception!
second message
1
Record: ConsumerRecord(topic = my-topic, partition = 1, leaderEpoch = null, offset = 2, CreateTime = 1620062166706, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = second message)
Error: java.lang.RuntimeException: Throwing exception!
It is not retrying 5 times and moreover the AtomicInteger is not getting updated for the second record.
What I want to achieve is :
count = 0
while (count < 5) {
if (exception) count++;
else break_and_continue_with_next_record
}
if (count == 5) log_failure_and_continue_with_next_record
onErrorResume()
is preferred overonErrorContinue()
.The problem then is you can't commit the offset there, because the receiver is no longer active at that point.
This works for me...
EDIT
Here is the complete app...
Result: