Below is a sample code which uses reactor-kafka and reads data from a topic (with retry logic) which has records published via a non-reactive producer. Inside my doOnNext()
consumer I am using non-reactive elasticsearch client which indexes the record in the index. So I have few questions that I am still unclear about :
- I know that consumers and producers are independent decoupled systems, but is it recommended to have reactive producer as well whose consumers are reactive?
- If I am using something that is non-reactive, in this case Elasticsearch client
org.elasticsearch.client.RestClient
, does the "reactiveness" of the code work? If it does or does not, how do I test it? (By "reactiveness", I mean non blocking IO part of it i.e. if I spawn three reactive-consumers and one is latent for some reason, the thread should be unblocked and used for other reactive consumer). - In general the question is, if I wrap some API with reactive clients should the API be reactive as well?
public Disposable consumeRecords() {
long maxAttempts = 3, duration = 10;
RetryBackoffSpec retrySpec = Retry.backoff(maxAttempts, Duration.ofSeconds(duration)).transientErrors(true);
Consumer<ReceiverRecord<K, V>> doOnNextConsumer = x -> {
// use non-reactive elastic search client and index record x
};
return KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext(record -> {
try {
// calling the non-reactive consumer
doOnNextConsumer.accept(record);
} catch (Exception e) {
throw new ReceiverRecordException(record, e);
}
record.receiverOffset().acknowledge();
})
.doOnError(t -> log.error("Error occurred: ", t))
.retryWhen(retrySpec)
.onErrorContinue((e, record) -> {
ReceiverRecordException receiverRecordException = (ReceiverRecordException) e;
log.error("Retries exhausted for: " + receiverRecordException);
receiverRecordException.getRecord().receiverOffset().acknowledge();
})
.repeat()
.subscribe();
}
Got some understanding around it.
Reactive KafkaReceiver will internally call some API; if that API is blocking API then even if KafkaReceiver is "reactive" the non-blocking IO will not work and the receiver thread will be blocked because you are calling Blocking API / non-reactive API.
You can test this out by creating a simple server (which blocks calls for sometime / sleep) and calling that server from this receiver