CircuitBreaker object inside a class on a Spark executor


I have a Kafka Structured Streaming application and a class with a method that runs on every record inside a mapPartitions call. The method makes a call to Redis (via the Lettuce client) that I want to wrap in a circuit breaker.

I've tried both the ChristopherDavenport typelevel circuit library and resilience4j. To confirm that the Redis call does throw an exception, I added a recover block to catch it, and the exception is indeed thrown.

But the circuit breaker does not increment its failure count when the exception occurs.

Using resilience4j:

class MyClass(circuitBreaker: CircuitBreaker,
              redisT: AtomicReference[Try[RedisAsyncCommands[String, String]]]) {

  def myMethod(lookupKey: String) = {
    redisT.get().map { redis =>
      circuitBreaker.executeSupplier(() => {
        redis.get(lookupKey).toScala.map(Option(_))
      })
      .recover {
        // Recover block added only to check whether the exception is thrown.
        // Removed once it was confirmed that the exception is indeed thrown.
      }
      .map {
        // Logic
      }
    }
  }
}
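For context on what I observe: resilience4j's `executeSupplier` records a failure only when the supplier throws on the calling thread, whereas my supplier returns a `Future` immediately and the exception is raised later, on the execution context. A minimal stdlib-only sketch of that distinction (no resilience4j here; `executeSupplier` and `syncFailures` below are hypothetical stand-ins, not the library's internals):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Counts failures the way a synchronous wrapper would:
// only when the supplier itself throws on the calling thread.
val syncFailures = new AtomicInteger(0)

def executeSupplier[A](supplier: () => A): A =
  try supplier()
  catch { case e: Throwable => syncFailures.incrementAndGet(); throw e }

// The supplier returns a Future immediately; the exception is raised
// later, on the execution context, so the wrapper never sees it.
val f: Future[String] =
  executeSupplier(() => Future[String] { throw new RuntimeException("timeout") })

val outcome: Try[String] = Try(Await.result(f, 1.second))

println(s"future failed: ${outcome.isFailure}, sync failures seen: ${syncFailures.get()}")
```

If this is what is happening in my case, the `Future` fails but the wrapper has already returned successfully, which would match the breaker never counting a failure.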

  • redisT is configured with a command timeout of 1 second via Lettuce's timeout options. The resulting timeout exception is caught when a recover block is present.

  • circuitBreaker (resilience4j)

lazy val sharedRedisResources: ClientResources = {
  logger.info("Creating Shared Redis Resources")
  DefaultClientResources.create()
}

lazy val circuitBreaker: CircuitBreaker =
  circuitBreakerRegistry.circuitBreaker("circuitbreaker name")
</lazy>
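For completeness, the `circuitBreakerRegistry` referenced above is built along these lines from resilience4j's `CircuitBreakerConfig` (the threshold values below are illustrative placeholders, not my actual settings):

```scala
import java.time.Duration
import io.github.resilience4j.circuitbreaker.{CircuitBreakerConfig, CircuitBreakerRegistry}

// Illustrative configuration; actual thresholds omitted from the question.
lazy val circuitBreakerRegistry: CircuitBreakerRegistry = {
  val config = CircuitBreakerConfig.custom()
    .failureRateThreshold(50.0f)                     // open after 50% of recorded calls fail
    .waitDurationInOpenState(Duration.ofSeconds(30)) // stay open for 30s before half-open
    .slidingWindowSize(20)                           // evaluate over the last 20 calls
    .build()
  CircuitBreakerRegistry.of(config)
}
```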

Is my use of the circuit breaker correct? Am I missing something?
