I have a Kafka Structured Streaming Application, and a class with a method that is run on every record inside a mapPartition. This method contains a call to Redis (Lettuce Client) that I want to add the circuitbreaker to.
I've tried using the chrisdavenport typelevel circuit library and resilience4j. To see that the redis call indeed throws an exception, I added a recover block to catch it and it does throw an exception.
But the circuitbreaker does not increment the failures upon the exception.
Using resilience4j -
class MyClass(circuitBreaker: CircuitBreaker, redisT: AtomicReference[Try[RedisAsyncCommands[String, String]]]) {
def myMethod(lookupKey: String) = {
redisT.get().map {
redis =>
circuitBreaker.executeSupplier(() => {
redis.get(lookupKey).toScala.map(Option(_))
})
.recover{
// **Recover Block only added to check if the exception is thrown. Removed once it is confirmed that the exception is indeed thrown**
}
.map {
// Logic
}
}
}
}
redisT is configured with a command timeout of 1 second using lettuce's timeout options. This exception gets caught when there's a recover block.
circuitBreaker (resilience4j)
lazy val sharedRedisResources: ClientResources = {
logger.info("Creating Shared Redis Resources")
DefaultClientResources.create()
}
lazy val circuitBreaker: CircuitBreaker = circuitBreakerRegistry.circuitBreaker("circuitbreaker name")
Is my use of the circuitbreaker correct? Am I missing something?