Exception in Alpakka JsonReader substream not being caught by supervisor


I have a source that uses flatMapConcat to flatten in a new stream of elements produced by parsing JSON. To do this, I am using the JsonReader provided by Alpakka (https://developer.lightbend.com/docs/alpakka/current/data-transformations/json.html). This works well, but I would like to handle the case where invalid JSON is presented to the reader. In that case I would like the exception to bubble up to the supervisor so that it can restart the stream.

Here's a quick snippet showcasing the invalid JSON case:

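import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Materializer, Supervision}
import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

// Decider installed on the materializer (default for all streams it runs).
val systemErrorHandler: Supervision.Decider = {
  case _: Exception =>
    println("System Error")
    Supervision.Restart
  case _ => Supervision.Stop
}

// Decider attached to this particular graph through its attributes.
val routeErrorHandler: Supervision.Decider = {
  case _: Exception =>
    println("Route Error")
    Supervision.Restart
}

val exceptionStrategy = ActorAttributes.supervisionStrategy(routeErrorHandler)

implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(systemErrorHandler))

// Invalid JSON: the string value "one is never closed.
val json = """{"test":"one}"""

val route = Source.single("1").flatMapConcat { _ =>
  Source.single(ByteString(json))
    .via(JsonReader.select("$")).map(_.utf8String)
}.to(Sink.foreach(println)).withAttributes(exceptionStrategy)

route.run()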

I would expect JsonReader, upon throwing an exception, to fail the stage and then bubble the exception up to routeErrorHandler. However, when the stream is run, the JsonReader stage fails and the exception is swallowed. I also tried placing a recover in the stream to rethrow the exception at a 'higher' level (outside the substream), but that is not handled either, neither by the routeErrorHandler supervisor nor by the supervisor set on the ActorMaterializer.

Is this expected behavior? If so, what is the proper way to ensure an exception within a substream reaches the supervisor strategy?
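
For anyone reproducing this, here is a minimal sketch of one way to at least make the failure visible and to retry on it. It rests on two assumptions that are not confirmed above: that supervision deciders are only consulted by operators that explicitly support them (so a stage that fails itself never reaches either decider), and that the exception only looks swallowed because `.to(...)` discards the Future[Done] materialized by Sink.foreach. The object name SubstreamFailureSketch, the backoff values and the restart limit are made up for illustration, and the four-argument RestartSource.onFailuresWithBackoff overload depends on the Akka version in use.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical name, used only for this sketch.
object SubstreamFailureSketch extends App {
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val materializer: Materializer = ActorMaterializer()
  import actorSystem.dispatcher

  // Same invalid JSON as in the question.
  val json = """{"test":"one}"""

  // Same stream shape as in the question, as a factory so it can be rebuilt.
  def parsed: Source[String, _] =
    Source.single("1").flatMapConcat { _ =>
      Source.single(ByteString(json))
        .via(JsonReader.select("$"))
        .map(_.utf8String)
    }

  // Rebuild the source when it fails instead of relying on a supervision decider.
  // Backoff values and the restart limit (3) are arbitrary illustration values.
  val restarting: Source[String, _] =
    RestartSource.onFailuresWithBackoff(1.second, 5.seconds, 0.2, 3)(() => parsed)

  // Keep.right retains the sink's Future[Done], which fails if the stream fails.
  val done: Future[Done] =
    restarting.toMat(Sink.foreach[String](println))(Keep.right).run()

  done.onComplete { result =>
    result match {
      case Success(_) => println("Stream completed")
      case Failure(e) => println(s"Stream failed: $e")
    }
    actorSystem.terminate()
  }
}
```

With a fixed invalid payload like this one, restarting cannot succeed, so the restart limit only keeps the sketch from retrying forever; in a real stream the factory would typically read fresh input on each restart.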
