RetryBackoffSpec not working with KafkaReceiver which throws exception

I have a use case where I want to keep receiving records from Kafka indefinitely and process each one with processRecord(String record), which can throw a RuntimeException. I want to retry a fixed number of times (say 5). If processing succeeds before the retries are exhausted, I want to commit the offset manually and continue with the next record. If it does not, I want to log the failure, commit the offset, and then continue with the next record. I have some code, but it doesn't seem to work as intended. I would appreciate some help.

public class MyClass {
    private final AtomicInteger atomicInteger = new AtomicInteger(0);
    private final ReceiverOptions<String, String> receiverOptions = getReceiverOptions();

    public void consumeRecords() {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(record -> {
                    System.out.println(record.value());
                    processRecord(record.value());
                })
                .doOnError(e -> System.out.println(atomicInteger.incrementAndGet()))
                .onErrorContinue((e, r) -> {
                    System.out.println(atomicInteger.incrementAndGet());
                    System.out.println("Record: " + r);
                    System.out.println("Error: " + e);
                })
                .retryWhen(retrySpec)
                .repeat()
                .subscribe();

    }

    public void processRecord(String record) {
        // might throw an exception
        throw new RuntimeException("Throwing exception!");
    }
}

The output I receive is:

some message
1
Record: ConsumerRecord(topic = my-topic, partition = 0, leaderEpoch = null, offset = 1, CreateTime = 1620062099518, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = some message)
Error: java.lang.RuntimeException: Throwing exception!

second message
1
Record: ConsumerRecord(topic = my-topic, partition = 1, leaderEpoch = null, offset = 2, CreateTime = 1620062166706, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = second message)
Error: java.lang.RuntimeException: Throwing exception!

It does not retry 5 times, and moreover the AtomicInteger is not updated for the second record.

What I want to achieve is:

count = 0
while (count < 5) {
    if (exception) count++;
    else break_and_continue_with_next_record
}

if (count == 5) log_failure_and_continue_with_next_record
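
For reference, the pseudocode above could be written in plain, blocking Java roughly as follows. This is only a sketch of the intended semantics, not a reactive solution: the class BoundedRetry, the method processWithRetry, and the Consumer<String> handler are hypothetical names introduced for illustration, and the record is attempted at most maxAttempts times in total.

```java
import java.util.function.Consumer;

class BoundedRetry {

    /**
     * Tries the handler up to maxAttempts times (hypothetical helper).
     * Returns the number of failed attempts: 0 means the first try succeeded,
     * maxAttempts means every attempt failed and the record is given up on.
     */
    static int processWithRetry(String record, int maxAttempts, Consumer<String> handler) {
        int count = 0;
        while (count < maxAttempts) {
            try {
                handler.accept(record);
                break; // success: stop retrying and move on to the next record
            } catch (RuntimeException e) {
                count++; // failure: count it and try again
            }
        }
        if (count == maxAttempts) {
            System.out.println("Giving up on record: " + record);
        }
        // In both cases the caller would commit the offset here and continue.
        return count;
    }

    public static void main(String[] args) {
        int failures = processWithRetry("some message", 5, r -> {
            throw new RuntimeException("Throwing exception!");
        });
        System.out.println("Failed attempts: " + failures);
    }
}
```

The reactive version has to express the same loop through retryWhen() and a resubscription, which is where the difficulty in the code above comes from.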

Best answer

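onErrorResume() is preferred over onErrorContinue(). With onErrorContinue(), the failing element is dropped and the sequence carries on, so the error never reaches retryWhen() and nothing is retried.

The problem then is that you can't commit the offset inside onErrorResume(), because the receiver is no longer active at that point. The code below works around this by remembering the failed record, letting repeat() resubscribe, and acknowledging the offset when the record is delivered again.

This works for me...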

public class MyClass {

    private final AtomicInteger atomicInteger = new AtomicInteger();

    public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
        AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
        receiver.receive()
                .subscribeOn(Schedulers.single())
                .doOnNext(record -> {
                    System.out.println(record.value() + "@" + record.offset());
                    if (failed.get() != null) {
                        System.out.println("Committing failed record offset " + record.value()
                                + "@" + record.offset());
                        record.receiverOffset().acknowledge();
                        failed.set(null);
                    }
                    else {
                        atomicInteger.set(0);
                        try {
                            processRecord(record.value());
                            record.receiverOffset().acknowledge();
                        }
                        catch (Exception e) {
                            throw new ReceiverRecordException(record, e);
                        }
                    }
                })
                .doOnError(ex -> atomicInteger.incrementAndGet())
                .retryWhen(retrySpec)
                .onErrorResume(e -> {
                    ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                    ReceiverRecord<?, ?> record = ex.getRecord();
                    System.out.println("Retries exhausted for " + record.value()
                            + "@" + record.offset());
                    failed.set(record);
                    return Mono.empty();
                })
                .repeat()
                .subscribe();
    }

    public void processRecord(String record) {
        // might throw an exception
        throw new RuntimeException("Throwing exception!");
    }

}

// Carries the failed record so that onErrorResume() can recover it once retries are exhausted.
@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {

    private final ReceiverRecord<?, ?> record;

    ReceiverRecordException(ReceiverRecord<?, ?> record, Throwable t) {
        super(t);
        this.record = record;
    }

    public ReceiverRecord<?, ?> getRecord() {
        return this.record;
    }

}
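
A note on timing: Retry.backoff(5, Duration.ofSeconds(1)) uses exponential backoff, so the pauses between attempts grow instead of staying at one second. The sketch below is plain Java, not Reactor code. It only computes the nominal delays, assuming the delay doubles on each consecutive retry and ignoring jitter (Reactor applies random jitter by default, so real delays will differ). BackoffSchedule and nominalDelays are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {

    /** Nominal delay before each retry, assuming doubling and no jitter. */
    static List<Duration> nominalDelays(int maxRetries, Duration minBackoff) {
        List<Duration> delays = new ArrayList<>();
        Duration next = minBackoff;
        for (int i = 0; i < maxRetries; i++) {
            delays.add(next);
            next = next.multipliedBy(2); // assumed growth factor of 2 per retry
        }
        return delays;
    }

    public static void main(String[] args) {
        List<Duration> delays = nominalDelays(5, Duration.ofSeconds(1));
        Duration total = Duration.ZERO;
        for (Duration d : delays) {
            total = total.plus(d);
        }
        System.out.println(delays);
        System.out.println("Total nominal wait: " + total.getSeconds() + "s");
    }
}
```

Under these assumptions the five retries wait 1, 2, 4, 8 and 16 seconds, so a record that always fails holds up the stream for roughly half a minute before onErrorResume() runs.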

EDIT

Here is the complete app...

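import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

@SpringBootApplication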
public class So67373188Application {

    private static final Logger log = LoggerFactory.getLogger(So67373188Application.class);

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(So67373188Application.class, args);
        Thread.sleep(120_000);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67373188").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner2() {
        return args -> {
            SenderOptions<String, String> so = SenderOptions.create(
                    Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
            KafkaSender<String, String> sender = KafkaSender.create(so);
            Disposable subscribed = sender.send(Flux.just(pr("foo"), pr("bar"), pr("fail"), pr("baz")))
                .subscribe(result -> {
                    System.out.println(result.recordMetadata());
                });
            Thread.sleep(5000);
            subscribed.dispose();
        };
    }

    @Bean
    public ApplicationRunner runner3(KafkaOperations<String, String> template) {
        return args -> {
            DeadLetterPublishingRecoverer dlpr = new DeadLetterPublishingRecoverer(template);
            ReceiverOptions<String, String> ro = ReceiverOptions.<String, String> create(
                    Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                            ConsumerConfig.GROUP_ID_CONFIG, "so67373188",
                            ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
                    .withKeyDeserializer(new StringDeserializer())
                    .withValueDeserializer(new StringDeserializer())
                    .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                    .commitBatchSize(1)
                    .subscription(Collections.singletonList("so67373188"));
            consumeRecords(ro);
        };
    }

    private SenderRecord<String, String, String> pr(String value) {
        return SenderRecord.create("so67373188", 0, null, null, value, value + ".corr");
    }

    private final AtomicInteger atomicInteger = new AtomicInteger();

    public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
        AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
        receiver.receive()
                .subscribeOn(Schedulers.single())
                .doOnNext(record -> {
                    System.out.println(record.value() + "@" + record.offset());
                    if (failed.get() != null) {
                        System.out.println("Committing failed record offset " + record.value()
                                + "@" + record.offset());
                        record.receiverOffset().acknowledge();
                        failed.set(null);
                    }
                    else {
                        atomicInteger.set(0);
                        try {
                            processRecord(record.value());
                            record.receiverOffset().acknowledge();
                        }
                        catch (Exception e) {
                            throw new ReceiverRecordException(record, e);
                        }
                    }
                })
                .doOnError(ex -> atomicInteger.incrementAndGet())
                .retryWhen(retrySpec)
                .onErrorResume(e -> {
                    ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                    ReceiverRecord<?, ?> record = ex.getRecord();
                    System.out.println("Retries exhausted for " + record.value()
                            + "@" + record.offset());
                    failed.set(record);
                    return Mono.empty();
                })
                .repeat()
                .subscribe();
    }

    public void processRecord(String record) {
        // might throw an exception
        if (record.equals("fail")) {
            throw new RuntimeException("Throwing exception!");
        }
    }

}

// Carries the failed record so that onErrorResume() can recover it once retries are exhausted.
@SuppressWarnings("serial")
class ReceiverRecordException extends RuntimeException {

    private final ReceiverRecord<?, ?> record;

    ReceiverRecordException(ReceiverRecord<?, ?> record, Throwable t) {
        super(t);
        this.record = record;
    }

    public ReceiverRecord<?, ?> getRecord() {
        return this.record;
    }

}

Result:


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.5)

so67373188-0@16
so67373188-0@17
so67373188-0@18
so67373188-0@19
foo@16
bar@17
fail@18
fail@18
fail@18
fail@18
fail@18
fail@18
Retries exhausted for fail@18
fail@18
Committing failed record offset fail@18
baz@19
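
The trace above can be reproduced with a small plain-Java model, which may help explain why fail@18 appears seven times: once for the initial attempt, five times for the retries, and once more when the record is redelivered and its offset is committed. This is a simplified simulation, not Reactor or Kafka code. It assumes each resubscription redelivers from the first uncommitted record, and RetryTraceSimulation and simulate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class RetryTraceSimulation {

    /** Returns the lines the consumer would print, under the stated assumptions. */
    static List<String> simulate(List<String> values, long firstOffset, int maxRetries) {
        List<String> trace = new ArrayList<>();
        int committed = 0;            // index of the first uncommitted record
        int failures = 0;             // consecutive failed attempts for the current record
        boolean failedMarker = false; // stands in for the AtomicReference named "failed"

        while (committed < values.size()) {
            // Each pass models one delivery of the first uncommitted record.
            String value = values.get(committed);
            String label = value + "@" + (firstOffset + committed);
            trace.add(label);

            if (failedMarker) {
                // Redelivery after exhausted retries: skip processing, just commit.
                trace.add("Committing failed record offset " + label);
                failedMarker = false;
                failures = 0;
                committed++;
            } else if (value.equals("fail")) {
                failures++;
                if (failures > maxRetries) {
                    // Initial attempt plus maxRetries retries have all failed.
                    trace.add("Retries exhausted for " + label);
                    failedMarker = true;
                }
            } else {
                failures = 0;
                committed++; // processed and acknowledged
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        simulate(List.of("foo", "bar", "fail", "baz"), 16, 5).forEach(System.out::println);
    }
}
```

Running main prints the same consumer lines as the result above, from foo@16 through baz@19. The producer metadata lines (so67373188-0@16 and so on) are not part of the model.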